REST API

Stream Endpoint

Updated: November 14, 2025

Get List of Streams

GET /management/stream/streams

Response

If successful, returns a JSON representation of all streams. This endpoint requires metrics.streams.enabled=true in nuxeo.conf.

Status Codes

  • 200 OK - Success.
  • 403 FORBIDDEN - Stream metrics are not activated.

Sample

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/streams
[
  {
    "name": "bulk/recomputeThumbnails",
    "partitions": 1,
    "codec": "avro"
  },
  {
    "name": "work/collections",
    "partitions": 12,
    "codec": "avro"
  },
  {
    "name": "bulk/zipBlob",
    "partitions": 2,
    "codec": "avro"
  },

    ...other streams...
]

List Consumers of a Stream

GET /management/stream/consumers

Query Parameters

Parameter Name | Type | Description | Notes
stream | string | The stream name. | Required

Response

If successful, returns a JSON representation of the consumers for a given stream. This endpoint requires metrics.streams.enabled=true in nuxeo.conf.

Status Codes

  • 200 OK - Success.
  • 403 FORBIDDEN - Stream metrics are not activated.

Sample

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumers?stream=bulk/command"
[
  {
    "stream": "bulk/command",
    "consumer": "bulk/scroller"
  }
]

View Stream Content (Cat and Tail)

GET /management/stream/cat

Query Parameters

Parameter Name | Type | Description | Notes
stream | string | The stream consumed by the consumer. | Required
limit | integer | The number of records to return. | Optional
fromGroup | string | Use the position of the consumer as starting point. | Optional
fromOffset | integer | Use the offset as starting point; must be used with the partition parameter. | Optional
partition | integer | View records for this partition. | Optional
rewind | integer | Go back a number of records from the requested position. | Optional
timeout | string | Time to wait for new records, for example 10s or 3min. | Optional
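The parameters above combine into a single query string. The following is a minimal Python sketch of a URL builder, not part of Nuxeo: the function name `cat_url` and the `BASE_URL` constant are assumptions for illustration, and the host is the one used in the curl samples. It enforces the one documented constraint, that fromOffset is used together with partition.

```python
from urllib.parse import urlencode

# Assumption: same host and API prefix as the curl samples in this page.
BASE_URL = "http://localhost:8080/nuxeo/api/v1"


def cat_url(stream, limit=None, from_group=None, from_offset=None,
            partition=None, rewind=None, timeout=None):
    """Build a /management/stream/cat URL from the documented parameters."""
    if from_offset is not None and partition is None:
        # The parameter table says fromOffset must be used with partition.
        raise ValueError("fromOffset requires partition")
    params = {
        "stream": stream,
        "limit": limit,
        "fromGroup": from_group,
        "fromOffset": from_offset,
        "partition": partition,
        "rewind": rewind,
        "timeout": timeout,
    }
    # Drop unset optional parameters; keep "/" readable in stream names.
    query = urlencode({k: v for k, v in params.items() if v is not None},
                      safe="/")
    return f"{BASE_URL}/management/stream/cat?{query}"


print(cat_url("audit/audit", from_group="audit/writer",
              rewind=1, limit=2, timeout="1s"))
```

The printed URL carries the same parameters as the first curl sample in this section, in a different order.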

Response

If successful, returns a Server-Sent Events (SSE) stream of JSON records.

Status Codes

  • 200 OK - Success.

Sample

View 2 records from audit, the last processed and the next one:

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/cat?stream=audit/audit&fromGroup=audit/writer&rewind=1&limit=2&timeout=1s"
data: {"type":"lag","group":"audit/writer","stream":"audit/audit","partition":0,"lag":4,"pos":2668,"end":2672}

data: {"type":"seek","group":"admin/streamServlet","stream":"audit/audit","partition":0,"pos":2667,"end":2672}

data: {"type":"record","stream":"audit/audit","partition":0,"offset":2667,"watermark":"2022-09-05 13:06:01.159","key":"23","length":390,"message": {"entity-type":"logEntry","id":0,"category":"NuxeoAuthentication","principalName":"Administrator","comment":"Administrator successfully logged in using AUTOMATION_BASIC_AUTH authentication","docLifeCycle":null,"docPath":null,"docType":null,"docUUID":null,"eventId":"loginSuccess","repositoryId":null,"eventDate":"2022-09-05T13:06:01.159Z","logDate":"2022-09-05T13:06:01.159Z","extended":{}}}

data: {"type":"record","stream":"audit/audit","partition":0,"offset":2668,"watermark":"2022-09-05 13:06:29.707","key":"24","length":390,"message": {"entity-type":"logEntry","id":0,"category":"NuxeoAuthentication","principalName":"Administrator","comment":"Administrator successfully logged in using AUTOMATION_BASIC_AUTH authentication","docLifeCycle":null,"docPath":null,"docType":null,"docUUID":null,"eventId":"loginSuccess","repositoryId":null,"eventDate":"2022-09-05T13:06:29.707Z","logDate":"2022-09-05T13:06:29.707Z","extended":{}}}

data: {"type":"disconnect","message":"Limit reached, bye."}

Tail on the bulk command stream:

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/cat?stream=bulk/command&fromGroup=bulk/scroller&rewind=1&timeout=60s"
data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":0,"lag":0,"pos":2,"end":2}

data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":1,"lag":0,"pos":2,"end":2}

data: {"type":"seek","group":"admin/streamServlet","stream":"bulk/command","partition":0,"pos":1,"end":2}

data: {"type":"seek","group":"admin/streamServlet","stream":"bulk/command","partition":1,"pos":1,"end":2}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"record","stream":"bulk/command","partition":0,"offset":2,"watermark":"2022-09-05 13:16:02.214","key":"4eec2cb5-7470-48ec-8b63-bb2a43c1e847","length":137,"message": {"avroSchema":"BulkCommand","id": "4eec2cb5-7470-48ec-8b63-bb2a43c1e847", "action": "index", "query": "SELECT ecm:uuid FROM Document", "queryLimit": null, "username": "Administrator", "repository": "default", "bucketSize": 1000, "batchSize": 25, "batchTransactionTimeout": 0, "scroller": null, "genericScroller": false, "externalScroller": false, "params": "{\"updateAlias\":true}"}}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"record","stream":"bulk/command","partition":1,"offset":2,"watermark":"2022-09-05 13:16:29.896","key":"ada44abc-3bf2-4130-8ddb-7bed2aeaf2a2","length":437,"message": {"avroSchema":"BulkCommand","id": "ada44abc-3bf2-4130-8ddb-7bed2aeaf2a2", "action": "csvExport", "query": "SELECT * FROM Document WHERE ecm:primaryType NOT IN ('Domain', 'SectionRoot', 'TemplateRoot', 'WorkspaceRoot', 'Favorites') AND ecm:mixinType != 'HiddenInNavigation' AND NOT (ecm:mixinType = 'Collection' AND ecm:name = 'Locally Edited') AND ecm:isVersion = 0 AND ecm:isTrashed = 0 AND ecm:parentId IS NOT NULL AND ecm:uuid IS NOT NULL", "queryLimit": 100000, "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 50, "batchTransactionTimeout": 0, "scroller": "elastic", "genericScroller": false, "externalScroller": false, "params": null}}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"disconnect","message":"Read timeout, bye."}
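Each event in the output above is a `data:` line carrying one JSON object with a `type` field (lag, seek, connect, record, disconnect in these samples). A minimal Python sketch of a client-side parser follows; the helper name `parse_sse` is an assumption, and the sample text is an abbreviated copy of the tail output (the record key and message are shortened for readability).

```python
import json


def parse_sse(text):
    """Yield one dict per `data:` line of a cat response body."""
    for line in text.splitlines():
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


# Abbreviated from the tail sample; "k1" and the short message are placeholders.
sample = "\n".join([
    'data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":0,"lag":0,"pos":2,"end":2}',
    '',
    'data: {"type":"connect","message":"keepalive"}',
    '',
    'data: {"type":"record","stream":"bulk/command","partition":0,"offset":2,"key":"k1","message":{"action":"index"}}',
    '',
    'data: {"type":"disconnect","message":"Read timeout, bye."}',
])

events = list(parse_sse(sample))
# Keep only the actual stream records, ignoring keepalives and bookkeeping.
records = [e for e in events if e["type"] == "record"]
print([e["type"] for e in events])  # ['lag', 'connect', 'record', 'disconnect']
print(records[0]["message"]["action"])  # index
```

Filtering on `type` is what lets a client ignore the keepalive events that dominate a long tail.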

A JSP page demonstrates this cat endpoint; it requires Administrator privileges:

http://localhost:8080/nuxeo/stream.jsp

Get Detailed Nuxeo Stream and Processor Information

GET /management/stream

Query Parameters

Parameter Name | Type | Description | Notes
format | string | The output format, can be puml or d2. | Optional

The default format is a JSON representation; D2 (since 2023.39/2025.11) or Plant UML output can be requested instead. This endpoint requires metrics.streams.enabled=true in nuxeo.conf.

Response

Returns a JSON representation of all available Nuxeo Stream information:

  • a list of streams
  • the list of deployed Stream processors and topologies
  • all related metrics

The format parameter lets you request diagram output instead of JSON.

Status Codes

  • 200 OK - Success.
  • 403 FORBIDDEN - Stream metrics are not activated.

Sample

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/
{
  "streams": [
    {
      "name": "bulk/recomputeThumbnails",
      "partitions": 1,
      "codec": "avro"
    },
      ... list of streams ...
    ],
  "processors": [
    {
      "metadata": {
        "hostname": "nuxeo",
        "cpuCores": "16",
        "ip": "172.26.0.10",
        "processorName": "auditWriter",
        "jvmHeapSize": "2147483648"
      },
      "computations": [
        {
          "name": "audit/writer",
          "threads": 1,
          "continueOnFailure": false,
          "batchCapacity": 25,
          "batchThresholdMs": 500,
          "maxRetries": 20,
          "retryDelayMs": 1000
        }
      ],
      "topology": [
        [
          "stream:audit/audit",
          "computation:audit/writer"
        ]
      ]
    },
      ... list of all processors ...
    ],
  "metrics": [
    {
      "timestamp": 1662371154,
      "hostname": "nuxeo",
      "ip": "192.168.176.10",
      "nodeId": null,
      "metrics": [
        {
          "k": "nuxeo.streams.global.stream.group.end",
          "group": "audit-writer",
          "stream": "audit-audit",
          "v": 2645
        }, ...
        {
          "k": "nuxeo.streams.computation.processRecord",
          "computation": "work-common",
          "count": 18,
          "rate1m": 0.0152098206590408,
          "rate5m": 0.009001128456818261,
          "sum": 19194601,
          "max": 0.001976188,
          "mean": 5.094094203003321E-4,
          "min": 4.70333E-4,
          "stddev": 1.5061167728046484E-5,
          "p50": 5.09072E-4,
          "p95": 5.09072E-4,
          "p99": 5.89058E-4

        },
        ... list of all metrics for all nodes ...
        ]
      }
    ]
}
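The `topology` entries in the sample above read as pairs linking a `stream:` node to a `computation:` node. Assuming every entry is such a two-element pair (the sample shows only one), the Python sketch below lists the computations that read a given stream. The function name `readers_of` is hypothetical and the input is trimmed from the sample.

```python
def readers_of(info, stream):
    """Return computations that read `stream`, based on topology pairs."""
    readers = set()
    for processor in info.get("processors", []):
        # Assumption: each topology entry is a [source, target] pair.
        for source, target in processor.get("topology", []):
            if source == f"stream:{stream}" and target.startswith("computation:"):
                readers.add(target[len("computation:"):])
    return sorted(readers)


# Trimmed from the sample response above.
info = {
    "processors": [
        {
            "metadata": {"processorName": "auditWriter"},
            "topology": [["stream:audit/audit", "computation:audit/writer"]],
        }
    ]
}

print(readers_of(info, "audit/audit"))  # ['audit/writer']
```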

Generate an SVG D2 Diagram

Install D2, then run:

# Get the D2 representation of the Nuxeo Streams
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/?format=d2" -o /tmp/streams.d2

# Generate an SVG
d2 --scale 0.2 /tmp/streams.d2 /tmp/streams.svg

Generate an SVG Plant UML Diagram

# Get the Plant UML JAR (it requires Graphviz; see https://plantuml.com/faq-install for more info)
curl https://netcologne.dl.sourceforge.net/project/plantuml/plantuml.jar -o /tmp/plantuml.jar

# Get the Plant UML representation of the Nuxeo Streams
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/?format=puml" -o /tmp/streams.puml

# Generate an SVG
java -DPLANTUML_LIMIT_SIZE=16384 -jar /tmp/plantuml.jar /tmp/streams.puml -tsvg

Get Scaling Analysis

GET /management/stream/scale

This endpoint analyzes the current cluster workload and determines whether scaling is needed. It provides recommendations for scaling out (adding worker nodes) or scaling in (removing worker nodes) based on the active computations.

How It Works

The optimal number of worker nodes is calculated by maximizing concurrency on active computations. Each computation's concurrency is constrained by the number of partitions in its input streams.

Important: The system never scales down to zero worker nodes. At least one worker node is required to process asynchronous tasks that are essential for:

  • Reporting metrics
  • Logging audit entries
  • Processing scheduled tasks
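The rule described above can be sketched as follows. This is an illustrative approximation in Python, not the server's actual algorithm: it assumes each worker node runs a fixed number of threads per computation, that one thread serves at most one partition, and that only computations with lag count as active. The field names `partitions`, `threads_per_node`, and `lag` are hypothetical inputs.

```python
import math


def best_nodes(computations):
    """Estimate the worker count that maximizes concurrency.

    Never returns less than 1, since one worker must always remain.
    """
    needed = 1
    for c in computations:
        if c["lag"] > 0:  # only active computations drive scaling
            # Concurrency is capped by the partition count, so more nodes
            # than partitions / threads_per_node would sit idle.
            needed = max(needed, math.ceil(c["partitions"] / c["threads_per_node"]))
    return needed


workload = [
    {"name": "work-common", "partitions": 12, "threads_per_node": 4, "lag": 11223},
    {"name": "work-elasticSearchIndexing", "partitions": 18, "threads_per_node": 6, "lag": 15},
]

print(best_nodes(workload))  # 3
print(best_nodes([]))        # 1
```

With 12 partitions and 4 threads per node, three nodes give every partition its own thread; a fourth node would add nothing for that computation.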

Metrics

The scale metric and the current number of worker nodes are exposed as real-time Nuxeo metrics:

  • nuxeo.streams.scale.metric: The recommended number of worker nodes
  • nuxeo.cluster.worker.count: The current number of active worker nodes

Prerequisites

This endpoint requires metrics.streams.enabled=true in nuxeo.conf.

Query Parameters

None

Response

Returns a JSON object describing the scaling state of the cluster:

scale

  • currentNodes: The current number of worker nodes in the cluster
  • bestNodes: The optimal number of worker nodes to handle the current load
  • metric: A scale metric indicating the number of nodes to add (positive value) or remove (negative value). A value of 0 indicates optimal processing capacity.

nodes[]

  • Array of node information showing where computations are currently running

computations[]

  • Array of active computations (those with lag), including:
    • Per-node metrics
    • Cluster-level cumulative metrics
    • ETA (estimated time to completion in milliseconds) calculated with:
      • Current number of worker nodes
      • Optimal number of worker nodes
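A consumer of this response, such as an autoscaling script, mainly needs the sign of `scale.metric`. The Python sketch below turns the documented meaning of the metric into a readable decision. The function name `scaling_action` and the returned strings are assumptions for illustration.

```python
def scaling_action(scale):
    """Translate the `scale` object into a human-readable recommendation."""
    metric = scale["metric"]
    if metric > 0:
        return f"scale out: add {metric} worker node(s)"
    if metric < 0:
        return f"scale in: remove {-metric} worker node(s)"
    return "no change: capacity is optimal"


print(scaling_action({"currentNodes": 1, "bestNodes": 3, "metric": 2}))
# scale out: add 2 worker node(s)
```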

Status Codes

  • 200 OK - Success. Returns scaling analysis data.
  • 403 FORBIDDEN - Stream metrics are not activated. Set metrics.streams.enabled=true in nuxeo.conf.

Sample

Request:

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/scale

Response:

{
    "scale": {
        "currentNodes": 1,
        "bestNodes": 3,
        "metric": 2
    },
    "nodes": [
        {
            "hostname": "nuxeo-worker-5d4db8c4cc-zhndc",
            "cpuCores": "12",
            "created": "2023-03-16T14:23:52Z",
            "ip": "10.60.114.6",
            "jvmHeapSize": "25769803776",
            "nodeId": "1d43600e-6c5c-4b35-aeef-fd44cf720daa",
            "alive": "2023-03-16T14:36:25Z",
            "type": "worker"
        },
        {
            "hostname": "nuxeo-api-c66f6595-2sbgt",
            "cpuCores": "12",
            "created": "2023-03-16T14:24:24Z",
            "ip": "10.60.112.6",
            "jvmHeapSize": "25769803776",
            "nodeId": "db9f6382-65ca-4bb4-99fb-edb47ebb4ba8",
            "alive": "2023-03-16T14:36:24Z",
            "type": "front"
        }
    ],
    "computations": [
        {
            "computation": "work-common",
            "streams": {
                "work-common": {
                    "stream": "work-common",
                    "partitions": 12,
                    "lag": 11223,
                    "end": 99515,
                    "latency": 335509
                }
            },
            "nodes": [
                {
                    "ip": "10.60.114.6",
                    "threads": 4,
                    "timestamp": 1678977385,
                    "count": 199030,
                    "sum": 4201101241938,
                    "rate1m": 437.83270847387524,
                    "rate5m": 319.5702889885148,
                    "min": 0.003909759,
                    "p50": 0.004975371,
                    "mean": 0.01355667013572912,
                    "p95": 0.101731737,
                    "max": 0.214049308,
                    "stddev": 0.02899623610194394
                }
            ],
            "current": {
                "nodes": 1,
                "threads": 4,
                "rate1m": 437.8327,
                "eta": 25
            },
            "best": {
                "nodes": 3,
                "threads": 12,
                "rate1m": 1313.498,
                "eta": 8
            }
        },
        {
            "computation": "work-elasticSearchIndexing",
            "streams": {
                "work-elasticSearchIndexing": {
                    "stream": "work-elasticSearchIndexing",
                    "partitions": 18,
                    "lag": 15,
                    "end": 90320,
                    "latency": 545
                }
            },
            "nodes": [
                {
                    "ip": "10.60.114.6",
                    "threads": 6,
                    "timestamp": 1678977385,
                    "count": 203038,
                    "sum": 892021842027,
                    "rate1m": 437.73966331810846,
                    "rate5m": 322.5043154801199,
                    "min": 0.002208503,
                    "p50": 0.002897732,
                    "mean": 0.0030058141376983634,
                    "p95": 0.003744013,
                    "max": 0.014256694,
                    "stddev": 6.861028639799985E-4
                }
            ],
            "current": {
                "nodes": 1,
                "threads": 6,
                "rate1m": 437.73965,
                "eta": 0
            },
            "best": {
                "nodes": 3,
                "threads": 18,
                "rate1m": 1313.219,
                "eta": 0
            }
        }
    ]
}
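The `current` and `best` blocks in this sample are consistent with a simple relationship: `eta` matches the lag divided by `rate1m`, truncated, and the `best` rate matches the current rate scaled linearly by the node count. The Python sketch below reproduces the sample numbers under that assumption. It is an observation drawn from this one sample, not a documented formula, and it does not establish the unit of `eta`. The helper names are hypothetical.

```python
def eta(lag, rate1m):
    """Remaining records divided by the processing rate, truncated."""
    return int(lag / rate1m) if rate1m > 0 else None


def project(current, lag, best_nodes):
    """Scale the current rate linearly to best_nodes and recompute the ETA."""
    rate = current["rate1m"] * best_nodes / current["nodes"]
    return {"nodes": best_nodes, "rate1m": round(rate, 3), "eta": eta(lag, rate)}


# Values copied from the work-common computation in the sample.
current = {"nodes": 1, "threads": 4, "rate1m": 437.8327}
lag = 11223

print(eta(lag, current["rate1m"]))  # 25
print(project(current, lag, 3))
```

The same arithmetic gives an `eta` of 0 for work-elasticSearchIndexing, whose lag is only 15.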

Get Consumer Positions

GET /management/stream/consumer/position

Query Parameters

Parameter Name | Type | Description | Notes
stream | string | The stream consumed by the consumer. | Required
consumer | string | The consumer name. | Required

Response

Returns a JSON representation of the current consumer position.

Status Codes

  • 200 OK - Success.

Sample

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position?stream=audit/audit&consumer=audit/writer"
{
  "stream": "audit/audit",
  "consumer": "audit/writer",
  "lag": 1,
  "lags": [
    {
      "partition": 0,
      "pos": 2648,
      "end": 2649,
      "lag": 1
    }
  ]
}

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position?stream=work/common&consumer=work/common"
{
  "stream": "work/common",
  "consumer": "work/common",
  "lag": 0,
  "lags": [
    {
      "partition": 0,
      "pos": 1107,
      "end": 1107,
      "lag": 0
    },
    {
      "partition": 1,
      "pos": 1162,
      "end": 1162,
      "lag": 0
    },
    {
      "partition": 2,
      "pos": 1195,
      "end": 1195,
      "lag": 0
    },
    {
      "partition": 3,
      "pos": 1182,
      "end": 1182,
      "lag": 0
    },
    {
      "partition": 4,
      "pos": 1100,
      "end": 1100,
      "lag": 0
    },
    {
      "partition": 5,
      "pos": 1162,
      "end": 1162,
      "lag": 0
    }
  ]
}
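In both samples the top-level `lag` equals the sum of the per-partition lags, and each partition's lag equals `end - pos`. A short Python sketch that checks this on the first sample and lists partitions that are behind follows; the function names are assumptions for illustration.

```python
def total_lag(position):
    """Sum of (end - pos) over all partitions of a consumer position."""
    return sum(p["end"] - p["pos"] for p in position["lags"])


def lagging_partitions(position):
    """Partition numbers where the consumer has not reached the end."""
    return [p["partition"] for p in position["lags"] if p["end"] > p["pos"]]


# Copied from the audit/writer sample above.
position = {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 1,
    "lags": [{"partition": 0, "pos": 2648, "end": 2649, "lag": 1}],
}

print(total_lag(position))           # 1
print(lagging_partitions(position))  # [0]
```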

Stop a Consumer Thread Pool on All Nuxeo Nodes

PUT /management/stream/consumer/stop

Query Parameters

Parameter Name | Type | Description | Notes
consumer | string | The consumer name. | Required

Response

Always returns 204. This stops all consumer threads on all Nuxeo nodes of the cluster; the threads can take a few seconds to actually stop after the call returns.

Status Codes

  • 204 NO_CONTENT

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/stop?consumer=audit/writer"

This operation is traced at WARN level on all Nuxeo nodes running a consumer thread pool:

WARN  [LogStreamProcessor] Stopping computation thread pool: Name{id='audit-writer', urn='audit/writer'}
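The log line shows the same consumer under two names: the `urn` form used by the endpoints (`audit/writer`) and an `id` form (`audit-writer`) that also appears in the metrics samples. Judging only from these samples, the `id` looks like the `urn` with `/` replaced by `-`. The Python sketch below encodes that guess so that log and metric entries can be matched to endpoint parameters. The function name is hypothetical, and the real naming rules may cover more cases.

```python
def urn_to_id(urn):
    """Guess the id form of a stream or consumer name from its urn form."""
    # Assumption drawn from the samples: '/' in the urn becomes '-' in the id.
    return urn.replace("/", "-")


print(urn_to_id("audit/writer"))  # audit-writer
```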

Start Consumer Thread Pool on All Nuxeo Nodes

PUT /management/stream/consumer/start

Query Parameters

Parameter Name | Type | Description | Notes
consumer | string | The consumer name. | Required

Response

Always returns 204. This starts a previously stopped consumer thread pool on all Nuxeo nodes of the cluster.

Status Codes

  • 204 NO_CONTENT

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/start?consumer=audit/writer"

This is traced at WARN level on all Nuxeo logs:

WARN  [LogStreamProcessor] Starting computation thread pool: Name{id='audit-writer', urn='audit/writer'}

Change Consumer Position to End of the Stream

PUT /management/stream/consumer/position/end

Query Parameters

Parameter Name | Type | Description | Notes
consumer | string | The consumer name. | Required
stream | string | The stream name. | Required

Response

The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint. Moves the position to the end of the stream and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped.
  • 200 OK - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/end?consumer=audit/writer&stream=audit/audit"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 2,
    "lags": [
      {
        "partition": 0,
        "pos": 2658,
        "end": 2660,
        "lag": 2
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 0,
    "lags": [
      {
        "partition": 0,
        "pos": 2660,
        "end": 2660,
        "lag": 0
      }
    ]
  }
}
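Moving a position is a three-step sequence: stop the consumer, move the position, start the consumer again. The Python sketch below outlines that sequence with the transport injected as a callable, so it runs without a server. The `call(method, path, params)` signature, the function name `move_to_end`, and the `fake_call` stand-in are all assumptions for illustration. A real client would also retry on 409, since the stop endpoint can return before the threads have actually stopped.

```python
BASE = "/management/stream/consumer"


def move_to_end(call, consumer, stream):
    """Stop the consumer, move it to the end of the stream, restart it.

    `call(method, path, params)` is a hypothetical transport that returns
    a (status, body) tuple.
    """
    status, _ = call("PUT", f"{BASE}/stop", {"consumer": consumer})
    if status != 204:
        raise RuntimeError(f"unexpected status on stop: {status}")
    try:
        status, body = call("PUT", f"{BASE}/position/end",
                            {"consumer": consumer, "stream": stream})
        if status == 409:
            raise RuntimeError("consumer threads are still running")
        if status != 200:
            raise RuntimeError(f"unexpected status on move: {status}")
        return body
    finally:
        # Always restart the thread pool, even if the move failed.
        call("PUT", f"{BASE}/start", {"consumer": consumer})


calls = []


def fake_call(method, path, params):
    """Stand-in transport that records paths and mimics the documented codes."""
    calls.append(path)
    if path.endswith("/position/end"):
        return 200, {"before": {"lag": 2}, "after": {"lag": 0}}
    return 204, None


result = move_to_end(fake_call, "audit/writer", "audit/audit")
print(result["after"]["lag"])  # 0
```

The `finally` clause is the design point here: a failed move should not leave the consumer stopped on every node.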

Change Consumer Position to the Beginning of the Stream

PUT /management/stream/consumer/position/beginning

Query Parameters

Parameter Name | Type | Description | Notes
consumer | string | The consumer name. | Required
stream | string | The stream name. | Required

Response

The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint. Moves the position to the beginning of the stream and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped.
  • 200 OK - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/beginning?consumer=audit/writer&stream=audit/audit"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 1,
    "lags": [
      {
        "partition": 0,
        "pos": 2661,
        "end": 2662,
        "lag": 1
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 20,
    "lags": [
      {
        "partition": 0,
        "pos": 2642,
        "end": 2662,
        "lag": 20
      }
    ]
  }
}

Change Consumer Position to a Specific Offset

PUT /management/stream/consumer/position/offset

Query Parameters

Parameter Name | Type | Description | Notes
consumer | string | The consumer name. | Required
stream | string | The stream name. | Required
partition | integer | The partition number. | Required
offset | long | The offset value. | Required

Response

The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint. Moves the position for a specific partition to the exact offset and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped.
  • 200 OK - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/offset?consumer=audit/writer&stream=audit/audit&partition=0&offset=2652"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 21,
    "lags": [
      {
        "partition": 0,
        "pos": 2642,
        "end": 2663,
        "lag": 21
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 11,
    "lags": [
      {
        "partition": 0,
        "pos": 2652,
        "end": 2663,
        "lag": 11
      }
    ]
  }
}

Change Consumer Position After a Given Date

PUT /management/stream/consumer/position/after

Query Parameters

Parameter Name | Type | Description | Notes
consumer | string | The consumer name. | Required
stream | string | The stream name. | Required
date | string | The date in ISO-8601 format, for instance 2022-09-05T12:49:34.308625Z. | Required
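The `date` value can be produced from a timezone-aware timestamp. The Python sketch below formats a datetime in UTC with a trailing `Z`, matching the shape of the example in the table. The helper name `iso_utc` is an assumption for illustration.

```python
from datetime import datetime, timezone


def iso_utc(dt):
    """Format a timezone-aware datetime as ISO-8601 in UTC with a 'Z' suffix."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


print(iso_utc(datetime(2022, 9, 5, 12, 49, 34, 308625, tzinfo=timezone.utc)))
# 2022-09-05T12:49:34.308625Z
```

A datetime without microseconds yields the shorter form, for example 2022-09-05T10:00:00Z.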

Response

The consumer thread pool must first be stopped with the /management/stream/consumer/stop endpoint. Moves the position to the records that were appended to the stream just after the given date and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped.
  • 200 OK - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/after?consumer=audit/writer&stream=audit/audit&date=2022-09-05T10:00:00Z"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 15,
    "lags": [
      {
        "partition": 0,
        "pos": 2652,
        "end": 2667,
        "lag": 15
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 19,
    "lags": [
      {
        "partition": 0,
        "pos": 2648,
        "end": 2667,
        "lag": 19
      }
    ]
  }
}