Server

Bulk Action Framework

Updated: December 4, 2024

The Bulk Action Framework provides a service to be able to run resilient bulk actions on a possibly large set of documents.

Hyland University
Watch the related course on Hyland University:
Course on Nuxeo Stream.
university-stream.png
university-stream.png

This framework introduces several notions:

  • document set: a list of documents from a repository represented as a list of document identifiers.
  • action: an operation that can be applied to a document set.
  • command: a set of parameters building a request to apply an action on a document set.
  • bucket: a portion of a document set that fits into a stream record.
  • batch: a smaller (or equals) portion of a bucket where the action is applied within a transaction.
    Requirements
    To work properly, the Bulk Service needs a true KeyValue storage to store the command and its status, there are 2 possibles choices: - Use RedisKeyValueStore if you have nuxeo.redis.enabled=true in your nuxeo.conf. - Use MongoDBKeyValueStore if you are using the MongoDB template. You should not rely on the default MemKeyValueStore implementation that flushes the data on restart.

Bulk Service

This service provides ways to:

  • Submit a command to be executed.
  • Get the status of a submitted command.
  • Get the result of a submitted command.
  • Abort a submitted command.
  • Wait for a command to be completely executed.
  • Wait for all running commands to be completely executed (for tests).

The following is an example of bulk service usage:

// build command
BulkCommand command = new BulkCommand.Builder(SetPropertiesAction.ACTION_NAME,
        "SELECT * from Document").repository("default")
                                 .user("Administrator")
                                 .param("dc:nature", "article")
                                 .param("dc:subjects", Collections.singletonList("art/architecture"))
                                 // default is FALSE
                                 .param(SetPropertiesAction.PARAM_DISABLE_AUDIT, Boolean.TRUE)
                                 // default is null - only NONE can be set
                                 .param(SetPropertiesAction.PARAM_VERSIONING_OPTION, "NONE")
                                 .build();

// run command
BulkService bulkService = Framework.getService(BulkService.class);
String commandId = bulkService.submit(command);

// await end of computation
bulkService.await(commandId, Duration.ofMinutes(1));

// get status
BulkStatus status = bulkService.getStatus(commandId);

Execution Flow

Java Client Request flow

BulkService

The entry point is the BulkService that takes a bulk command as an input. The service submits this command, meaning it appends the BulkCommand to the command stream.

The BulkService can also return the status of a command which is internally stored into a KeyValueStore.

Scroller Computation

The command stream is the input of the Scroller computation.

This computation scrolls the database using a cursor to retrieve the document ids matching the NXQL query. The ids are grouped into a bucket that fit into a record.

The BulkBucket record is appended to the action's stream.

The scroller sends command status update to inform that the scroll is in progress or terminated and to set the total number of document in the materialized document set.

Actions Processors

Each action runs its own stream processor (a topology of computations).

The action processor must respect the following rules:

  • action must send a status update containing the number of processed documents since the last update.
  • action must handle possible error, for instance the user that send the command might not have write permission on all documents
  • the total number of processed documents reported must match at some point the number of documents in the document set.
  • action that aggregates bucket records per command must handle interleaved commands. This can be done by maintaining a local state for each command.
  • action that aggregates bucket records per command should checkpoint only when there no other interleaved command in progress. This is to prevent checkpoint while some records are not yet processed resulting in possible loss of record.

An AbstractBulkComputation is provided so an action can be implemented easily with a single computation. See SetPropertiesAction for a simple example.

See The CSVExportAction and particularly the MakeBlob computation for an advanced example.

Status Computation

This computation reads from the status stream and aggregate status update to build the current status of command. The status is saved into a KeyValueStore. When the number of processed document is equals to the number of document in the set, the state is changed to completed. And the computation appends the final status to the done stream.

This done stream can be used as an input by custom computation to execute other actions once a command is completed.

Building an Action With the Bulk Service

Edit the MANIFEST File

On your MANIFEST file, add your new contribution:

[...]
Nuxeo-Component: OSGI-INF/stream-contrib.xml

Contribute an Action

You need to register a couple action/stream processor:

<extension target="org.nuxeo.ecm.core.bulk" point="actions">
  <action name="myAction" bucketSize="100" batchSize="20"/>
</extension>

<extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
  <streamProcessor name="myAction" class="org.nuxeo.ecm.core.bulk.actions.MyActionProcessor" logConfig="bulk"
      defaultConcurrency="2" defaultPartitions="4">
  </streamProcessor>
</extension>

If your action has some parameters, you can also validate them by adding a validation class to the action contribution:

<action name="myAction" validationClass="org.nuxeo.myValidationClass"/>

It is possible to add several options to the stream processor to tune the way the documents are processed. Please visit the related README for more information.

Create the Stream Processor

Create a Java Class to declare a standard Stream processor. You should have a topology. It is possible to have as many computations as needed, but the progress of our process must be reported to status stream.

public class MyActionProcessor implements StreamProcessorTopology {

    private static final Log log = LogFactory.getLog(MyActionProcessor.class);

