REST API

Stream Endpoint

Updated: April 19, 2023

Get List of Streams

GET /management/stream/streams

Response

If successful, returns a JSON representation of all streams.

Status Codes

  • 200 _OK_ - Success.

Sample

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/streams
[
  {
    "name": "bulk/recomputeThumbnails",
    "partitions": 1,
    "codec": "avro"
  },
  {
    "name": "work/collections",
    "partitions": 12,
    "codec": "avro"
  },
  {
    "name": "bulk/zipBlob",
    "partitions": 2,
    "codec": "avro"
  },

    ...other streams...
]

List Consumers of a Stream

GET /management/stream/consumers

Query Parameters

Parameter Name Type Description Notes
stream string The stream name. Required

Response

If successful, returns a JSON representation of the consumers for a given stream.

Status Codes

  • 200 _OK_ - Success.

Sample

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/consumers?stream=bulk/command
[
  {
    "stream": "bulk/command",
    "consumer": "bulk/scroller"
  }
]

View Stream Content (Cat and Tail)

GET /management/stream/cat

Query Parameters

Parameter Name Type Description Notes
stream string The stream consumed by the consumer. Required
limit integer The number of record to return. Optional
fromGroup string Use the position of the consumer as starting point. Optional
fromOffset integer Use the offset as starting point, must be used with partition parameter Optional
partition integer View record for this partition. Optional
rewind integer Go back a number of record from the request position. Optional
timeout string Time to wait for new records, format is 10s or 3min. Optional

Response

If successful, returns a Server-Sent Events (SSE) stream of JSON records.

Status Codes

  • 200 _OK_ - Success.

Sample

View 2 records from audit, the last processed and the next one:

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/cat?stream=audit/audit&fromGroup=audit/writer&rewind=1&limit=2&timeout=1s"
data: {"type":"lag","group":"audit/writer","stream":"audit/audit","partition":0,"lag":4,"pos":2668,"end":2672}

data: {"type":"seek","group":"admin/streamServlet","stream":"audit/audit","partition":0,"pos":2667,"end":2672}

data: {"type":"record","stream":"audit/audit","partition":0,"offset":2667,"watermark":"2022-09-05 13:06:01.159","key":"23","length":390,"message": {"entity-type":"logEntry","id":0,"category":"NuxeoAuthentication","principalName":"Administrator","comment":"Administrator successfully logged in using AUTOMATION_BASIC_AUTH authentication","docLifeCycle":null,"docPath":null,"docType":null,"docUUID":null,"eventId":"loginSuccess","repositoryId":null,"eventDate":"2022-09-05T13:06:01.159Z","logDate":"2022-09-05T13:06:01.159Z","extended":{}}}

data: {"type":"record","stream":"audit/audit","partition":0,"offset":2668,"watermark":"2022-09-05 13:06:29.707","key":"24","length":390,"message": {"entity-type":"logEntry","id":0,"category":"NuxeoAuthentication","principalName":"Administrator","comment":"Administrator successfully logged in using AUTOMATION_BASIC_AUTH authentication","docLifeCycle":null,"docPath":null,"docType":null,"docUUID":null,"eventId":"loginSuccess","repositoryId":null,"eventDate":"2022-09-05T13:06:29.707Z","logDate":"2022-09-05T13:06:29.707Z","extended":{}}}

data: {"type":"disconnect","message":"Limit reached, bye."}

Tail on the bulk command stream:

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/cat?stream=bulk/command&fromGroup=bulk/scroller&rewind=1&timeout=60s"
data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":0,"lag":0,"pos":2,"end":2}

data: {"type":"lag","group":"bulk/scroller","stream":"bulk/command","partition":1,"lag":0,"pos":2,"end":2}

data: {"type":"seek","group":"admin/streamServlet","stream":"bulk/command","partition":0,"pos":1,"end":2}

