Package org.nuxeo.ecm.core.work
Class WorkManagerImpl
java.lang.Object
org.nuxeo.runtime.model.DefaultComponent
org.nuxeo.ecm.core.work.WorkManagerImpl
- All Implemented Interfaces:
WorkManager, Adaptable, Component, Extensible, TimestampedService
- Direct Known Subclasses:
StreamWorkManager
The implementation of a WorkManager. This delegates the queuing implementation to a WorkQueuing implementation.
- Since:
- 5.6
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
protected static class | | Simple synchronizer to wake up when an in-JVM work is completed.
class | | A work instance and how to schedule it, for schedule-after-commit.
protected class | WorkManagerImpl.WorkThreadPoolExecutor | A ThreadPoolExecutor that keeps available the list of running tasks.
Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.api.WorkManager
WorkManager.Scheduling
-
Field Summary
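Fields
Modifier and Type | Field | Description
protected boolean | active
static final Name | DEAD_LETTER_QUEUE | The dead letter queue stream name.
static final String | DEFAULT_CATEGORY
static final String | DEFAULT_QUEUE_ID
protected final Map<String,WorkManagerImpl.WorkThreadPoolExecutor> | executors
protected static final String | GLOBAL_METRIC_PREFIX
protected static final String | IMPL_EP
static final String | NAME
protected static final String | QUEUES_EP
protected WorkQueuing | queuing
protected final io.dropwizard.metrics5.MetricRegistry | registry
static final String | SHUTDOWN_DELAY_MS_KEY
protected boolean | shutdownInProgress
protected boolean | started
protected static final String | THREAD_PREFIX
static final String | WORKMANAGER_PROCESSING_DISABLE | Deprecated. Use WORKMANAGER_PROCESSING_ENABLED instead.
static final String | WORKMANAGER_PROCESSING_ENABLED
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-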
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
boolean | awaitCompletion(long duration, TimeUnit unit) | Waits for completion of all work.
boolean | awaitCompletion(String queueId, long duration, TimeUnit unit) | Waits for completion of work in a given queue.
void | enableProcessing(boolean value) | Set processing for all queues.
void | enableProcessing(String queueId, boolean value) | Set processing for a given queue id.
 | find(String workId, Work.State state) | Finds a work instance.
int | getApplicationStartedOrder() | The component notification order for Component.start(ComponentContext).
 | getCategoryQueueId(String category) | Gets the queue id used for a given work category.
protected WorkManagerImpl.WorkThreadPoolExecutor | getExecutor(String queueId)
 | getMetrics(String queueId) | Gets the metrics for the queueId.
protected long | getTimestampAfter(long durationInMs)
 | getWorkQueueDescriptor(String queueId) | Gets the work queue descriptor for a given queue id.
 | getWorkQueueIds() | Lists the ids of the existing work queues.
 | getWorkState(String workId) | Gets the state in which a work instance is.
protected boolean | hasWorkInState(String workId, Work.State state)
protected void | index()
void | init() | Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
protected void | initDeadLetterQueueStream()
protected boolean | isProcessingDisabled()
boolean | isProcessingEnabled() | Is processing enabled for at least one queue.
boolean | isProcessingEnabled(String queueId) | Is processing enabled for a given queue id.
boolean | isQueuingEnabled(String queueId) | Is queuing enabled for a given queue id.
boolean | isStarted()
 | listWork(String queueId, Work.State state) | Lists the work instances in a given queue in a defined state.
 | listWorkIds(String queueId, Work.State state) | Lists the work ids in a given queue in a defined state.
protected boolean | noScheduledOrRunningWork(String queueId)
void | registerContribution(Object contribution, String xp, ComponentInstance component)
protected long | remainingMillis(long t0, long delay) | Deprecated. since 10.2 because unused
protected void | removeExecutor(String queueId) | Deprecated. since 10.2 because unused
void | schedule(Work work) | Schedules work for execution at a later time.
void | schedule(Work work, boolean afterCommit) | Schedules work for execution at a later time, after the current transaction (if any) has committed.
void | schedule(Work work, WorkManager.Scheduling scheduling) | Schedules work for execution at a later time, with a specific scheduling policy.
void | schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit) | Schedules work for execution at a later time, with a specific scheduling policy.
protected boolean | scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling) | Schedule after commit.
void | setName(String name) | Sets the name for this component, as it was defined in its XML.
boolean | shutdown(long timeout, TimeUnit unit) | Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
protected boolean | shutdownExecutors(Collection<WorkManagerImpl.WorkThreadPoolExecutor> list, long timeout, TimeUnit unit)
boolean | shutdownQueue(String queueId, long timeout, TimeUnit unit) | Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
void | start(ComponentContext context) | Start the component.
boolean | supportsProcessingDisabling()
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerExtension, setLastModified, setModifiedNow, stop, unregister, unregisterContribution, unregisterExtension
-
Field Details
-
NAME
-
QUEUES_EP
-
IMPL_EP
-
DEFAULT_QUEUE_ID
-
DEFAULT_CATEGORY
-
THREAD_PREFIX
-
SHUTDOWN_DELAY_MS_KEY
- Since:
- 10.2
-
WORKMANAGER_PROCESSING_DISABLE
Deprecated. Use WORKMANAGER_PROCESSING_ENABLED instead.
- Since:
- 11.1
-
WORKMANAGER_PROCESSING_ENABLED
- Since:
- 11.2
-
DEAD_LETTER_QUEUE
The dead letter queue stream name.
- Since:
- 11.1
-
DEAD_LETTER_QUEUE_CODEC
-
GLOBAL_METRIC_PREFIX
-
registry
protected final io.dropwizard.metrics5.MetricRegistry registry
-
executors
-
categoryToQueueId
-
queuing
-
active
protected boolean active
-
completionSynchronizer
-
started
protected volatile boolean started
-
shutdownInProgress
protected volatile boolean shutdownInProgress
-
-
Constructor Details
-
WorkManagerImpl
public WorkManagerImpl()
-
-
Method Details
-
setName
Description copied from interface: Component
Sets the name for this component, as it was defined in its XML.
This is called once after construction by the runtime framework.
- Specified by:
setName in interface Component
- Overrides:
setName in class DefaultComponent
- Parameters:
name - the name
-
registerContribution
- Overrides:
registerContribution in class DefaultComponent
-
isQueuingEnabled
Description copied from interface: WorkManager
Is queuing enabled for a given queue id.
- Specified by:
isQueuingEnabled in interface WorkManager
-
enableProcessing
public void enableProcessing(boolean value)
Description copied from interface: WorkManager
Enables or disables processing for all queues.
- Specified by:
enableProcessing in interface WorkManager
-
enableProcessing
Description copied from interface: WorkManager
Enables or disables processing for a given queue id.
- Specified by:
enableProcessing in interface WorkManager
-
isProcessingEnabled
public boolean isProcessingEnabled()
Description copied from interface: WorkManager
Is processing enabled for at least one queue.
- Specified by:
isProcessingEnabled in interface WorkManager
-
isProcessingDisabled
protected boolean isProcessingDisabled()
-
isProcessingEnabled
Description copied from interface: WorkManager
Is processing enabled for a given queue id.
- Specified by:
isProcessingEnabled in interface WorkManager
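The relationship between the global and per-queue processing methods can be sketched as follows. This is a minimal illustration that assumes one boolean flag per queue; the class `ProcessingFlagsSketch`, its map, and the queue ids are hypothetical and say nothing about how WorkManagerImpl stores this state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProcessingFlagsSketch {

    // Hypothetical per-queue flag store; queue ids are illustrative.
    private final Map<String, Boolean> processing = new LinkedHashMap<>();

    public ProcessingFlagsSketch(String... queueIds) {
        for (String queueId : queueIds) {
            processing.put(queueId, Boolean.TRUE);
        }
    }

    /** Sets processing for all known queues. */
    public void enableProcessing(boolean value) {
        processing.replaceAll((queueId, old) -> value);
    }

    /** Sets processing for a given queue id. */
    public void enableProcessing(String queueId, boolean value) {
        processing.put(queueId, value);
    }

    /** True if processing is enabled for at least one queue. */
    public boolean isProcessingEnabled() {
        return processing.containsValue(Boolean.TRUE);
    }

    /** True if processing is enabled for the given queue id. */
    public boolean isProcessingEnabled(String queueId) {
        return processing.getOrDefault(queueId, Boolean.FALSE);
    }

    public static void main(String[] args) {
        ProcessingFlagsSketch flags = new ProcessingFlagsSketch("queueA", "queueB");
        flags.enableProcessing("queueA", false);
        System.out.println(flags.isProcessingEnabled());         // another queue is still enabled
        System.out.println(flags.isProcessingEnabled("queueA")); // this queue was disabled
    }
}
```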
-
getWorkQueueIds
Description copied from interface: WorkManager
Lists the ids of the existing work queues.
- Specified by:
getWorkQueueIds in interface WorkManager
- Returns:
- the list of queue ids
-
getWorkQueueDescriptor
Description copied from interface: WorkManager
Gets the work queue descriptor for a given queue id.
- Specified by:
getWorkQueueDescriptor in interface WorkManager
- Parameters:
queueId - the queue id
- Returns:
- the work queue descriptor, or null
-
getCategoryQueueId
Description copied from interface: WorkManager
Gets the queue id used for a given work category.
- Specified by:
getCategoryQueueId in interface WorkManager
- Parameters:
category - the category
- Returns:
- the queue id
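A category-to-queue lookup of this kind can be sketched with a plain map. The sketch below assumes that unknown categories fall back to a default queue; that fallback, the constant value "default", and the `register` helper are assumptions made for illustration, not documented behavior of this class.

```java
import java.util.HashMap;
import java.util.Map;

public class CategoryQueueSketch {

    // Assumption: id of the fallback queue; the real DEFAULT_QUEUE_ID value is not shown on this page.
    public static final String DEFAULT_QUEUE_ID = "default";

    private final Map<String, String> categoryToQueueId = new HashMap<>();

    /** Hypothetical helper that associates a work category with a queue id. */
    public void register(String category, String queueId) {
        categoryToQueueId.put(category, queueId);
    }

    /** Gets the queue id for a category, falling back to the default queue (assumed). */
    public String getCategoryQueueId(String category) {
        return categoryToQueueId.getOrDefault(category, DEFAULT_QUEUE_ID);
    }

    public static void main(String[] args) {
        CategoryQueueSketch sketch = new CategoryQueueSketch();
        sketch.register("categoryA", "queueA");
        System.out.println(sketch.getCategoryQueueId("categoryA")); // queueA
        System.out.println(sketch.getCategoryQueueId("unknown"));   // default
    }
}
```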
-
getApplicationStartedOrder
public int getApplicationStartedOrder()
Description copied from interface: Component
The component notification order for Component.start(ComponentContext).
Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
getApplicationStartedOrder in interface Component
- Returns:
- the order, 1000 by default
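The ordering rule can be illustrated with a small sort. The `Item` class and the component names below are hypothetical stand-ins; only the two order values (100 and 1000) come from the description above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StartOrderSketch {

    /** Hypothetical stand-in for a component exposing its notification order. */
    public static class Item {
        public final String name;
        public final int order;

        public Item(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns the component names in increasing notification order. */
    public static List<String> notificationOrder(List<Item> components) {
        List<Item> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt(item -> item.order));
        List<String> names = new ArrayList<>();
        for (Item item : sorted) {
            names.add(item.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Item> components = List.of(
                new Item("someComponent", 1000),   // default order
                new Item("repositoryInit", 100));  // repository initialization
        System.out.println(notificationOrder(components)); // [repositoryInit, someComponent]
    }
}
```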
-
start
Description copied from interface: Component
Starts the component. This method is called after all the components have been resolved and activated.
- Specified by:
start in interface Component
- Overrides:
start in class DefaultComponent
-
initDeadLetterQueueStream
protected void initDeadLetterQueueStream()
-
init
public void init()
Description copied from interface: WorkManager
Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
- Specified by:
init in interface WorkManager
-
index
protected void index()
-
getExecutor
-
shutdownQueue
public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException
Description copied from interface: WorkManager
Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
- Specified by:
shutdownQueue in interface WorkManager
- Parameters:
queueId - the queue id
timeout - the time to wait
unit - the timeout unit
- Returns:
true if shutdown is done, false if there are still some threads executing after the timeout
- Throws:
InterruptedException
-
shutdownExecutors
protected boolean shutdownExecutors(Collection<WorkManagerImpl.WorkThreadPoolExecutor> list, long timeout, TimeUnit unit) throws InterruptedException
- Throws:
InterruptedException
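Since WorkThreadPoolExecutor is described as a ThreadPoolExecutor, a timed shutdown over a collection of executors can be sketched with standard java.util.concurrent calls. The sketch assumes a single deadline shared by all executors; the class `ShutdownSketch` and its method body are illustrative and not taken from WorkManagerImpl.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    /**
     * Asks every executor to stop accepting tasks, then waits for all of them
     * within one shared deadline. Returns true only if all terminated in time.
     */
    public static boolean shutdownExecutors(Collection<? extends ExecutorService> list, long timeout,
            TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService executor : list) {
            executor.shutdown(); // no new tasks; already submitted tasks keep running
        }
        boolean allTerminated = true;
        for (ExecutorService executor : list) {
            long remaining = Math.max(0, deadline - System.nanoTime());
            if (!executor.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                allTerminated = false; // still some threads executing after the timeout
            }
        }
        return allTerminated;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { });
        System.out.println(shutdownExecutors(List.of(pool), 5, TimeUnit.SECONDS));
    }
}
```

Computing the remaining time per executor keeps the total wait bounded by the caller's timeout, rather than multiplying it by the number of executors.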
-
remainingMillis
Deprecated. since 10.2 because unused
-
removeExecutor
Deprecated. since 10.2 because unused
-
shutdown
Description copied from interface: WorkManager
Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
- Specified by:
shutdown in interface WorkManager
- Parameters:
timeout - the time to wait
unit - the timeout unit
- Returns:
true if shutdown is done, false if there are still some threads executing after the timeout
- Throws:
InterruptedException
-
schedule
Description copied from interface: WorkManager
Schedules work for execution at a later time.
This method is identical to WorkManager.schedule(Work, boolean) with afterCommit = false.
- Specified by:
schedule in interface WorkManager
- Parameters:
work - the work to execute
-
schedule
Description copied from interface: WorkManager
Schedules work for execution at a later time, after the current transaction (if any) has committed.
- Specified by:
schedule in interface WorkManager
- Parameters:
work - the work to execute
afterCommit - if true and the work is scheduled, it will only be run after the current transaction (if any) has committed
-
schedule
Description copied from interface: WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.
This method is identical to WorkManager.schedule(Work, Scheduling, boolean) with afterCommit = false.
- Specified by:
schedule in interface WorkManager
- Parameters:
work - the work to execute
scheduling - the scheduling policy
-
schedule
Description copied from interface: WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.
- Specified by:
schedule in interface WorkManager
- Parameters:
work - the work to execute
scheduling - the scheduling policy
afterCommit - if true and the work is scheduled, it will only be run after the current transaction (if any) has committed
-
scheduleAfterCommit
Schedule after commit. Returns false if impossible (no transaction or transaction manager).
- Since:
- 5.8
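The interaction between afterCommit scheduling and the boolean result of scheduleAfterCommit can be sketched as below. Everything here is a stand-in: `Tx` is a hypothetical transaction that runs callbacks on commit, works are represented by their ids, and falling back to immediate queuing when no transaction exists is an assumption rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCommitSketch {

    /** Hypothetical stand-in for a transaction that accepts after-commit callbacks. */
    public static class Tx {
        private final List<Runnable> callbacks = new ArrayList<>();

        public void registerAfterCommit(Runnable callback) {
            callbacks.add(callback);
        }

        public void commit() {
            callbacks.forEach(Runnable::run);
        }
    }

    public final List<String> queue = new ArrayList<>();
    public Tx current; // null when no transaction is active

    /** Returns false if impossible (no transaction), as in the documented contract. */
    public boolean scheduleAfterCommit(String workId) {
        if (current == null) {
            return false;
        }
        current.registerAfterCommit(() -> queue.add(workId));
        return true;
    }

    public void schedule(String workId, boolean afterCommit) {
        if (afterCommit && scheduleAfterCommit(workId)) {
            return; // queued later, when the transaction commits
        }
        queue.add(workId); // assumption: fall back to immediate queuing
    }

    public static void main(String[] args) {
        AfterCommitSketch manager = new AfterCommitSketch();
        manager.current = new Tx();
        manager.schedule("work-1", true);
        System.out.println(manager.queue); // [] until the transaction commits
        manager.current.commit();
        System.out.println(manager.queue); // [work-1]
    }
}
```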
-
find
Description copied from interface: WorkManager
Finds a work instance.
- Specified by:
find in interface WorkManager
- Parameters:
workId - the id of the work to find
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the found work instance, or null if not found
-
hasWorkInState
- Parameters:
state - SCHEDULED, RUNNING or null for both
-
getWorkState
Description copied from interface: WorkManager
Gets the state in which a work instance is.
- Specified by:
getWorkState in interface WorkManager
- Parameters:
workId - the id of the work to find
- Returns:
- the work state, or null if not found
-
listWork
Description copied from interface: WorkManager
Lists the work instances in a given queue in a defined state.
- Specified by:
listWork in interface WorkManager
- Parameters:
queueId - the queue id
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work instances in the given state
-
listWorkIds
Description copied from interface: WorkManager
Lists the work ids in a given queue in a defined state.
- Specified by:
listWorkIds in interface WorkManager
- Parameters:
queueId - the queue id
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work ids in the given state
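The meaning of the state parameter, where null selects all non-completed work, can be sketched with a simple filter. The `State` enum below is a hypothetical two-value stand-in for Work.State, and the map of tracked works is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StateFilterSketch {

    /** Hypothetical stand-in for the two non-completed states named above. */
    public enum State { SCHEDULED, RUNNING }

    /** Lists ids of tracked (non-completed) works; a null state matches both states. */
    public static List<String> listWorkIds(Map<String, State> works, State state) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, State> entry : works.entrySet()) {
            if (state == null || entry.getValue() == state) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        Map<String, State> works = new LinkedHashMap<>();
        works.put("w1", State.SCHEDULED);
        works.put("w2", State.RUNNING);
        System.out.println(listWorkIds(works, State.RUNNING)); // [w2]
        System.out.println(listWorkIds(works, null));          // [w1, w2]
    }
}
```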
-
getMetrics
Description copied from interface: WorkManager
Gets the metrics for the queueId.
- Specified by:
getMetrics in interface WorkManager
-
awaitCompletion
Description copied from interface: WorkManager
Waits for completion of all work.
- Specified by:
awaitCompletion in interface WorkManager
- Parameters:
duration - the time to wait
unit - the timeout unit
- Returns:
true if all work completed, or false if there is still some non-completed work after the timeout
- Throws:
InterruptedException
-
awaitCompletion
public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException
Description copied from interface: WorkManager
Waits for completion of work in a given queue.
- Specified by:
awaitCompletion in interface WorkManager
- Parameters:
queueId - the queue id
duration - the time to wait
unit - the timeout unit
- Returns:
true if all work completed in the queue, or false if there is still some non-completed work after the timeout
- Throws:
InterruptedException
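The timeout contract of awaitCompletion can be sketched as a wait bounded by a deadline. The version below simply polls a caller-supplied idle check; the polling loop, the 10 ms interval, and the `BooleanSupplier` parameter are assumptions for illustration. The nested class summary mentions a synchronizer that wakes up when an in-JVM work completes, so the actual implementation may well wait differently.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class AwaitCompletionSketch {

    /** Polls the idle check until it passes or the deadline is reached. */
    public static boolean awaitCompletion(BooleanSupplier queueIsIdle, long duration, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(duration);
        while (!queueIsIdle.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // still some non-completed work after the timeout
            }
            Thread.sleep(10); // hypothetical polling interval
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger checks = new AtomicInteger();
        // The simulated queue becomes idle on the third check.
        boolean completed = awaitCompletion(() -> checks.incrementAndGet() >= 3, 5, TimeUnit.SECONDS);
        System.out.println(completed);
    }
}
```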
-
getTimestampAfter
protected long getTimestampAfter(long durationInMs)
-
noScheduledOrRunningWork
-
isStarted
public boolean isStarted()
- Specified by:
isStarted in interface WorkManager
- Returns:
true if active
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()
- Specified by:
supportsProcessingDisabling in interface WorkManager
- Returns:
- true if the implementation supports processing disabling
-