Package org.nuxeo.ecm.core.work
Class WorkManagerImpl
java.lang.Object
org.nuxeo.runtime.model.DefaultComponent
org.nuxeo.ecm.core.work.WorkManagerImpl
- All Implemented Interfaces:
WorkManager
,Adaptable
,Component
,Extensible
,TimestampedService
- Direct Known Subclasses:
StreamWorkManager
The implementation of a
WorkManager
. This delegates the queuing implementation to a WorkQueuing
implementation.- Since:
- 5.6
-
Nested Class Summary
Modifier and TypeClassDescriptionprotected static class
Simple synchronizer to wake up when an in-JVM work is completed.class
A work instance and how to schedule it, for schedule-after-commit.protected class
AThreadPoolExecutor
that keeps available the list of running tasks.Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.api.WorkManager
WorkManager.Scheduling
-
Field Summary
Modifier and TypeFieldDescriptionprotected boolean
static final Name
The dead letter queue stream name.static final String
static final String
protected final Map<String,
WorkManagerImpl.WorkThreadPoolExecutor> protected static final String
protected static final String
static final String
protected static final String
protected WorkQueuing
protected final io.dropwizard.metrics5.MetricRegistry
static final String
protected boolean
protected boolean
protected static final String
static final String
Deprecated.static final String
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionboolean
awaitCompletion
(long duration, TimeUnit unit) Waits for completion of all work.boolean
awaitCompletion
(String queueId, long duration, TimeUnit unit) Waits for completion of work in a given queue.void
enableProcessing
(boolean value) Set processing for all queuesvoid
enableProcessing
(String queueId, boolean value) Set processing for a given queue id.find
(String workId, Work.State state) Finds a work instance.int
The component notification order forComponent.start(ComponentContext)
.getCategoryQueueId
(String category) Gets the queue id used for a given work category.protected WorkManagerImpl.WorkThreadPoolExecutor
getExecutor
(String queueId) getMetrics
(String queueId) Gets the metrics for thequeueId
protected long
getTimestampAfter
(long durationInMs) getWorkQueueDescriptor
(String queueId) Gets the work queue descriptor for a given queue id.Lists the ids of the existing work queues.getWorkState
(String workId) Gets the state in which a work instance is.protected boolean
hasWorkInState
(String workId, Work.State state) protected void
index()
void
init()
Starts up thisWorkManager
and attempts to resume work previously suspended and saved atWorkManager.shutdown(long, java.util.concurrent.TimeUnit)
time.protected void
protected boolean
boolean
Is processing enabled for at least one queueboolean
isProcessingEnabled
(String queueId) Is processing enabled for a given queue id.boolean
isQueuingEnabled
(String queueId) Is queuing enabled for a given queue id.boolean
listWork
(String queueId, Work.State state) Lists the work instances in a given queue in a defined state.listWorkIds
(String queueId, Work.State state) Lists the work ids in a given queue in a defined state.protected boolean
noScheduledOrRunningWork
(String queueId) void
registerContribution
(Object contribution, String xp, ComponentInstance component) protected long
remainingMillis
(long t0, long delay) Deprecated.since 10.2 because unusedprotected void
removeExecutor
(String queueId) Deprecated.since 10.2 because unusedvoid
Schedules work for execution at a later time.void
Schedules work for execution at a later time, after the current transaction (if any) has committed.void
schedule
(Work work, WorkManager.Scheduling scheduling) Schedules work for execution at a later time, with a specific scheduling policy.void
schedule
(Work work, WorkManager.Scheduling scheduling, boolean afterCommit) Schedules work for execution at a later time, with a specific scheduling policy.protected boolean
scheduleAfterCommit
(Work work, WorkManager.Scheduling scheduling) Schedule after commit.void
Sets the name for this component, as it was defined in its XML.boolean
Shuts down thisWorkManager
and attempts to suspend and save the running and scheduled work instances.protected boolean
shutdownExecutors
(Collection<WorkManagerImpl.WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) boolean
shutdownQueue
(String queueId, long timeout, TimeUnit unit) Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.void
start
(ComponentContext context) Start the component.boolean
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerExtension, setLastModified, setModifiedNow, stop, unregister, unregisterContribution, unregisterExtension
-
Field Details
-
NAME
- See Also:
-
QUEUES_EP
- See Also:
-
IMPL_EP
- See Also:
-
DEFAULT_QUEUE_ID
- See Also:
-
DEFAULT_CATEGORY
- See Also:
-
THREAD_PREFIX
- See Also:
-
SHUTDOWN_DELAY_MS_KEY
- Since:
- 10.2
- See Also:
-
WORKMANAGER_PROCESSING_DISABLE
Deprecated.UseWORKMANAGER_PROCESSING_ENABLED
instead- Since:
- 11.1
- See Also:
-
WORKMANAGER_PROCESSING_ENABLED
- Since:
- 11.2
- See Also:
-
DEAD_LETTER_QUEUE
The dead letter queue stream name.- Since:
- 11.1
-
DEAD_LETTER_QUEUE_CODEC
-
GLOBAL_METRIC_PREFIX
- See Also:
-
registry
protected final io.dropwizard.metrics5.MetricRegistry registry -
executors
-
categoryToQueueId
-
queuing
-
active
protected boolean active -
completionSynchronizer
-
started
protected volatile boolean started -
shutdownInProgress
protected volatile boolean shutdownInProgress
-
-
Constructor Details
-
WorkManagerImpl
public WorkManagerImpl()
-
-
Method Details
-
setName
Description copied from interface:Component
Sets the name for this component, as it was defined in its XML.This is called once after construction by the runtime framework.
- Specified by:
setName
in interfaceComponent
- Overrides:
setName
in classDefaultComponent
- Parameters:
name
- the name
-
registerContribution
- Overrides:
registerContribution
in classDefaultComponent
-
isQueuingEnabled
Description copied from interface:WorkManager
Is queuing enabled for a given queue id.- Specified by:
isQueuingEnabled
in interfaceWorkManager
-
enableProcessing
public void enableProcessing(boolean value) Description copied from interface:WorkManager
Set processing for all queues- Specified by:
enableProcessing
in interfaceWorkManager
-
enableProcessing
Description copied from interface:WorkManager
Set processing for a given queue id.- Specified by:
enableProcessing
in interfaceWorkManager
-
isProcessingEnabled
public boolean isProcessingEnabled()Description copied from interface:WorkManager
Is processing enabled for at least one queue- Specified by:
isProcessingEnabled
in interfaceWorkManager
-
isProcessingDisabled
protected boolean isProcessingDisabled() -
isProcessingEnabled
Description copied from interface:WorkManager
Is processing enabled for a given queue id.- Specified by:
isProcessingEnabled
in interfaceWorkManager
-
getWorkQueueIds
Description copied from interface:WorkManager
Lists the ids of the existing work queues.- Specified by:
getWorkQueueIds
in interfaceWorkManager
- Returns:
- the list of queue ids
-
getWorkQueueDescriptor
Description copied from interface:WorkManager
Gets the work queue descriptor for a given queue id.- Specified by:
getWorkQueueDescriptor
in interfaceWorkManager
- Parameters:
queueId
- the queue id- Returns:
- the work queue descriptor, or
null
-
getCategoryQueueId
Description copied from interface:WorkManager
Gets the queue id used for a given work category.- Specified by:
getCategoryQueueId
in interfaceWorkManager
- Parameters:
category
- the category- Returns:
- the queue id
-
getApplicationStartedOrder
public int getApplicationStartedOrder()Description copied from interface:Component
The component notification order forComponent.start(ComponentContext)
.Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
getApplicationStartedOrder
in interfaceComponent
- Returns:
- the order, 1000 by default
-
start
Description copied from interface:Component
Start the component. This method is called after all the components were resolved and activated- Specified by:
start
in interfaceComponent
- Overrides:
start
in classDefaultComponent
-
initDeadLetterQueueStream
protected void initDeadLetterQueueStream() -
init
public void init()Description copied from interface:WorkManager
Starts up thisWorkManager
and attempts to resume work previously suspended and saved atWorkManager.shutdown(long, java.util.concurrent.TimeUnit)
time.- Specified by:
init
in interfaceWorkManager
-
index
protected void index() -
getExecutor
-
shutdownQueue
public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException Description copied from interface:WorkManager
Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.- Specified by:
shutdownQueue
in interfaceWorkManager
- Parameters:
queueId
- the queue idtimeout
- the time to waitunit
- the timeout unit- Returns:
true
if shutdown is done,false
if there are still some threads executing after the timeout- Throws:
InterruptedException
-
shutdownExecutors
protected boolean shutdownExecutors(Collection<WorkManagerImpl.WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) throws InterruptedException - Throws:
InterruptedException
-
remainingMillis
Deprecated.since 10.2 because unused -
removeExecutor
Deprecated.since 10.2 because unused -
shutdown
Description copied from interface:WorkManager
Shuts down thisWorkManager
and attempts to suspend and save the running and scheduled work instances.- Specified by:
shutdown
in interfaceWorkManager
- Parameters:
timeout
- the time to waitunit
- the timeout unit- Returns:
true
if shutdown is done,false
if there are still some threads executing after the timeout- Throws:
InterruptedException
-
schedule
Description copied from interface:WorkManager
Schedules work for execution at a later time.This method is identical to
WorkManager.schedule(Work, boolean)
withafterCommit = false
.- Specified by:
schedule
in interfaceWorkManager
- Parameters:
work
- the work to execute
-
schedule
Description copied from interface:WorkManager
Schedules work for execution at a later time, after the current transaction (if any) has committed.- Specified by:
schedule
in interfaceWorkManager
- Parameters:
work
- the work to executeafterCommit
- iftrue
and the work is scheduled, it will only be run after the current transaction (if any) has committed
-
schedule
Description copied from interface:WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.This method is identical to
WorkManager.schedule(Work, Scheduling, boolean)
withafterCommit = false
.- Specified by:
schedule
in interfaceWorkManager
- Parameters:
work
- the work to executescheduling
- the scheduling policy- See Also:
-
schedule
Description copied from interface:WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.- Specified by:
schedule
in interfaceWorkManager
- Parameters:
work
- the work to executescheduling
- the scheduling policyafterCommit
- iftrue
and the work is scheduled, it will only be run after the current transaction (if any) has committed- See Also:
-
scheduleAfterCommit
Schedule after commit. Returnsfalse
if impossible (no transaction or transaction manager).- Since:
- 5.8
-
find
Description copied from interface:WorkManager
Finds a work instance.- Specified by:
find
in interfaceWorkManager
- Parameters:
workId
- the id of the work to findstate
- the state defining the state to look into,SCHEDULED
,RUNNING
, ornull
for non-completed- Returns:
- the found work instance, or
null
if not found
-
hasWorkInState
- Parameters:
state
- SCHEDULED, RUNNING or null for both
-
getWorkState
Description copied from interface:WorkManager
- Specified by:
getWorkState
in interfaceWorkManager
- Parameters:
workId
- the id of the work to find- Returns:
- the work state, or
null
if not found
-
listWork
Description copied from interface:WorkManager
Lists the work instances in a given queue in a defined state.- Specified by:
listWork
in interfaceWorkManager
- Parameters:
queueId
- the queue idstate
- the state defining the state to look into,SCHEDULED
,RUNNING
, ornull
for non-completed- Returns:
- the list of work instances in the given state
-
listWorkIds
Description copied from interface:WorkManager
Lists the work ids in a given queue in a defined state.- Specified by:
listWorkIds
in interfaceWorkManager
- Parameters:
queueId
- the queue idstate
- the state defining the state to look into,SCHEDULED
,RUNNING
, ornull
for non-completed- Returns:
- the list of work ids in the given state
-
getMetrics
Description copied from interface:WorkManager
Gets the metrics for thequeueId
- Specified by:
getMetrics
in interfaceWorkManager
-
awaitCompletion
Description copied from interface:WorkManager
Waits for completion of all work.- Specified by:
awaitCompletion
in interfaceWorkManager
- Parameters:
duration
- the time to waitunit
- the timeout unit- Returns:
true
if all work completed, orfalse
if there is still some non-completed work after the timeout- Throws:
InterruptedException
-
awaitCompletion
public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException Description copied from interface:WorkManager
Waits for completion of work in a given queue.- Specified by:
awaitCompletion
in interfaceWorkManager
- Parameters:
queueId
- the queue idduration
- the time to waitunit
- the timeout unit- Returns:
true
if all work completed in the queue, orfalse
if there is still some non-completed work after the timeout- Throws:
InterruptedException
-
getTimestampAfter
protected long getTimestampAfter(long durationInMs) -
noScheduledOrRunningWork
-
isStarted
public boolean isStarted()- Specified by:
isStarted
in interfaceWorkManager
- Returns:
true
if active- See Also:
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()- Specified by:
supportsProcessingDisabling
in interfaceWorkManager
- Returns:
- true if the implementation supports processing disabling
-
WORKMANAGER_PROCESSING_ENABLED
instead