    @Override
    public Topology getTopology(Map<String, String> options) {
        return Topology.builder()
                       .addComputation(MyComputation::new, Arrays.asList(INPUT_1 + ":" + ACTION_NAME, //
                               OUTPUT_1 + ":" + STATUS_STREAM))
                       .build();
    }
}

In this example, there is a single computation called MyComputation. This topology takes as input a stream with the name of the action, and the output stream as output.

Create the Computations

The computation presented previously should be defined in another Java class.

An abstract Java class, AbstractBulkComputation, helps to create all that we need for the computation:

  • A CoreSession
  • The list of documents which are going to be processed
  • The action properties.

Everything inside the class is the custom logic to implement.

// ...
public class MyComputation extends AbstractBulkComputation {

    private static final Logger log = LogManager.getLogger(MyComputation.class);

    protected static final String ACTION_NAME = "myAction";

    public MyComputation() {
        super(ACTION_NAME);
    }

    @Override
    protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
        // Custom Logic
    }
    // ...
}

Note that you can load all the batch ids at once with loadDocument helper:

for (DocumentModel doc : loadDocuments(session, ids)) {
  // ...
}

Invoke the Bulk Service

The Bulk service can be called in different ways. For example:

  • From a page provider:

    curl -s -X POST "$SERVER_URL/nuxeo/api/v1/search/pp/default_search/bulk/csvExport"
    

    adding an adapter to invoke our bulk service, with the name of our action, which, in this case, is csvExport.

  • From an automation operation:

    curl -s -X POST "${SERVER_URL}/nuxeo/site/automation/Elasticsearch.BulkIndex"
    

Bulk REST API

Bulk Service APIs are accessible with REST API from two places:

  • search endpoint to submit a command
  • dedicated bulk endpoint for others actions

Objects transiting between server and client are parameters of bulk action which is a simple JSON object and depend on action needs, and BulkStatus object whose JSON representation is as below:

{
  "entity-type": "bulkStatus",
  "commandId": "00000000-0000-0000-0000-000000000000",
  "state": "COMPLETED",
  "action": "myAction",
  "username": "myUser",
  "total": 127,
  "processed": 127,
  "error": false,
  "errorCount": 0,
  "submitted": "2018-06-21T12:37:08.172Z",
  "scrollStart": "2018-06-21T12:38:08.172Z",
  "scrollEnd": "2018-06-21T12:39:08.172Z",
  "processingStart": "2018-06-21T12:39:08.272Z",
  "processingEnd": "2018-06-21T12:40:08.072Z",
  "processingMillis": 1234,
  "completed": "2018-06-21T12:40:08.172Z",
  "result": {
    "result1": "o1",
    "result2": ["o2", "o3"]
  }
}

Submit a Command

You can submit a bulk command through REST by using bulk endpoint leveraging the search endpoint. Thus, you can run your command on a query, page provider or saved search.

Description HTTP Method Path Request Body Response
Submit a bulk command on a NXQL query POST /api/v1/search/bulk/{actionId} Parameters for bulk action as a JSON object BulkStatus object
Submit a bulk command on a page provider POST /api/v1/search/pp/{providerName}/bulk/{actionId} Parameters for bulk action as a JSON object BulkStatus object
Submit a bulk command on a saved search POST /api/v1/search/saved/{searchId}/bulk/{actionId} Parameters for bulk action as a JSON object BulkStatus object

For instance:

curl -u Administrator:Administrator \
     -H 'Content-Type: application/json' \
     -X POST 'http://localhost:8080/nuxeo/api/v1/search/bulk/setProperties?query=SELECT * FROM Document' \
     -d '{
           "dc:nature": "article",
           "dc:subjects": ["art/architecture"],
           "disableAuditLogger": true,
           "VersioningOption": "NONE"
         }'

Handle Your Bulk Execution

Once you have submitted a bulk command, you can use these REST APIs:

Description HTTP Method Path Request Body Response
Fetch bulk status GET /api/v1/bulk/{commandId} / BulkStatus
Abort a bulk execution PUT /api/v1/bulk/{commandId}/abort / BulkStatus

Bulk Automation Operation

It is possible to submit a command through the Bulk.RunAction automation operation.

The following is an example of use of operation:

curl -u Administrator:Administrator \
     -H 'Content-Type: application/json' \
     -X POST 'http://localhost:8080/nuxeo/api/v1/automation/Bulk.RunAction' \
     -d '{"params":{
            "query":"SELECT * FROM Document",
            "action":"setProperties",
            "parameters":"{\"dc:nature\":\"article\",\"dc:subjects\":[\"art\/architecture\"]}"
          }
        }'

For testing purpose, it is possible to wait for the end of a bulk action with the Bulk.WaitForAction automation operation.

Testing a Bulk Action with REST API

Here is an example on how to launch a bulk command and get status:

