Get List of Streams
GET /management/stream/streams
Response
If successful, returns a JSON representation of all streams.
This endpoint requires metrics.streams.enabled=true in nuxeo.conf.
Status Codes
- 200 _OK_ - Success.
- 403 FORBIDDEN - Stream metrics are not activated.
Sample
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/streams
[
{
"name": "bulk/recomputeThumbnails",
"partitions": 1,
"codec": "avro"
},
{
"name": "work/collections",
"partitions": 12,
"codec": "avro"
},
{
"name": "bulk/zipBlob",
"partitions": 2,
"codec": "avro"
},
...other streams...
]
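For scripted use, the same call can be made from Python. The following is a minimal sketch rather than an official client: the helper names `build_request`, `fetch_streams`, and `group_by_namespace` are hypothetical, the base URL and credentials mirror the curl sample, and it assumes stream names follow the `namespace/name` pattern seen in the sample output.

```python
import base64
import json
import urllib.request

# Assumption: same host and API root as the curl sample.
BASE_URL = "http://localhost:8080/nuxeo/api/v1"


def build_request(path, user, password, method="GET"):
    """Build an HTTP request carrying a Basic authentication header."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        BASE_URL + path,
        method=method,
        headers={"Authorization": "Basic " + token},
    )


def fetch_streams(user, password):
    """Call the endpoint and return the decoded JSON array (needs a live server)."""
    request = build_request("/management/stream/streams", user, password)
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def group_by_namespace(streams):
    """Group stream names by the part before the first '/'."""
    groups = {}
    for stream in streams:
        namespace = stream["name"].split("/", 1)[0]
        groups.setdefault(namespace, []).append(stream["name"])
    return groups
```

Calling `group_by_namespace(fetch_streams("Administrator", "Administrator"))` against a running server would return a dictionary keyed by namespace such as `bulk` or `work`.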
List Consumers of a Stream
GET /management/stream/consumers
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| stream | string | The stream name. | Required |
Response
If successful, returns a JSON representation of the consumers for a given stream.
This endpoint requires metrics.streams.enabled=true in nuxeo.conf.
Status Codes
- 200 _OK_ - Success.
- 403 FORBIDDEN - Stream metrics are not activated.
Sample
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/consumers?stream=bulk/command
[
{
"stream": "bulk/command",
"consumer": "bulk/scroller"
}
]
View Stream Content (Cat and Tail)
GET /management/stream/cat
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| stream | string | The stream consumed by the consumer. | Required |
| limit | integer | The number of records to return. | Optional |
| fromGroup | string | Use the position of the consumer as starting point. | Optional |
| fromOffset | integer | Use the offset as starting point; must be used with the partition parameter. | Optional |
| partition | integer | View records for this partition. | Optional |
| rewind | integer | Go back a number of records from the requested position. | Optional |
| timeout | string | Time to wait for new records; the format is 10s or 3min. | Optional |
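The parameter rules in this table can be enforced on the client before a request is sent. The sketch below is illustrative only: `cat_url` is a hypothetical helper, and the only rule it checks is the one stated in the table, that fromOffset must be combined with partition.

```python
from urllib.parse import urlencode


def cat_url(base, stream, from_group=None, from_offset=None,
            partition=None, rewind=None, limit=None, timeout=None):
    """Build a /management/stream/cat URL, skipping parameters left unset."""
    if from_offset is not None and partition is None:
        raise ValueError("fromOffset must be used with the partition parameter")
    params = {
        "stream": stream,
        "fromGroup": from_group,
        "fromOffset": from_offset,
        "partition": partition,
        "rewind": rewind,
        "limit": limit,
        "timeout": timeout,
    }
    # Keep '/' unescaped so stream names read the same as in the curl samples.
    query = urlencode({k: v for k, v in params.items() if v is not None}, safe="/")
    return f"{base}/management/stream/cat?{query}"
```

For example, `cat_url("http://localhost:8080/nuxeo/api/v1", "audit/audit", from_group="audit/writer", rewind=1, limit=2, timeout="1s")` builds a URL that reads two records starting one record before the position of the audit/writer consumer.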
Response
If successful, returns a Server-Sent Events (SSE) stream of JSON records.
Status Codes
- 200 _OK_ - Success.
Sample
View 2 records from audit, the last processed and the next one:
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/cat?stream=audit/audit&fromGroup=audit/writer&rewind=1&limit=2&timeout=1s"
data: {"type":"lag","group":"audit/writer","stream":"audit/audit","partition":0,"lag":4,"pos":2668,"end":2672}
data: {"type":"seek","group":"admin/streamServlet","stream":"audit/audit","partition":0,"pos":2667,"end":2672}
data: {"type":"record","stream":"audit/audit","partition":0,"offset":2667,"watermark":"2022-09-05 13:06:01.159","key":"23","length":390,"message": {"entity-type":"logEntry","id":0,"category":"NuxeoAuthentication","principalName":"Administrator","comment":"Administrator successfully logged in using AUTOMATION_BASIC_AUTH authentication","docLifeCycle":null,"docPath":null,"docType":null,"docUUID":null,"eventId":"loginSuccess","repositoryId":null,"eventDate":"2022-09-05T13:06:01.159Z","logDate":"2022-09-05T13:06:01.159Z","extended":{}}}
data: {"type":"record","stream":"audit/audit","partition":0,"offset":2668,"watermark":"2022-09-05 13:06:29.707","key":"24","length":390,"message": {"entity-type":"logEntry","id":0,"category":"NuxeoAuthentication","principalName":"Administrator","comment":"Administrator successfully logged in using AUTOMATION_BASIC_AUTH authentication","docLifeCycle":null,"docPath":null,"docType":null,"docUUID":null,"eventId":"loginSuccess","repositoryId":null,"eventDate":"2022-09-05T13:06:29.707Z","logDate":"2022-09-05T13:06:29.707Z","extended":{}}}
data: {"type":"disconnect","message":"Limit reached, bye."}
Tail on the bulk command stream:
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/cat?stream=bulk/command&fromGroup=bulk/scroller&rewind=1&timeout=60s"
data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":0,"lag":0,"pos":2,"end":2}
data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":1,"lag":0,"pos":2,"end":2}
data: {"type":"seek","group":"admin/streamServlet","stream":"bulk/command","partition":0,"pos":1,"end":2}
data: {"type":"seek","group":"admin/streamServlet","stream":"bulk/command","partition":1,"pos":1,"end":2}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"record","stream":"bulk/command","partition":0,"offset":2,"watermark":"2022-09-05 13:16:02.214","key":"4eec2cb5-7470-48ec-8b63-bb2a43c1e847","length":137,"message": {"avroSchema":"BulkCommand","id": "4eec2cb5-7470-48ec-8b63-bb2a43c1e847", "action": "index", "query": "SELECT ecm:uuid FROM Document", "queryLimit": null, "username": "Administrator", "repository": "default", "bucketSize": 1000, "batchSize": 25, "batchTransactionTimeout": 0, "scroller": null, "genericScroller": false, "externalScroller": false, "params": "{\"updateAlias\":true}"}}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"record","stream":"bulk/command","partition":1,"offset":2,"watermark":"2022-09-05 13:16:29.896","key":"ada44abc-3bf2-4130-8ddb-7bed2aeaf2a2","length":437,"message": {"avroSchema":"BulkCommand","id": "ada44abc-3bf2-4130-8ddb-7bed2aeaf2a2", "action": "csvExport", "query": "SELECT * FROM Document WHERE ecm:primaryType NOT IN ('Domain', 'SectionRoot', 'TemplateRoot', 'WorkspaceRoot', 'Favorites') AND ecm:mixinType != 'HiddenInNavigation' AND NOT (ecm:mixinType = 'Collection' AND ecm:name = 'Locally Edited') AND ecm:isVersion = 0 AND ecm:isTrashed = 0 AND ecm:parentId IS NOT NULL AND ecm:uuid IS NOT NULL", "queryLimit": 100000, "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 50, "batchTransactionTimeout": 0, "scroller": "elastic", "genericScroller": false, "externalScroller": false, "params": null}}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"connect","message":"keepalive"}
data: {"type":"disconnect","message":"Read timeout, bye."}
A JSP page demonstrates this cat endpoint; it requires Administrator privileges:
http://localhost:8080/nuxeo/stream.jsp
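A client consuming the cat endpoint has to split the SSE stream into individual events. As a rough sketch, assuming every event arrives as a single `data:` line holding one JSON object (as in the samples above), the parsing could look like this. The function names are hypothetical, and other SSE fields are ignored.

```python
import json


def parse_sse_events(lines):
    """Yield the decoded JSON payload of each 'data:' line."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            # json.loads tolerates the leading space after the colon.
            yield json.loads(line[len("data:"):])


def records_only(events):
    """Keep actual stream records, dropping lag, seek, connect and disconnect events."""
    return [event for event in events if event.get("type") == "record"]
```

The `lines` argument can be any iterable of text lines, for instance the decoded lines of an HTTP response body read incrementally.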
Get Detailed Nuxeo Stream and Processor Information
GET /management/stream
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| format | string | The output format, can be puml or d2. | Optional |
The default format is a JSON representation; a D2 (since 2023.39/2025.11) or Plant UML output can be requested instead.
This endpoint requires metrics.streams.enabled=true in nuxeo.conf.
Response
Returns a JSON representation of all available Nuxeo Stream information:
- a list of streams
- the list of deployed Stream processors and topologies
- all related metrics
The format parameter requests a diagram output instead of JSON.
Status Codes
- 200 _OK_ - Success.
- 403 FORBIDDEN - Stream metrics are not activated.
Sample
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/
{
"streams": [
{
"name": "bulk/recomputeThumbnails",
"partitions": 1,
"codec": "avro"
},
... list of streams ...
],
"processors": [
{
"metadata": {
"hostname": "nuxeo",
"cpuCores": "16",
"ip": "172.26.0.10",
"processorName": "auditWriter",
"jvmHeapSize": "2147483648"
},
"computations": [
{
"name": "audit/writer",
"threads": 1,
"continueOnFailure": false,
"batchCapacity": 25,
"batchThresholdMs": 500,
"maxRetries": 20,
"retryDelayMs": 1000
}
],
"topology": [
[
"stream:audit/audit",
"computation:audit/writer"
]
]
},
... list of all processors ...
],
"metrics": [
{
"timestamp": 1662371154,
"hostname": "nuxeo",
"ip": "192.168.176.10",
"nodeId": null,
"metrics": [
{
"k": "nuxeo.streams.global.stream.group.end",
"group": "audit-writer",
"stream": "audit-audit",
"v": 2645
}, ...
{
"k": "nuxeo.streams.computation.processRecord",
"computation": "work-common",
"count": 18,
"rate1m": 0.0152098206590408,
"rate5m": 0.009001128456818261,
"sum": 19194601,
"max": 0.001976188,
"mean": 5.094094203003321E-4,
"min": 4.70333E-4,
"stddev": 1.5061167728046484E-5,
"p50": 5.09072E-4,
"p95": 5.09072E-4,
"p99": 5.89058E-4
},
... list of all metrics for all nodes ...
]
}
]
}
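The metrics section nests one list of metric entries per node, so looking up a given metric means walking both levels. A small sketch, assuming the layout shown in the sample above (a top-level `metrics` array whose items each carry a `hostname` and their own `metrics` array keyed by `k`); `find_metrics` is a hypothetical helper name:

```python
def find_metrics(info, key):
    """Return (hostname, metric) pairs for every metric whose 'k' equals key."""
    matches = []
    for node in info.get("metrics", []):
        for metric in node.get("metrics", []):
            if metric.get("k") == key:
                matches.append((node.get("hostname"), metric))
    return matches
```

For instance, `find_metrics(info, "nuxeo.streams.global.stream.group.end")` collects the end-offset entries reported by each node.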
Generate an SVG D2 Diagram
# Get the D2 representation of the Nuxeo Streams
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/?format=d2 -o /tmp/streams.d2
# Generate an SVG
d2 --scale 0.2 /tmp/streams.d2 /tmp/streams.svg
Generate an SVG Plant UML Diagram
# Get Plant UML JAR (it requires Graphviz see https://plantuml.com/faq-install for more info)
curl https://netcologne.dl.sourceforge.net/project/plantuml/plantuml.jar -o /tmp/plantuml.jar
# Get the Plant UML representation of the Nuxeo Streams
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/?format=puml -o /tmp/streams.puml
# Generate an SVG
java -DPLANTUML_LIMIT_SIZE=16384 -jar /tmp/plantuml.jar /tmp/streams.puml -tsvg
Get Scaling Analysis
GET /management/stream/scale
This endpoint analyzes the current cluster workload and determines whether scaling is needed. It provides recommendations for scaling out (adding worker nodes) or scaling in (removing worker nodes) based on the active computations.
How It Works
The optimal number of worker nodes is calculated by maximizing concurrency on active computations. Each computation's concurrency is constrained by the number of partitions in its input streams.
Important: The system never scales down to zero worker nodes. At least one worker node is required to process asynchronous tasks that are essential for:
- Reporting metrics
- Logging audit entries
- Processing scheduled tasks
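The exact formula is not given here. As one plausible reading, and not the actual Nuxeo implementation, the sketch below assumes each worker node runs a fixed number of threads per computation and that a computation cannot use more threads than its input stream has partitions. The function names and the `threads_per_node` field are hypothetical.

```python
import math


def best_nodes(active_computations):
    """Estimate the worker node count that saturates every active computation.

    Each item is a dict with 'partitions' (of the input stream) and
    'threads_per_node' (threads one worker runs for that computation).
    """
    needed = 1  # never scale down to zero worker nodes
    for computation in active_computations:
        nodes = math.ceil(computation["partitions"] / computation["threads_per_node"])
        needed = max(needed, nodes)
    return needed


def scale_metric(best, current):
    """Positive: nodes to add. Negative: nodes to remove. Zero: optimal."""
    return best - current
```

Under these assumptions, a computation reading 12 partitions with 4 threads per node needs 3 nodes for full concurrency, so a cluster with a single worker would get a metric of 2.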
Metrics
The scale metric and the current number of worker nodes are exposed as real-time Nuxeo metrics:
- nuxeo.streams.scale.metric: The recommended number of worker nodes
- nuxeo.cluster.worker.count: The current number of active worker nodes
Prerequisites
This endpoint requires metrics.streams.enabled=true in nuxeo.conf.
Query Parameters
None
Response
Returns a JSON object describing the scaling state of the cluster:
scale
- currentNodes: The current number of worker nodes in the cluster
- bestNodes: The optimal number of worker nodes to handle the current load
- metric: A scale metric indicating the number of nodes to add (positive value) or remove (negative value). A value of 0 indicates optimal processing capacity.
nodes[]
- Array of node information showing where computations are currently running
computations[]
- Array of active computations (those with lag), including:
- Per-node metrics
- Cluster-level cumulative metrics
- ETA (estimated time to completion in milliseconds) calculated with:
- Current number of worker nodes
- Optimal number of worker nodes
Status Codes
- 200 _OK_ - Success. Returns scaling analysis data.
- 403 FORBIDDEN - Stream metrics are not activated. Set metrics.streams.enabled=true in nuxeo.conf.
Sample
Request:
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/scale
Response:
{
"scale": {
"currentNodes": 1,
"bestNodes": 3,
"metric": 2
},
"nodes": [
{
"hostname": "nuxeo-worker-5d4db8c4cc-zhndc",
"cpuCores": "12",
"created": "2023-03-16T14:23:52Z",
"ip": "10.60.114.6",
"jvmHeapSize": "25769803776",
"nodeId": "1d43600e-6c5c-4b35-aeef-fd44cf720daa",
"alive": "2023-03-16T14:36:25Z",
"type": "worker"
},
{
"hostname": "nuxeo-api-c66f6595-2sbgt",
"cpuCores": "12",
"created": "2023-03-16T14:24:24Z",
"ip": "10.60.112.6",
"jvmHeapSize": "25769803776",
"nodeId": "db9f6382-65ca-4bb4-99fb-edb47ebb4ba8",
"alive": "2023-03-16T14:36:24Z",
"type": "front"
}
],
"computations": [
{
"computation": "work-common",
"streams": {
"work-common": {
"stream": "work-common",
"partitions": 12,
"lag": 11223,
"end": 99515,
"latency": 335509
}
},
"nodes": [
{
"ip": "10.60.114.6",
"threads": 4,
"timestamp": 1678977385,
"count": 199030,
"sum": 4201101241938,
"rate1m": 437.83270847387524,
"rate5m": 319.5702889885148,
"min": 0.003909759,
"p50": 0.004975371,
"mean": 0.01355667013572912,
"p95": 0.101731737,
"max": 0.214049308,
"stddev": 0.02899623610194394
}
],
"current": {
"nodes": 1,
"threads": 4,
"rate1m": 437.8327,
"eta": 25
},
"best": {
"nodes": 3,
"threads": 12,
"rate1m": 1313.498,
"eta": 8
}
},
{
"computation": "work-elasticSearchIndexing",
"streams": {
"work-elasticSearchIndexing": {
"stream": "work-elasticSearchIndexing",
"partitions": 18,
"lag": 15,
"end": 90320,
"latency": 545
}
},
"nodes": [
{
"ip": "10.60.114.6",
"threads": 6,
"timestamp": 1678977385,
"count": 203038,
"sum": 892021842027,
"rate1m": 437.73966331810846,
"rate5m": 322.5043154801199,
"min": 0.002208503,
"p50": 0.002897732,
"mean": 0.0030058141376983634,
"p95": 0.003744013,
"max": 0.014256694,
"stddev": 6.861028639799985E-4
}
],
"current": {
"nodes": 1,
"threads": 6,
"rate1m": 437.73965,
"eta": 0
},
"best": {
"nodes": 3,
"threads": 18,
"rate1m": 1313.219,
"eta": 0
}
}
]
}
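An autoscaler or a monitoring script mostly needs the `scale` object from this response. The following sketch turns it into a human-readable decision; `describe_scaling` is a hypothetical helper, and it relies only on the three fields documented above.

```python
def describe_scaling(response):
    """Summarize the 'scale' object of a /management/stream/scale response."""
    scale = response["scale"]
    metric = scale["metric"]
    transition = f"{scale['currentNodes']} -> {scale['bestNodes']}"
    if metric > 0:
        return f"scale out: add {metric} worker node(s) ({transition})"
    if metric < 0:
        return f"scale in: remove {-metric} worker node(s) ({transition})"
    return "optimal: no change needed"
```

Applied to the sample response, where `metric` is 2, the function reports a scale-out from 1 to 3 worker nodes.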
Get Consumer Positions
GET /management/stream/consumer/position
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| stream | string | The stream consumed by the consumer. | Required |
| consumer | string | The consumer name. | Required |
Response
Returns a JSON representation of the current consumer position.
Status Codes
- 200 _OK_ - Success.
Sample
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position?stream=audit/audit&consumer=audit/writer"
{
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 1,
"lags": [
{
"partition": 0,
"pos": 2648,
"end": 2649,
"lag": 1
}
]
}
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position?stream=work/common&consumer=work/common"
{
"stream": "work/common",
"consumer": "work/common",
"lag": 0,
"lags": [
{
"partition": 0,
"pos": 1107,
"end": 1107,
"lag": 0
},
{
"partition": 1,
"pos": 1162,
"end": 1162,
"lag": 0
},
{
"partition": 2,
"pos": 1195,
"end": 1195,
"lag": 0
},
{
"partition": 3,
"pos": 1182,
"end": 1182,
"lag": 0
},
{
"partition": 4,
"pos": 1100,
"end": 1100,
"lag": 0
},
{
"partition": 5,
"pos": 1162,
"end": 1162,
"lag": 0
}
]
}
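In the samples above, each partition's lag equals `end - pos`, and the top-level lag is the sum over partitions. Assuming that relationship holds in general, which the samples suggest but this page does not state, a script could single out the partitions that are behind as follows. Both function names are hypothetical.

```python
def partition_lags(position):
    """Map each partition number to its lag, computed as end - pos."""
    return {entry["partition"]: entry["end"] - entry["pos"] for entry in position["lags"]}


def lagging_partitions(position):
    """Return the sorted partition numbers that still have records to process."""
    return sorted(p for p, lag in partition_lags(position).items() if lag > 0)
```

An empty list from `lagging_partitions` means the consumer has caught up on every partition.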
Stop a Consumer Thread Pool on All Nuxeo Nodes
PUT /management/stream/consumer/stop
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| consumer | string | The consumer name. | Required |
Response
Always returns 204. This stops all consumer threads on all Nuxeo nodes of the cluster; the threads can take a few seconds to actually stop after the call returns.
Status Codes
- 204 _NO_CONTENT_
Sample
curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/stop?consumer=audit/writer"
This operation is traced at WARN level on all Nuxeo nodes running a consumer thread pool:
WARN [LogStreamProcessor] Stopping computation thread pool: Name{id='audit-writer', urn='audit/writer'}
Start a Consumer Thread Pool on All Nuxeo Nodes
PUT /management/stream/consumer/start
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| consumer | string | The consumer name. | Required |
Response
Always returns 204. This starts a previously stopped consumer thread pool on all Nuxeo nodes of the cluster.
Status Codes
- 204 _NO_CONTENT_
Sample
curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/start?consumer=audit/writer"
This operation is traced at WARN level in the logs of all Nuxeo nodes:
WARN [LogStreamProcessor] Starting computation thread pool: Name{id='audit-writer', urn='audit/writer'}
Change Consumer Position to End of the Stream
PUT /management/stream/consumer/position/end
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| consumer | string | The consumer name. | Required |
| stream | string | The stream name. | Required |
Response
The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint.
Moves the position to the end of the stream and returns a JSON representation of the consumer position before and after the move.
Status Codes
- 409 CONFLICT - When consumers are not stopped
- 200 _OK_ - Success.
Sample
curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/end?consumer=audit/writer&stream=audit/audit"
{
"before": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 2,
"lags": [
{
"partition": 0,
"pos": 2658,
"end": 2660,
"lag": 2
}
]
},
"after": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 0,
"lags": [
{
"partition": 0,
"pos": 2660,
"end": 2660,
"lag": 0
}
]
}
}
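Because the consumer must be stopped before its position can be moved, a repositioning is in practice a three-step sequence: stop, move, start. The sketch below lays out that sequence and retries a step while it answers 409, since threads may take a few seconds to stop. It is an illustration under assumptions: `reposition_plan` and `run_plan` are hypothetical names, and `send` stands for any caller-supplied function that performs the HTTP request and returns the status code.

```python
from urllib.parse import urlencode

# Assumption: same host and API root as the curl samples.
BASE = "http://localhost:8080/nuxeo/api/v1/management/stream"


def reposition_plan(consumer, stream, target="end"):
    """Return the ordered (method, url) calls to move a consumer position."""
    if target not in ("end", "beginning"):
        raise ValueError("target must be 'end' or 'beginning'")
    consumer_query = urlencode({"consumer": consumer}, safe="/")
    move_query = urlencode({"consumer": consumer, "stream": stream}, safe="/")
    return [
        ("PUT", f"{BASE}/consumer/stop?{consumer_query}"),
        ("PUT", f"{BASE}/consumer/position/{target}?{move_query}"),
        ("PUT", f"{BASE}/consumer/start?{consumer_query}"),
    ]


def run_plan(plan, send, max_retries=5):
    """Run each step with send(method, url), retrying while it returns 409.

    A real caller would pause between retries; this sketch does not sleep.
    """
    statuses = []
    for method, url in plan:
        status = send(method, url)
        retries = 0
        while status == 409 and retries < max_retries:
            status = send(method, url)
            retries += 1
        statuses.append(status)
    return statuses
```

The start step runs even if the move kept failing, so the consumer is not left stopped; callers should still inspect the returned statuses.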
Change Consumer Position to the Beginning of the Stream
PUT /management/stream/consumer/position/beginning
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| consumer | string | The consumer name. | Required |
| stream | string | The stream name. | Required |
Response
The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint.
Moves the position to the beginning of the stream and returns a JSON representation of the consumer position before and after the move.
Status Codes
- 409 CONFLICT - When consumers are not stopped
- 200 _OK_ - Success.
Sample
curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/beginning?consumer=audit/writer&stream=audit/audit"
{
"before": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 1,
"lags": [
{
"partition": 0,
"pos": 2661,
"end": 2662,
"lag": 1
}
]
},
"after": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 20,
"lags": [
{
"partition": 0,
"pos": 2642,
"end": 2662,
"lag": 20
}
]
}
}
Change Consumer Position to a Specific Offset
PUT /management/stream/consumer/position/offset
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| consumer | string | The consumer name. | Required |
| stream | string | The stream name. | Required |
| partition | integer | The partition number. | Required |
| offset | long | The offset value. | Required |
Response
The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint.
Moves the position of a specific partition to the exact offset and returns a JSON representation of the consumer position before and after the move.
Status Codes
- 409 CONFLICT - When consumers are not stopped
- 200 _OK_ - Success.
Sample
curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/offset?consumer=audit/writer&stream=audit/audit&partition=0&offset=2652"
{
"before": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 21,
"lags": [
{
"partition": 0,
"pos": 2642,
"end": 2663,
"lag": 21
}
]
},
"after": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 11,
"lags": [
{
"partition": 0,
"pos": 2652,
"end": 2663,
"lag": 11
}
]
}
}
Change Consumer Position After a Given Date
PUT /management/stream/consumer/position/after
Query Parameters
| Parameter Name | Type | Description | Notes |
|---|---|---|---|
| consumer | string | The consumer name. | Required |
| stream | string | The stream name. | Required |
| date | string | The date in ISO-8601 format, for instance 2022-09-05T12:49:34.308625Z. | Required |
Response
The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint.
Moves the position to the records that were appended to the stream just after the given date and returns a JSON representation of the consumer position before and after the move.
Status Codes
- 409 CONFLICT - When consumers are not stopped
- 200 _OK_ - Success.
Sample
curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/after?consumer=audit/writer&stream=audit/audit&date=2022-09-05T10:00:00Z"
{
"before": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 15,
"lags": [
{
"partition": 0,
"pos": 2652,
"end": 2667,
"lag": 15
}
]
},
"after": {
"stream": "audit/audit",
"consumer": "audit/writer",
"lag": 19,
"lags": [
{
"partition": 0,
"pos": 2648,
"end": 2667,
"lag": 19
}
]
}
}
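The date parameter has to be an ISO-8601 timestamp, which is easy to get wrong when it starts from a local time. As a minimal sketch, assuming UTC with second precision is acceptable (as in the curl sample), the URL could be built like this; `position_after_url` is a hypothetical helper.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def position_after_url(base, consumer, stream, when):
    """Build the position/after URL from a timezone-aware datetime."""
    if when.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid ambiguity")
    # Normalize to UTC and render with the trailing 'Z' used in the samples.
    date = when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    query = urlencode({"consumer": consumer, "stream": stream, "date": date}, safe="/:")
    return f"{base}/management/stream/consumer/position/after?{query}"
```

Passing `datetime(2022, 9, 5, 10, 0, tzinfo=timezone.utc)` yields the same query string as the curl sample above.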