Package org.nuxeo.ecm.core.work
Class StreamWorkManager
- java.lang.Object
  - org.nuxeo.runtime.model.DefaultComponent
    - org.nuxeo.ecm.core.work.WorkManagerImpl
      - org.nuxeo.ecm.core.work.StreamWorkManager
- All Implemented Interfaces:
WorkManager, Adaptable, Component, Extensible, TimestampedService
public class StreamWorkManager extends WorkManagerImpl
A WorkManager implementation that appends works to a Log. Works are therefore immutable (their state is never updated) and, for performance reasons, cannot be listed.
- Since:
- 9.3
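To make the consequence of the log-based design concrete, here is a minimal sketch of an append-only log of work records. It is not the Nuxeo Log API: the class AppendOnlyWorkLogSketch, its WorkRecord type and the append, poll and lag methods are hypothetical names chosen for illustration. It assumes that records are never modified once appended and that consumers only move an offset forward, which is why counting pending work is cheap while listing or updating individual works would require scanning the log.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of an append-only work log. Not the Nuxeo Log API. */
final class AppendOnlyWorkLogSketch {

    /** Immutable description of a unit of work: no setter, hence no state update. */
    static final class WorkRecord {
        final String id;
        final String category;

        WorkRecord(String id, String category) {
            this.id = id;
            this.category = category;
        }
    }

    private final List<WorkRecord> records = new ArrayList<>();
    private int nextOffset = 0; // offset of the next record a consumer will read

    /** Scheduling a work is a plain append; returns the offset of the new record. */
    synchronized long append(WorkRecord record) {
        records.add(record);
        return records.size() - 1L;
    }

    /** Consumers read sequentially; returns null when nothing is left to read. */
    synchronized WorkRecord poll() {
        return nextOffset < records.size() ? records.get(nextOffset++) : null;
    }

    /** Counting pending records only needs two offsets, not a scan of the records. */
    synchronized long lag() {
        return records.size() - nextOffset;
    }

    public static void main(String[] args) {
        AppendOnlyWorkLogSketch log = new AppendOnlyWorkLogSketch();
        log.append(new WorkRecord("work-1", "default"));
        log.append(new WorkRecord("work-2", "default"));
        log.poll(); // consume the first record
        System.out.println("pending records: " + log.lag());
    }
}
```

In this model there is deliberately no method to list or mutate records in place, mirroring the restriction stated in the class description.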
-
-
Nested Class Summary
Nested Classes
class StreamWorkManager.WorkScheduling
-
Nested classes/interfaces inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
WorkManagerImpl.WorkCompletionSynchronizer, WorkManagerImpl.WorkThreadPoolExecutor
-
Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.api.WorkManager
WorkManager.Scheduling
-
-
Field Summary
-
Fields inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
active, categoryToQueueId, completionSynchronizer, DEAD_LETTER_QUEUE, DEAD_LETTER_QUEUE_CODEC, DEFAULT_CATEGORY, DEFAULT_QUEUE_ID, executors, GLOBAL_METRIC_PREFIX, IMPL_EP, NAME, QUEUES_EP, queuing, registry, SHUTDOWN_DELAY_MS_KEY, shutdownInProgress, started, THREAD_PREFIX, WORKMANAGER_PROCESSING_DISABLE, WORKMANAGER_PROCESSING_ENABLED
-
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-
-
Constructor Summary
Constructors
StreamWorkManager()
-
Method Summary
Modifier and Type, Method, Description
protected void activateQueueMetrics(String queueId)
boolean awaitCompletion(String queueId, long duration, TimeUnit unit)
    Waits for completion of work in a given queue.
protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit)
boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
    Deprecated. since 10.2 because unused
protected void deactivateQueueMetrics(String queueId)
Work find(String s, Work.State state)
    Finds a work instance.
int getApplicationStartedOrder()
    The component notification order for Component.applicationStarted(org.nuxeo.runtime.model.ComponentContext).
protected Codec<Record> getCodec()
protected String getCodecName()
protected String getLogConfig()
    Deprecated.
protected LogManager getLogManager()
protected long getLowWaterMark(String queueId)
WorkQueueMetrics getMetrics(String queueId)
    Gets the metrics for the queueId
protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId)
protected int getOverProvisioningFactor()
protected int getPartitions(int maxThreads)
int getQueueSize(String queueId, Work.State state)
    Gets the number of work instances in a given queue in a defined state.
protected RecordFilterChain getRecordFilter()
protected String getRecordFilterClass()
protected Map<String,String> getRecordFilterOptions()
protected StreamManager getStreamManager()
Work.State getWorkState(String workId)
    Gets the state in which a work instance is.
void init()
    Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
protected void initTopology()
boolean isProcessingEnabled(String queueId)
    Is processing enabled for a given queue id.
List<Work> listWork(String s, Work.State state)
    Lists the work instances in a given queue in a defined state.
List<String> listWorkIds(String s, Work.State state)
    Lists the work ids in a given queue in a defined state.
void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit)
    Schedules work for execution at a later time, with a specific scheduling policy.
protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling)
    Schedule after commit.
boolean shutdown(long timeout, TimeUnit timeUnit)
    Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
boolean shutdownQueue(String queueId, long timeout, TimeUnit unit)
    Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
void start(ComponentContext context)
    Start the component.
boolean supportsProcessingDisabling()
-
Methods inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
awaitCompletion, enableProcessing, enableProcessing, getCategoryQueueId, getExecutor, getTimestampAfter, getWorkQueueDescriptor, getWorkQueueIds, hasWorkInState, index, initDeadLetterQueueStream, isProcessingDisabled, isProcessingEnabled, isQueuingEnabled, isStarted, noScheduledOrRunningWork, registerContribution, remainingMillis, removeExecutor, schedule, schedule, schedule, setName, shutdownExecutors
-
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerExtension, setLastModified, setModifiedNow, stop, unregister, unregisterContribution, unregisterExtension
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.runtime.model.Component
applicationStarted
-
-
-
-
Field Detail
-
log
protected static final Log log
-
WORK_LOG_CONFIG_PROP
public static final String WORK_LOG_CONFIG_PROP
- See Also:
- Constant Field Values
-
DEFAULT_WORK_LOG_CONFIG
@Deprecated(since="2021.0") public static final String DEFAULT_WORK_LOG_CONFIG
Deprecated.
- See Also:
- Constant Field Values
-
WORK_CODEC_PROP
public static final String WORK_CODEC_PROP
- See Also:
- Constant Field Values
-
DEFAULT_WORK_CODEC
public static final String DEFAULT_WORK_CODEC
- See Also:
- Constant Field Values
-
WORK_OVER_PROVISIONING_PROP
public static final String WORK_OVER_PROVISIONING_PROP
- See Also:
- Constant Field Values
-
DEFAULT_WORK_OVER_PROVISIONING
public static final String DEFAULT_WORK_OVER_PROVISIONING
- See Also:
- Constant Field Values
-
DEFAULT_CONCURRENCY
public static final int DEFAULT_CONCURRENCY
- See Also:
- Constant Field Values
-
lastMetrics
protected WorkQueueMetrics lastMetrics
-
lastMetricTime
protected long lastMetricTime
-
CACHE_LAST_METRIC_DURATION_MS
protected long CACHE_LAST_METRIC_DURATION_MS
-
NAMESPACE_PREFIX
public static final String NAMESPACE_PREFIX
- See Also:
- Constant Field Values
-
STATETTL_KEY
public static final String STATETTL_KEY
- Since:
- 10.2
- See Also:
- Constant Field Values
-
STORESTATE_KEY
public static final String STORESTATE_KEY
- Since:
- 10.2
- See Also:
- Constant Field Values
-
STATETTL_DEFAULT_VALUE
public static final long STATETTL_DEFAULT_VALUE
- Since:
- 10.2
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_CLASS_KEY
public static final String COMPUTATION_FILTER_CLASS_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_STORE_KEY
public static final String COMPUTATION_FILTER_STORE_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_STORE_TTL_KEY
public static final String COMPUTATION_FILTER_STORE_TTL_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_THRESHOLD_SIZE_KEY
public static final String COMPUTATION_FILTER_THRESHOLD_SIZE_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_PREFIX_KEY
public static final String COMPUTATION_FILTER_PREFIX_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
topology
protected Topology topology
-
topologyDisabled
protected Topology topologyDisabled
-
settings
protected Settings settings
-
streamProcessor
protected StreamProcessor streamProcessor
-
logManager
protected LogManager logManager
-
streamManager
protected StreamManager streamManager
-
storeState
protected boolean storeState
-
stateTTL
protected long stateTTL
-
-
Method Detail
-
getOverProvisioningFactor
protected int getOverProvisioningFactor()
-
getCodecName
protected String getCodecName()
-
schedule
public void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit)
Description copied from interface: WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.
- Specified by:
schedule in interface WorkManager
- Overrides:
schedule in class WorkManagerImpl
- Parameters:
work - the work to execute
scheduling - the scheduling policy
afterCommit - if true and the work is scheduled, it will only be run after the current transaction (if any) has committed
- See Also:
WorkManager.schedule(Work)
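The afterCommit contract can be illustrated with a small stand-alone model. This is a sketch under stated assumptions, not the Nuxeo implementation: FakeTransaction, begin, commit and the string work ids are hypothetical, and the fallback to immediate scheduling when no transaction is active is an assumption drawn from the "(if any)" wording above and from scheduleAfterCommit being documented as returning false when deferral is impossible.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "schedule after commit". Not the Nuxeo transaction API. */
final class AfterCommitSchedulingSketch {

    /** Stand-in for a transaction that runs registered callbacks when it commits. */
    static final class FakeTransaction {
        private final List<Runnable> callbacks = new ArrayList<>();

        void registerAfterCommit(Runnable callback) {
            callbacks.add(callback);
        }

        void commit() {
            callbacks.forEach(Runnable::run);
            callbacks.clear();
        }
    }

    private final List<String> scheduled = new ArrayList<>();
    private FakeTransaction current; // null when no transaction is active

    void begin() {
        current = new FakeTransaction();
    }

    void commit() {
        FakeTransaction tx = current;
        current = null;
        if (tx != null) {
            tx.commit();
        }
    }

    /** Returns false when deferral is impossible because no transaction is active. */
    boolean scheduleAfterCommit(String workId) {
        if (current == null) {
            return false;
        }
        current.registerAfterCommit(() -> scheduled.add(workId));
        return true;
    }

    /** Defers the work if requested and possible; otherwise schedules it right away (assumed fallback). */
    void schedule(String workId, boolean afterCommit) {
        if (afterCommit && scheduleAfterCommit(workId)) {
            return;
        }
        scheduled.add(workId);
    }

    List<String> scheduled() {
        return scheduled;
    }

    public static void main(String[] args) {
        AfterCommitSchedulingSketch manager = new AfterCommitSchedulingSketch();
        manager.begin();
        manager.schedule("work-1", true);
        System.out.println("before commit: " + manager.scheduled());
        manager.commit();
        System.out.println("after commit: " + manager.scheduled());
    }
}
```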
-
getApplicationStartedOrder
public int getApplicationStartedOrder()
Description copied from interface: Component
The component notification order for Component.applicationStarted(org.nuxeo.runtime.model.ComponentContext).
Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
getApplicationStartedOrder in interface Component
- Overrides:
getApplicationStartedOrder in class WorkManagerImpl
- Returns:
- the order, 1000 by default
-
start
public void start(ComponentContext context)
Description copied from interface: Component
Starts the component. This method is called after all components have been resolved and activated.
- Specified by:
start in interface Component
- Overrides:
start in class WorkManagerImpl
-
getRecordFilter
protected RecordFilterChain getRecordFilter()
-
getRecordFilterClass
protected String getRecordFilterClass()
-
init
public void init()
Description copied from interface: WorkManager
Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
- Specified by:
init in interface WorkManager
- Overrides:
init in class WorkManagerImpl
-
getLogManager
protected LogManager getLogManager()
-
getStreamManager
protected StreamManager getStreamManager()
-
getLogConfig
@Deprecated(since="2021.0") protected String getLogConfig()
Deprecated.
-
isProcessingEnabled
public boolean isProcessingEnabled(String queueId)
Description copied from interface: WorkManager
Checks whether processing is enabled for a given queue id.
- Specified by:
isProcessingEnabled in interface WorkManager
- Overrides:
isProcessingEnabled in class WorkManagerImpl
-
initTopology
protected void initTopology()
-
getPartitions
protected int getPartitions(int maxThreads)
-
activateQueueMetrics
protected void activateQueueMetrics(String queueId)
-
deactivateQueueMetrics
protected void deactivateQueueMetrics(String queueId)
-
shutdownQueue
public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit)
Description copied from interface: WorkManager
Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
- Specified by:
shutdownQueue in interface WorkManager
- Overrides:
shutdownQueue in class WorkManagerImpl
- Parameters:
queueId - the queue id
timeout - the time to wait
unit - the timeout unit
- Returns:
true if shutdown is done, false if there are still some threads executing after the timeout
-
shutdown
public boolean shutdown(long timeout, TimeUnit timeUnit)
Description copied from interface: WorkManager
Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
- Specified by:
shutdown in interface WorkManager
- Overrides:
shutdown in class WorkManagerImpl
- Parameters:
timeout - the time to wait
timeUnit - the timeout unit
- Returns:
true if shutdown is done, false if there are still some threads executing after the timeout
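The boolean return convention (true when shutdown finished, false when threads are still executing after the timeout) is the same one used by java.util.concurrent.ExecutorService.awaitTermination. The sketch below shows that convention with a plain executor. It is only an analogy: how StreamWorkManager stops its own processing is not described here, and ShutdownTimeoutSketch and its helper methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Analogy for the shutdown(timeout, unit) return value, using a plain ExecutorService. */
final class ShutdownTimeoutSketch {

    /** Returns true if all tasks finished within the timeout, false otherwise. */
    static boolean shutdown(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** A task that finishes immediately: shutdown is expected to complete in time. */
    static boolean shutdownWithQuickTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> { });
        return shutdown(executor, 5, TimeUnit.SECONDS);
    }

    /** A task blocked on a latch: shutdown cannot complete before the timeout. */
    static boolean shutdownWithBlockedTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        executor.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean done = shutdown(executor, 100, TimeUnit.MILLISECONDS);
        release.countDown(); // unblock the task so the worker thread can exit
        return done;
    }

    public static void main(String[] args) {
        System.out.println("quick task: " + shutdownWithQuickTask());
        System.out.println("blocked task: " + shutdownWithBlockedTask());
    }
}
```

A caller would typically log or escalate when false is returned, since work may still be in flight.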
-
getQueueSize
public int getQueueSize(String queueId, Work.State state)
Description copied from interface: WorkManager
Gets the number of work instances in a given queue in a defined state.
- Specified by:
getQueueSize in interface WorkManager
- Overrides:
getQueueSize in class WorkManagerImpl
- Parameters:
queueId - the queue id
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed (SCHEDULED or RUNNING)
- Returns:
- the number of work instances in the given state
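The meaning of a null state can be sketched with a tiny stand-alone model. The QueueSizeSketch class, its State enum and its set method are hypothetical and only mirror the parameter description above; how StreamWorkManager actually derives these counts is not specified here.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical model of the getQueueSize state parameter. Not the Nuxeo implementation. */
final class QueueSizeSketch {

    /** Stand-in for the two Work.State values named in the parameter description. */
    enum State { SCHEDULED, RUNNING }

    private final Map<State, Integer> counts = new EnumMap<>(State.class);

    void set(State state, int count) {
        counts.put(state, count);
    }

    /** A null state means "non-completed": SCHEDULED plus RUNNING. */
    int getQueueSize(State state) {
        if (state == null) {
            return counts.getOrDefault(State.SCHEDULED, 0)
                    + counts.getOrDefault(State.RUNNING, 0);
        }
        return counts.getOrDefault(state, 0);
    }

    public static void main(String[] args) {
        QueueSizeSketch queue = new QueueSizeSketch();
        queue.set(State.SCHEDULED, 3);
        queue.set(State.RUNNING, 2);
        System.out.println("non-completed: " + queue.getQueueSize(null));
    }
}
```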
-
getMetricsWithNuxeoClassLoader
protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId)
-
getMetrics
public WorkQueueMetrics getMetrics(String queueId)
Description copied from interface: WorkManager
Gets the metrics for the queueId
- Specified by:
getMetrics in interface WorkManager
- Overrides:
getMetrics in class WorkManagerImpl
-
awaitCompletion
public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException
Description copied from interface: WorkManager
Waits for completion of work in a given queue.
- Specified by:
awaitCompletion in interface WorkManager
- Overrides:
awaitCompletion in class WorkManagerImpl
- Parameters:
queueId - the queue id
duration - the time to wait
unit - the timeout unit
- Returns:
true if all work completed in the queue, or false if there is still some non-completed work after the timeout
- Throws:
InterruptedException
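One common way to implement a bounded wait with this return contract is to poll a "pending work" counter until it reaches zero or a deadline passes. The sketch below shows that pattern only; it assumes a hypothetical LongSupplier that reports pending work and a 10 ms polling interval, and it does not claim to reproduce how StreamWorkManager checks completion.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical polling wait with a deadline. Not the Nuxeo implementation. */
final class AwaitCompletionSketch {

    /**
     * Returns true as soon as pendingWork reports zero, or false if work
     * is still pending once the timeout has elapsed.
     */
    static boolean awaitCompletion(LongSupplier pendingWork, long duration, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(duration);
        while (true) {
            if (pendingWork.getAsLong() == 0) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Sleep briefly, but never past the deadline by more than a millisecond.
            Thread.sleep(Math.min(10L, TimeUnit.NANOSECONDS.toMillis(remaining) + 1L));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Pretend the queue drains 50 ms after start.
        boolean done = awaitCompletion(
                () -> System.currentTimeMillis() - start < 50 ? 1 : 0, 2, TimeUnit.SECONDS);
        System.out.println("completed: " + done);
    }
}
```

Using System.nanoTime for the deadline keeps the wait unaffected by wall-clock adjustments.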
-
awaitCompletionOnQueue
protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException
- Throws:
InterruptedException
-
awaitCompletionWithWaterMark
@Deprecated public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) throws InterruptedException
Deprecated. since 10.2 because unused
- Throws:
InterruptedException
-
getLowWaterMark
protected long getLowWaterMark(String queueId)
-
getWorkState
public Work.State getWorkState(String workId)
Description copied from interface: WorkManager
Gets the state in which a work instance is.
- Specified by:
getWorkState in interface WorkManager
- Overrides:
getWorkState in class WorkManagerImpl
- Parameters:
workId - the id of the work to find
- Returns:
- the work state, or null if not found
-
find
public Work find(String s, Work.State state)
Description copied from interface: WorkManager
Finds a work instance.
- Specified by:
find in interface WorkManager
- Overrides:
find in class WorkManagerImpl
- Parameters:
s - the id of the work to find
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the found work instance, or null if not found
-
listWork
public List<Work> listWork(String s, Work.State state)
Description copied from interface: WorkManager
Lists the work instances in a given queue in a defined state.
- Specified by:
listWork in interface WorkManager
- Overrides:
listWork in class WorkManagerImpl
- Parameters:
s - the queue id
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work instances in the given state
-
listWorkIds
public List<String> listWorkIds(String s, Work.State state)
Description copied from interface: WorkManager
Lists the work ids in a given queue in a defined state.
- Specified by:
listWorkIds in interface WorkManager
- Overrides:
listWorkIds in class WorkManagerImpl
- Parameters:
s - the queue id
state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work ids in the given state
-
scheduleAfterCommit
protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling)
Description copied from class: WorkManagerImpl
Schedules the work after commit. Returns false if this is impossible (no transaction or no transaction manager).
- Overrides:
scheduleAfterCommit in class WorkManagerImpl
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()
- Specified by:
supportsProcessingDisabling in interface WorkManager
- Overrides:
supportsProcessingDisabling in class WorkManagerImpl
- Returns:
- true if the implementation supports processing disabling
-
-