## Run a bulk action
curl -s -X POST 'http://localhost:8080/nuxeo/api/v1/search/bulk/csvExport?query=SELECT%20*%20FROM%20File%20WHERE%20ecm:isVersion=0%20AND%20ecm:isTrashed=0' -u Administrator:Administrator -H ‘content-type: application/json’ -d ‘{}’ | tee /tmp/bulk-command.txt
# {"commandId":"e8cc059d-6b9d-480b-a6e1-b0edace6d982"}

## Extract the command id from the output
commandId=$(cat /tmp/bulk-command.txt | jq .[] | tr -d '"')

## Ask for the command status
curl -s -X GET "http://localhost:8080/nuxeo/api/v1/bulk/$commandId"  -u Administrator:Administrator  -H 'content-type: application/json' | jq .
# {
#  "entity-type": "bulkStatus",
#  "commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982",
#  "state": "RUNNING",
#  "processed": 0,
#  "total": 1844,
#  "submitted": "2018-10-11T13:10:26.825Z",
#  "scrollStart": "2018-10-11T13:10:26.827Z",
#  "scrollEnd": "2018-10-11T13:10:26.846Z",
#  "completed": null
#}

## Wait for the completion of the command, this is only for testing purpose
## a normal client should poll the status regularly instead of using this call:
curl -X POST 'http://localhost:8080/nuxeo/site/automation/Bulk.WaitForAction' -u Administrator:Administrator -H 'content-type: application/json' -d $'{
  "context": {},
  "params": {
    "commandId": "'"$commandId"'",
    "timeoutSecond": "3600"
  }
}'
# {"entity-type":"boolean","value":true}

## Get the status again:
curl -s -X GET "http://localhost:8080/nuxeo/api/v1/bulk/$commandId"  -u Administrator:Administrator  -H 'content-type: application/json' | jq .
#{
#  "entity-type": "bulkStatus",
#  "commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982",
#  "state": "COMPLETED",
#  "processed": 1844,
#  "total": 1844,
#  "submitted": "2018-10-11T13:10:26.825Z",
#  "scrollStart": "2018-10-11T13:10:26.827Z",
#  "scrollEnd": "2018-10-11T13:10:26.846Z",
#  "completed": "2018-10-11T13:10:28.243Z"
#}

It's possible to abort a command, this is useful for long-running command launched by error, or to by pass a command that fails systematically which blocks the entire action processor:

## Abort a command
curl -s -X PUT "http://localhost:8080/nuxeo/api/v1/bulk/$commandId/abort"  -u Administrator:Administrator  -H 'content-type: application/json' | jq .

Debugging

All streams used by the bulk service and action can be introspected using the Nuxeo bin/stream.sh script.

To get the latest commands submitted:

## When using Kafka
./bin/stream.sh tail -k -l bulk-command --codec avro
## When using Chronicle Queue
# ./bin/stream.sh tail --chronicle ./nxserver/data/stream/bulk -l command --codec avro
offset watermark flag key length data
bulk-command-01:+2 2018-10-11 11:18:34.955:0 [DEFAULT] setProperties 164 {"id": "b667b677-d40e-471a-8377-eb16dd301b42", "action": "setProperties", "query": "Select * from Document", "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 25, "params": "{\"dc:description\":\"a new new testbulk description\"}"}
bulk-command-00:+2 2018-10-11 15:10:26.826:0 [DEFAULT] csvExport 151 {"id": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", "action": "csvExport", "query": "SELECT * FROM File WHERE ecm:isVersion = 0 AND ecm:isTrashed = 0", "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 50, "params": null}

To get the latest commands completed:

./bin/stream.sh tail -k -l bulk-done --codec avro
offset watermark flag key length data
bulk-done-00:+4 2018-10-11 14:23:29.219:0 [DEFAULT] 580df47d-dd90-4d16-b23c-0e39ae363e06 96 {"commandId": "580df47d-dd90-4d16-b23c-0e39ae363e06", "action": "csvExport", "delta": false, "processed": 3873, "state": "COMPLETED", "submitTime": 1539260607207, "scrollStartTime": 1539260607275, "scrollEndTime": 1539260607326, "completedTime": 1539260609218, "total": 3873, "result": null}
bulk-done-00:+5 2018-10-11 15:10:28.244:0 [DEFAULT] e8cc059d-6b9d-480b-a6e1-b0edace6d982 96 {"commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", "action": "csvExport", "delta": false, "processed": 1844, "state": "COMPLETED", "submitTime": 1539263426825, "scrollStartTime": 1539263426827, "scrollEndTime": 1539263426846, "completedTime": 1539263428243, "total": 1844, "result": null}

To view the BulkBucket message:

./bin/stream.sh tail -k -l bulk-csvExport --codec avro
offset watermark flag key length data
bulk-csvExport-01:+48 2018-10-11 15:10:26.842:0 [DEFAULT] e8cc059d-6b9d-480b-a6e1-b0edace6d982:18 3750 {"commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", "ids": ["763135b8-ca49-4eea-9a52-1ceaa227e60a", ...]}

And check for any lag on any computation, for more information on stream.sh:

./bin/stream.sh help