The WorkManager service allows you to run code later, asynchronously, in a separate thread and transaction.
There are two implementations of the WorkManager: the in-memory
WorkManagerImpl and the StreamWorkManager.
The only implementation that persists the works and is cluster-aware is the
StreamWorkManager. It has been introduced in Nuxeo 9.3 and is based on Nuxeo Stream. The in-memory implementation should not be used in production.
The code to be executed asynchronously must be implemented in an
AbstractWork subclass. In theory you could just implement
Work yourself but this is strongly discouraged, for forward-compatibility reasons.
Each Work instance must have a unique id describing the Work in its entirety. The id can either be random (just call the empty
AbstractWork() constructor), or specified by the caller (
It is important to note that 2 works with the same id are considered identical, the work manager implementation can skip successive invocation to save resources.
At construction time, the Work instance should also have its
setDocument() method called to set the repository name and document(s) id(s) that this Work instance is going to be dealing with, and a flag specifying whether the work is actually about a whole subtree under the given document. This is important for monitoring and locking purposes.
A Work instance must be
Serializable in order for it to be persisted between server restarts. All fields that are used only during work execution but don't hold any configuration state must be marked
A Work instance must implement the following methods:
getCategory()should return the Work category, which is used to choose which Work Queue to use when queuing it for later execution.
getTitle()should return a human-readable name for the Work instance, for monitoring purposes.
getRetryCount()should return the number of retries allowed; this is used if
retryableWork()should be implemented to do the actual work.
cleanUp()can be overridden if there is some cleanup to do when the work is done. Make sure you call
rollbackAndRetryTransaction()can be overridden for a retryable Work if there is some cleanup to do before a Work is retried. Make sure you call
The meat of the Work instance is the
work() method (or
retryableWork(), but we won't repeat this each time). The following is available during method execution:
setStatus()should be called with a human-readable status at key times during the execution (for instance Starting, Extracting, Done), for monitoring purposes.
setProgress()should be called periodically during the execution, if there is any progress to measure, for monitoring purposes.
initSession()can be called to initialize the
CoreSession) on the repository specified at construction time.
For long-running Work,
work() should periodically check for
isSuspending() and if
true save the work state in non-transient fields of the Work instance and call
suspended() and then return. This is how a server shutdown can interrupt a long-running Work, persist it, and relaunch it when the server restarts.
Once a Work instance is constructed, it must be sent to the WorkManager for execution using the
schedule() method. The recommended way to call this method is:
WorkManager workManager = Framework.getService(WorkManager.class); workManager.schedule(work, true);
schedule() method takes a second parameter which is
afterCommit, and should be
true in most cases otherwise the Work may be scheduled immediately even before the current transaction has committed, which would prevent the Work from seeing the results of the current transaction.
You should avoid using the
Scheduling argument to the
schedule() method, as it may become deprecated in the near future.
Every Work instance is queued in a Work Queue. Each queue is associated with a thread pool of one or more threads, which will execute the Work instances whenever a thread becomes available.
<extension point="queues" target="org.nuxeo.ecm.core.work.service"> <queue id="myqueue"> <name>My Queue</name> <maxThreads>2</maxThreads> <category>somecategory1</category> <category>somecategory2</category> </queue> </extension>
The details of the configuration can be seen in the extension point documentation. Here we'll concentrate on the important points:
- The name is a human-readable name, for monitoring purposes.
- The categories define which Work instances go to this queue.
- The number of threads define how many parallel executions of Work instances can be happening.
If a Work instance has a category that isn't registered with any queue, then it will be sent to the
If you want a given Nuxeo instance to stop processing a given queue, you can specify in this instance's configuration to disable the queue:
<extension point="queues" target="org.nuxeo.ecm.core.work.service"> <queue id="myqueue" processing="false"/> </extension>
To disable all queues and just re-enable one you want to see active, you can use something like:
<extension point="queues" target="org.nuxeo.ecm.core.work.service"> <queue id="*" processing="false"/> <queue id="myqueue" processing="true"/> </extension>
When deploying a Nuxeo Cluster, you may be interested in dedicating some of the nodes to some specific tasks.
There may be several motivations for that:
- You have some heavy asynchronous processing and you don't want this slow down the interactive processing. A typical use case is to use the Video conversion jobs on nodes that do not handle user facing processing.
Some asynchronous processing requires some specific resource that is not available on all nodes:
- Hardware resources like GPU used for image processing
- Software resources like an antivirus software
The Nuxeo Platform allows to achieve this for asynchronous processing using the
The idea is that all the Nuxeo nodes can remain exactly identical to each other, at least in terms of Nuxeo bundles deployment (it is usually easier to keep all nodes aligned on the same packages).
Dedicating nodes to a given task is handled by two aspects:
- The load balancer: typically interactive processing nodes are used by the load balancer to handle end users requests.
- The WorkManager queues: some queues will be assigned to some background processing nodes so that only these dedicated nodes will consume the jobs in these queues.
This is where the queue configuration comes into play. Let's say you define a queue for managing Video conversions:
<extension point="queues" target="org.nuxeo.ecm.core.work.service"> <queue id="HeavyProcessing"> <name>HeavyProcessingQueue</name> <maxThreads>2</maxThreads> <category>VideoConversion</category> <category>PictureConversions</category> </queue> </extension>
Typically, the "interactive nodes" are configured to not consume this type of queue.
<extension point="queues" target="org.nuxeo.ecm.core.work.service"> <queue id="HeavyProcessing" processing="false"/> </extension>
Then the "background processing nodes" are on the contrary, configured to consume these queues: so they will do the heavy processing that was scheduled by the interactive nodes.
<extension point="queues" target="org.nuxeo.ecm.core.work.service"> <queue id="HeavyProcessing" processing="true"/> </extension>
Since Nuxeo 10.2 we have deprecated some of WorkManager original API that cannot scale or are not reliable in distributed environments.
- Retrieving a Work object is not safe, the Work being mutable the returned instance can be different that the scheduled or running one.
- Listing works in a specific state cannot scale properly when having millions of works and it is not reliable because a work listed as scheduled might be in another state while the list is returned. Another problem with these methods is that they provide a way to manage manually the work execution which creates complex and untested (in distributed environments) cases.
In our code base we found usage of these methods were not needed or misuse of the WorkManager API that could be better implemented using other ways.
We now have optimizations flags like:
isIdempotentto prevent Work with the same identifier to be executed multiple times, only the first Work is executed, others can be skipped.
isCoalescingwhen scheduling many times the same Work (same identifier): we care only about the latest execution. These optimization flags are taken into account with the
In addition we have added a way to trigger an action after a group of Works is completed. This is called a
GroupJoin Work, the work has to set the
isGroupJoin flag and
onGroupJoinCompletion method, Works will be part of the same group based on the
GroupJoin Work is supported by both
WorkManager implementations (default and