data: {"type":"seek","group":"admin/streamServlet","stream":"bulk/command","partition":1,"pos":1,"end":2}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"record","stream":"bulk/command","partition":0,"offset":2,"watermark":"2022-09-05 13:16:02.214","key":"4eec2cb5-7470-48ec-8b63-bb2a43c1e847","length":137,"message": {"avroSchema":"BulkCommand","id": "4eec2cb5-7470-48ec-8b63-bb2a43c1e847", "action": "index", "query": "SELECT ecm:uuid FROM Document", "queryLimit": null, "username": "Administrator", "repository": "default", "bucketSize": 1000, "batchSize": 25, "batchTransactionTimeout": 0, "scroller": null, "genericScroller": false, "externalScroller": false, "params": "{\"updateAlias\":true}"}}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"record","stream":"bulk/command","partition":1,"offset":2,"watermark":"2022-09-05 13:16:29.896","key":"ada44abc-3bf2-4130-8ddb-7bed2aeaf2a2","length":437,"message": {"avroSchema":"BulkCommand","id": "ada44abc-3bf2-4130-8ddb-7bed2aeaf2a2", "action": "csvExport", "query": "SELECT * FROM Document WHERE ecm:primaryType NOT IN ('Domain', 'SectionRoot', 'TemplateRoot', 'WorkspaceRoot', 'Favorites') AND ecm:mixinType != 'HiddenInNavigation' AND NOT (ecm:mixinType = 'Collection' AND ecm:name = 'Locally Edited') AND ecm:isVersion = 0 AND ecm:isTrashed = 0 AND ecm:parentId IS NOT NULL AND ecm:uuid IS NOT NULL", "queryLimit": 100000, "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 50, "batchTransactionTimeout": 0, "scroller": "elastic", "genericScroller": false, "externalScroller": false, "params": null}}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"connect","message":"keepalive"}

data: {"type":"disconnect","message":"Read timeout, bye."}

Note that there is a JSP page that gives a demonstration of this cat endpoint, it requires Administrator privilege:

http://localhost:8080/nuxeo/stream.jsp

Get Detailed Nuxeo Stream and Processor Information

GET /management/stream

Query Parameters

Parameter Name Type Description Notes
format string The output format, only puml is supported. Optional

Default format is JSON representation, a Plant UML output can be requested.

Response

Returns a JSON representation of all available Nuxeo Stream information:

  • a list of streams
  • the list of deployed Stream processors and topologies
  • all related metrics

The format parameter enables to ask for a graphical Plant UML representation instead of JSON.

Status Codes

  • 200 _OK_ - Success.

Sample

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/
{
  "streams": [
    {
      "name": "bulk/recomputeThumbnails",
      "partitions": 1,
      "codec": "avro"
    },
      ... list of streams ...
    ],
  "processors": [
    {
      "metadata": {
        "hostname": "nuxeo",
        "cpuCores": "16",
        "ip": "172.26.0.10",
        "processorName": "auditWriter",
        "jvmHeapSize": "2147483648"
      },
      "computations": [
        {
          "name": "audit/writer",
          "threads": 1,
          "continueOnFailure": false,
          "batchCapacity": 25,
          "batchThresholdMs": 500,
          "maxRetries": 20,
          "retryDelayMs": 1000
        }
      ],
      "topology": [
        [
          "stream:audit/audit",
          "computation:audit/writer"
        ]
      ]
    },
      ... list of all processors ...
    ],
  "metrics": [
    {
      "timestamp": 1662371154,
      "hostname": "nuxeo",
      "ip": "192.168.176.10",
      "nodeId": null,
      "metrics": [
        {
          "k": "nuxeo.streams.global.stream.group.end",
          "group": "audit-writer",
          "stream": "audit-audit",
          "v": 2645
        }, ...
        {
          "k": "nuxeo.streams.computation.processRecord",
          "computation": "work-common",
          "count": 18,
          "rate1m": 0.0152098206590408,
          "rate5m": 0.009001128456818261,
          "sum": 19194601,
          "max": 0.001976188,
          "mean": 5.094094203003321E-4,
          "min": 4.70333E-4,
          "stddev": 1.5061167728046484E-5,
          "p50": 5.09072E-4,
          "p95": 5.09072E-4,
          "p99": 5.89058E-4

        },
        ... list of all metrics for all nodes ...
        ]
      }
    ]
}

Generate a graphical representation of the Stream processing:

# Get Plant UML JAR (it requires Graphviz see https://plantuml.com/faq-install for more info)
curl https://netcologne.dl.sourceforge.net/project/plantuml/plantuml.jar -o /tmp/plantuml.jar

# Get the Plant UML representation of the Nuxeo Streams
curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/?format=puml -o /tmp/streams.puml

# Generate a SVG
java  -DPLANTUML_LIMIT_SIZE=16384  -jar /tmp/plantuml.jar /tmp/streams.puml -tsvg

# view it
x-www-browser /tmp/streams.svg

Get Scaling Analysis

GET /management/stream/scale

This endpoints describes if the current load requires scale up (add worker nodes) or scale down (remove worker nodes).

The best number of worker nodes is determined by trying to maximize the concurrency on active computations, that are limited by the number of partitions in their input streams.

Note that we don't scale to 0 worker node, because there is always a worker node needed in order to process async tasks that are necessary to report metrics, log audit entries, process scheduled tasks ...

The scale metric and the number of worker nodes are also exposed as metric in realtime:

  • nuxeo.streams.scale.metric
  • nuxeo.cluster.worker.count

Query Parameters

None

Response

Returns a JSON describing the scaling state:

  • scale/currentNodes: The current number of worker nodes.
  • scale/bestNodes: The best number of worker nodes to handle the load.
  • scale/metric: A scale metric that indicates the number of node to add (> 0) or to remove (<0), 0 should be the target for optimal processing.
  • nodes[]: The list of nodes information where computations are running
  • computations[]: list of active computations (with a lag), including all metrics per nodes and cumulated at cluster level, you can find an ETA (estimated time of completion in millisecond) with the current number of worker nodes and with the optimal number of nodes.

Status Codes

  • 200 _OK_ - Success.

Sample

curl -u Administrator:Administrator \
http://localhost:8080/nuxeo/api/v1/management/stream/scale
{
    "scale": {
        "currentNodes": 1,
        "bestNodes": 3,
        "metric": 2
    },
    "nodes": [
        {
            "hostname": "nuxeo-worker-5d4db8c4cc-zhndc",
            "cpuCores": "12",
            "created": "2023-03-16T14:23:52Z",
            "ip": "10.60.114.6",
            "jvmHeapSize": "25769803776",
            "nodeId": "1d43600e-6c5c-4b35-aeef-fd44cf720daa",
            "alive": "2023-03-16T14:36:25Z",
            "type": "worker"
        },
        {
            "hostname": "nuxeo-api-c66f6595-2sbgt",
            "cpuCores": "12",
            "created": "2023-03-16T14:24:24Z",
            "ip": "10.60.112.6",
            "jvmHeapSize": "25769803776",
            "nodeId": "db9f6382-65ca-4bb4-99fb-edb47ebb4ba8",
            "alive": "2023-03-16T14:36:24Z",
            "type": "front"
        }
    ],
    "computations": [
        {
            "computation": "work-common",
            "streams": {
                "work-common": {
                    "stream": "work-common",
                    "partitions": 12,
                    "lag": 11223,
                    "end": 99515,
                    "latency": 335509
                }
            },
            "nodes": [
                {
                    "ip": "10.60.114.6",
                    "threads": 4,
                    "timestamp": 1678977385,
                    "count": 199030,
                    "sum": 4201101241938,
                    "rate1m": 437.83270847387524,
                    "rate5m": 319.5702889885148,
                    "min": 0.003909759,
                    "p50": 0.004975371,
                    "mean": 0.01355667013572912,
                    "p95": 0.101731737,
                    "max": 0.214049308,
                    "stddev": 0.02899623610194394
                }
            ],
            "current": {
                "nodes": 1,
                "threads": 4,
                "rate1m": 437.8327,
                "eta": 25
            },
            "best": {
                "nodes": 3,
                "threads": 12,
                "rate1m": 1313.498,
                "eta": 8
            }
        },
        {
            "computation": "work-elasticSearchIndexing",
            "streams": {
                "work-elasticSearchIndexing": {
                    "stream": "work-elasticSearchIndexing",
                    "partitions": 18,
                    "lag": 15,
                    "end": 90320,
                    "latency": 545
                }
            },
            "nodes": [
                {
                    "ip": "10.60.114.6",
                    "threads": 6,
                    "timestamp": 1678977385,
                    "count": 203038,
                    "sum": 892021842027,
                    "rate1m": 437.73966331810846,
                    "rate5m": 322.5043154801199,
                    "min": 0.002208503,
                    "p50": 0.002897732,
                    "mean": 0.0030058141376983634,
                    "p95": 0.003744013,
                    "max": 0.014256694,
                    "stddev": 6.861028639799985E-4
                }
            ],
            "current": {
                "nodes": 1,
                "threads": 6,
                "rate1m": 437.73965,
                "eta": 0
            },
            "best": {
                "nodes": 3,
                "threads": 18,
                "rate1m": 1313.219,
                "eta": 0
            }
        }
    ]
}

Get Consumer Positions

GET /management/stream/consumer/position

Query Parameters

Parameter Name Type Description Notes
stream string The stream consumed by the consumer. Required
consumer string The consumer name. Required

Response

Returns a JSON representation of the current consumer position.

Status Codes

  • 200 _OK_ - Success.

Sample

curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position?stream=audit/audit&consumer=audit/writer"
{
  "stream": "audit/audit",
  "consumer": "audit/writer",
  "lag": 1,
  "lags": [
    {
      "partition": 0,
      "pos": 2648,
      "end": 2649,
      "lag": 1
    }
  ]
}
curl -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position?stream=work/common&consumer=work/common"
{
  "stream": "work/common",
  "consumer": "work/common",
  "lag": 0,
  "lags": [
    {
      "partition": 0,
      "pos": 1107,
      "end": 1107,
      "lag": 0
    },
    {
      "partition": 1,
      "pos": 1162,
      "end": 1162,
      "lag": 0
    },
    {
      "partition": 2,
      "pos": 1195,
      "end": 1195,
      "lag": 0
    },
    {
      "partition": 3,
      "pos": 1182,
      "end": 1182,
      "lag": 0
    },
    {
      "partition": 4,
      "pos": 1100,
      "end": 1100,
      "lag": 0
    },
    {
      "partition": 5,
      "pos": 1162,
      "end": 1162,
      "lag": 0
    }
  ]
}

Stop a Consumer Thread Pool on All Nuxeo Nodes

PUT /management/stream/consumer/stop

Query Parameters

Parameter Name Type Description Notes
consumer string The consumer name. Required

Response

Always 204, this stops all consumer threads on all Nuxeo nodes of the clusters, the effective stop of the threads can take few seconds after returning.

Status Codes

  • 204 _NO_CONTENT_

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/stop?consumer=audit/writer"

This operation is traced at WARN level on all Nuxeo nodes running a consumer thread pool:

WARN  [LogStreamProcessor] Stopping computation thread pool: Name{id='audit-writer', urn='audit/writer'}

Start Consumer Thread Pool on All Nuxeo Nodes

PUT /management/stream/consumer/start

Query Parameters

Parameter Name Type Description Notes
consumer string The consumer name. Required

Response

Always 204, this start a consumer threads pools (previously stopped) on all Nuxeo nodes of the clusters.

Status Codes

  • 204 _NO_CONTENT_

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/start?consumer=audit/writer"

This is traced at WARN level on all Nuxeo logs:

WARN  [LogStreamProcessor] Starting computation thread pool: Name{id='audit-writer', urn='audit/writer'}

Change Consumer Position to End of the Stream

PUT /management/stream/consumer/position/end

Query Parameters

Parameter Name Type Description Notes
consumer string The consumer name. Required
stream string The stream name. Required

Response

The consumer thread pool must be first stopped with the /management/stream/consumer/stop endpoint. Move the position to the end of the stream and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped
  • 200 _OK_ - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/end?consumer=audit/writer&stream=audit/audit"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 2,
    "lags": [
      {
        "partition": 0,
        "pos": 2658,
        "end": 2660,
        "lag": 2
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 0,
    "lags": [
      {
        "partition": 0,
        "pos": 2660,
        "end": 2660,
        "lag": 0
      }
    ]
  }
}

Change Consumer Position to the Beginning of the Stream

PUT /management/stream/consumer/position/beginning

Query Parameters

Parameter Name Type Description Notes
consumer string The consumer name. Required
stream string The stream name. Required

Response

The consumer thread pool must be first stopped with the /management/stream/consumer/stop endpoint. Move the position to the beginning of the stream and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped
  • 200 _OK_ - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/beginning?consumer=audit/writer&stream=audit/audit"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 1,
    "lags": [
      {
        "partition": 0,
        "pos": 2661,
        "end": 2662,
        "lag": 1
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 20,
    "lags": [
      {
        "partition": 0,
        "pos": 2642,
        "end": 2662,
        "lag": 20
      }
    ]
  }
}

Change Consumer Position to a Specific Offset

PUT /management/stream/consumer/position/offset

Query Parameters

Parameter Name Type Description Notes
consumer string The consumer name. Required
stream string The stream name. Required
partition integer The partition number. Required
offset long The offset value. Required

Response

The consumer thread pool must be first stopped with the /management/stream/consumer/stop endpoint. Move the position for a specific partition to the exact offset and returns a JSON representation of the consumer position before and after the move.

Status Codes

  • 409 CONFLICT - When consumers are not stopped
  • 200 _OK_ - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/offset?consumer=audit/writer&stream=audit/audit&partition=0&offset=2652"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 21,
    "lags": [
      {
        "partition": 0,
        "pos": 2642,
        "end": 2663,
        "lag": 21
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 11,
    "lags": [
      {
        "partition": 0,
        "pos": 2652,
        "end": 2663,
        "lag": 11
      }
    ]
  }
}

Change Consumer Position After a Given Date

PUT /management/stream/consumer/position/after

Query Parameters

Parameter Name Type Description Notes
consumer string The consumer name. Required
stream string The stream name. Required
date string The date in ISO-8601 format, for instance 2022-09-05T12:49:34.308625Z. Required

Response

The consumer thread pool must be first stopped with the /management/stream/consumer/stop endpoint. Move the position to record that were appended to the stream just after the given date and returns a JSON representation of the consumer position before and after the move.

Status Codes

-

  • 409 CONFLICT - When consumers are not stopped
  • 200 _OK_ - Success.

Sample

curl -XPUT -u Administrator:Administrator \
"http://localhost:8080/nuxeo/api/v1/management/stream/consumer/position/after?consumer=audit/writer&stream=audit/audit&date=2022-09-05T10:00:00Z"
{
  "before": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 15,
    "lags": [
      {
        "partition": 0,
        "pos": 2652,
        "end": 2667,
        "lag": 15
      }
    ]
  },
  "after": {
    "stream": "audit/audit",
    "consumer": "audit/writer",
    "lag": 19,
    "lags": [
      {
        "partition": 0,
        "pos": 2648,
        "end": 2667,
        "lag": 19
      }
    ]
  }
}