Package org.nuxeo.ecm.core.work
Class StreamWorkManager
- java.lang.Object
-
- org.nuxeo.runtime.model.DefaultComponent
-
- org.nuxeo.ecm.core.work.WorkManagerImpl
-
- org.nuxeo.ecm.core.work.StreamWorkManager
-
- All Implemented Interfaces:
WorkManager, Adaptable, Component, Extensible, TimestampedService
public class StreamWorkManager extends WorkManagerImpl
WorkManager implementation that appends works to a Log. Works are therefore immutable (no state update) and, for performance reasons, cannot be listed.
- Since:
- 9.3
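As an illustration of the append-only idea described above, here is a minimal sketch of a log to which records are only ever appended. `AppendOnlyLogSketch` and its methods are hypothetical names invented for this example; this is not the Nuxeo `Log` API, and the real implementation may differ substantially.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an append-only log; not a Nuxeo class.
final class AppendOnlyLogSketch {

    private final List<String> records = new ArrayList<>();

    // Appends a record and returns its offset. Existing records are never modified.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Reads the record at a given offset; a consumer keeps track of its own position.
    String read(long offset) {
        return records.get((int) offset);
    }

    // Number of records appended so far.
    long size() {
        return records.size();
    }

    public static void main(String[] args) {
        AppendOnlyLogSketch log = new AppendOnlyLogSketch();
        long first = log.append("work-a");
        long second = log.append("work-b");
        System.out.println(first + " " + second + " " + log.read(first));
    }
}
```

Because the sketch exposes no update or delete operation, an appended record stays as written, which mirrors the "no state update" remark in the class description.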
-
-
Nested Class Summary
Nested Classes
class StreamWorkManager.WorkScheduling
-
Nested classes/interfaces inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
WorkManagerImpl.WorkCompletionSynchronizer, WorkManagerImpl.WorkThreadPoolExecutor
-
Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.api.WorkManager
WorkManager.Scheduling
-
-
Field Summary
-
Fields inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
active, categoryToQueueId, completionSynchronizer, DEAD_LETTER_QUEUE, DEAD_LETTER_QUEUE_CODEC, DEFAULT_CATEGORY, DEFAULT_QUEUE_ID, executors, GLOBAL_METRIC_PREFIX, IMPL_EP, NAME, QUEUES_EP, queuing, registry, SHUTDOWN_DELAY_MS_KEY, shutdownInProgress, started, THREAD_PREFIX, WORKMANAGER_PROCESSING_DISABLE, WORKMANAGER_PROCESSING_ENABLED
-
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-
-
Constructor Summary
Constructors
StreamWorkManager()
-
Method Summary
Modifier and Type, Method, Description
protected void activateQueueMetrics(String queueId)
boolean awaitCompletion(String queueId, long duration, TimeUnit unit)
- Waits for completion of work in a given queue.
protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit)
boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
- Deprecated. since 10.2 because unused
protected void deactivateQueueMetrics(String queueId)
Work find(String s, Work.State state)
- Finds a work instance.
int getApplicationStartedOrder()
- The component notification order for Component.applicationStarted(org.nuxeo.runtime.model.ComponentContext).
protected Codec<Record> getCodec()
protected String getCodecName()
protected String getLogConfig()
- Deprecated.
protected LogManager getLogManager()
protected long getLowWaterMark(String queueId)
WorkQueueMetrics getMetrics(String queueId)
- Gets the metrics for the queueId
protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId)
protected int getOverProvisioningFactor()
protected int getPartitions(int maxThreads)
int getQueueSize(String queueId, Work.State state)
- Gets the number of work instances in a given queue in a defined state.
protected RecordFilterChain getRecordFilter()
protected String getRecordFilterClass()
protected Map<String,String> getRecordFilterOptions()
protected StreamManager getStreamManager()
Work.State getWorkState(String workId)
- Gets the state in which a work instance is.
void init()
- Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
protected void initTopology()
boolean isProcessingEnabled(String queueId)
- Is processing enabled for a given queue id.
List<Work> listWork(String s, Work.State state)
- Lists the work instances in a given queue in a defined state.
List<String> listWorkIds(String s, Work.State state)
- Lists the work ids in a given queue in a defined state.
void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit)
- Schedules work for execution at a later time, with a specific scheduling policy.
protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling)
- Schedule after commit.
boolean shutdown(long timeout, TimeUnit timeUnit)
- Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
boolean shutdownQueue(String queueId, long timeout, TimeUnit unit)
- Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
void start(ComponentContext context)
- Start the component.
boolean supportsProcessingDisabling()
-
Methods inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
awaitCompletion, enableProcessing, enableProcessing, getCategoryQueueId, getExecutor, getTimestampAfter, getWorkQueueDescriptor, getWorkQueueIds, hasWorkInState, index, initDeadLetterQueueStream, isProcessingDisabled, isProcessingEnabled, isQueuingEnabled, isStarted, noScheduledOrRunningWork, registerContribution, remainingMillis, removeExecutor, schedule, schedule, schedule, setName, shutdownExecutors
-
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerExtension, setLastModified, setModifiedNow, stop, unregister, unregisterContribution, unregisterExtension
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.runtime.model.Component
applicationStarted
-
-
-
-
Field Detail
-
log
protected static final Log log
-
WORK_LOG_CONFIG_PROP
public static final String WORK_LOG_CONFIG_PROP
- See Also:
- Constant Field Values
-
DEFAULT_WORK_LOG_CONFIG
@Deprecated(since="2021.0") public static final String DEFAULT_WORK_LOG_CONFIG
Deprecated.
- See Also:
- Constant Field Values
-
WORK_CODEC_PROP
public static final String WORK_CODEC_PROP
- See Also:
- Constant Field Values
-
DEFAULT_WORK_CODEC
public static final String DEFAULT_WORK_CODEC
- See Also:
- Constant Field Values
-
WORK_OVER_PROVISIONING_PROP
public static final String WORK_OVER_PROVISIONING_PROP
- See Also:
- Constant Field Values
-
DEFAULT_WORK_OVER_PROVISIONING
public static final String DEFAULT_WORK_OVER_PROVISIONING
- See Also:
- Constant Field Values
-
DEFAULT_CONCURRENCY
public static final int DEFAULT_CONCURRENCY
- See Also:
- Constant Field Values
-
lastMetrics
protected WorkQueueMetrics lastMetrics
-
lastMetricTime
protected long lastMetricTime
-
CACHE_LAST_METRIC_DURATION_MS
protected long CACHE_LAST_METRIC_DURATION_MS
-
NAMESPACE_PREFIX
public static final String NAMESPACE_PREFIX
- See Also:
- Constant Field Values
-
STATETTL_KEY
public static final String STATETTL_KEY
- Since:
- 10.2
- See Also:
- Constant Field Values
-
STORESTATE_KEY
public static final String STORESTATE_KEY
- Since:
- 10.2
- See Also:
- Constant Field Values
-
STATETTL_DEFAULT_VALUE
public static final long STATETTL_DEFAULT_VALUE
- Since:
- 10.2
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_CLASS_KEY
public static final String COMPUTATION_FILTER_CLASS_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_STORE_KEY
public static final String COMPUTATION_FILTER_STORE_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_STORE_TTL_KEY
public static final String COMPUTATION_FILTER_STORE_TTL_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_THRESHOLD_SIZE_KEY
public static final String COMPUTATION_FILTER_THRESHOLD_SIZE_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
COMPUTATION_FILTER_PREFIX_KEY
public static final String COMPUTATION_FILTER_PREFIX_KEY
- Since:
- 11.1
- See Also:
- Constant Field Values
-
topology
protected Topology topology
-
topologyDisabled
protected Topology topologyDisabled
-
settings
protected Settings settings
-
streamProcessor
protected StreamProcessor streamProcessor
-
logManager
protected LogManager logManager
-
streamManager
protected StreamManager streamManager
-
storeState
protected boolean storeState
-
stateTTL
protected long stateTTL
-
-
Method Detail
-
getOverProvisioningFactor
protected int getOverProvisioningFactor()
-
getCodecName
protected String getCodecName()
-
schedule
public void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit)
Description copied from interface: WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.
- Specified by:
- schedule in interface WorkManager
- Overrides:
- schedule in class WorkManagerImpl
- Parameters:
- work - the work to execute
- scheduling - the scheduling policy
- afterCommit - if true and the work is scheduled, it will only be run after the current transaction (if any) has committed
- See Also:
- WorkManager.schedule(Work)
-
getApplicationStartedOrder
public int getApplicationStartedOrder()
Description copied from interface: Component
The component notification order for Component.applicationStarted(org.nuxeo.runtime.model.ComponentContext).
Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
- getApplicationStartedOrder in interface Component
- Overrides:
- getApplicationStartedOrder in class WorkManagerImpl
- Returns:
- the order, 1000 by default
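The ordering rule above (increasing order, 1000 as the default, 100 for repository initialization) can be sketched as a simple sort. `StartupOrderSketch`, `NamedComponent`, the component names and the order 1200 are hypothetical and exist only for this illustration; the actual value returned by StreamWorkManager is not stated here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of "components are notified in increasing order".
final class StartupOrderSketch {

    // Hypothetical stand-in for a component with a notification order; not a Nuxeo type.
    static final class NamedComponent {
        final String name;
        final int order;

        NamedComponent(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns the component names in the sequence they would be notified.
    static List<String> notificationSequence(List<NamedComponent> components) {
        List<NamedComponent> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt((NamedComponent c) -> c.order));
        List<String> names = new ArrayList<>();
        for (NamedComponent c : sorted) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<NamedComponent> components = List.of(
                new NamedComponent("default", 1000),          // default order
                new NamedComponent("repository-init", 100),   // repository initialization
                new NamedComponent("late", 1200));            // hypothetical later order
        System.out.println(notificationSequence(components));
    }
}
```

A component that must run after the defaults would therefore return a value above 1000, and one that must run before repository initialization a value below 100.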
-
start
public void start(ComponentContext context)
Description copied from interface: Component
Starts the component. This method is called after all components have been resolved and activated.
- Specified by:
- start in interface Component
- Overrides:
- start in class WorkManagerImpl
-
getRecordFilter
protected RecordFilterChain getRecordFilter()
-
getRecordFilterClass
protected String getRecordFilterClass()
-
init
public void init()
Description copied from interface: WorkManager
Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
- Specified by:
- init in interface WorkManager
- Overrides:
- init in class WorkManagerImpl
-
getLogManager
protected LogManager getLogManager()
-
getStreamManager
protected StreamManager getStreamManager()
-
getLogConfig
@Deprecated(since="2021.0") protected String getLogConfig()
Deprecated.
-
isProcessingEnabled
public boolean isProcessingEnabled(String queueId)
Description copied from interface: WorkManager
Returns whether processing is enabled for a given queue id.
- Specified by:
- isProcessingEnabled in interface WorkManager
- Overrides:
- isProcessingEnabled in class WorkManagerImpl
-
initTopology
protected void initTopology()
-
getPartitions
protected int getPartitions(int maxThreads)
-
activateQueueMetrics
protected void activateQueueMetrics(String queueId)
-
deactivateQueueMetrics
protected void deactivateQueueMetrics(String queueId)
-
shutdownQueue
public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit)
Description copied from interface: WorkManager
Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
- Specified by:
- shutdownQueue in interface WorkManager
- Overrides:
- shutdownQueue in class WorkManagerImpl
- Parameters:
- queueId - the queue id
- timeout - the time to wait
- unit - the timeout unit
- Returns:
- true if shutdown is done, false if there are still some threads executing after the timeout
-
shutdown
public boolean shutdown(long timeout, TimeUnit timeUnit)
Description copied from interface: WorkManager
Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
- Specified by:
- shutdown in interface WorkManager
- Overrides:
- shutdown in class WorkManagerImpl
- Parameters:
- timeout - the time to wait
- timeUnit - the timeout unit
- Returns:
- true if shutdown is done, false if there are still some threads executing after the timeout
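The return contract above (true when shutdown is done, false when threads are still executing after the timeout) resembles the standard `ExecutorService` shutdown pattern. The sketch below uses only `java.util.concurrent`; `ShutdownSketch` is a hypothetical name, and the suspend-and-save behaviour of the real work manager is not modelled.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a timed shutdown that reports whether it finished.
final class ShutdownSketch {

    // Stops accepting new tasks, then waits up to the timeout for running tasks to end.
    // Returns true if everything terminated, false if tasks were still running.
    static boolean shutdown(ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("short task"));
        boolean done = shutdown(executor, 5, TimeUnit.SECONDS);
        System.out.println("shutdown done: " + done);
    }
}
```

A caller that receives false can decide whether to wait again or to force termination, for example with `ExecutorService.shutdownNow()` in this sketch.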
-
getQueueSize
public int getQueueSize(String queueId, Work.State state)
Description copied from interface: WorkManager
Gets the number of work instances in a given queue in a defined state.
- Specified by:
- getQueueSize in interface WorkManager
- Overrides:
- getQueueSize in class WorkManagerImpl
- Parameters:
- queueId - the queue id
- state - the state to look into: SCHEDULED, RUNNING, or null for non-completed (SCHEDULED or RUNNING)
- Returns:
- the number of work instances in the given state
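The meaning of the `state` parameter, in particular `null` standing for "non-completed", can be sketched with a small filter. `QueueSizeSketch`, its `State` enum and the `DONE` value are hypothetical stand-ins, not `Work.State`; the sketch shows only the parameter contract, not how StreamWorkManager actually computes the size.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the state filter: null means SCHEDULED or RUNNING.
final class QueueSizeSketch {

    // Hypothetical stand-in for Work.State; DONE is invented for the example.
    enum State { SCHEDULED, RUNNING, DONE }

    // Counts the entries matching the requested state, or all non-completed ones if state is null.
    static int countInState(List<State> works, State state) {
        int count = 0;
        for (State current : works) {
            boolean nonCompleted = current == State.SCHEDULED || current == State.RUNNING;
            boolean matches = (state == null) ? nonCompleted : current == state;
            if (matches) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<State> works = Arrays.asList(
                State.SCHEDULED, State.SCHEDULED, State.RUNNING, State.DONE);
        System.out.println(countInState(works, State.SCHEDULED));
        System.out.println(countInState(works, null));
    }
}
```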
-
getMetricsWithNuxeoClassLoader
protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId)
-
getMetrics
public WorkQueueMetrics getMetrics(String queueId)
Description copied from interface: WorkManager
Gets the metrics for the queueId
- Specified by:
- getMetrics in interface WorkManager
- Overrides:
- getMetrics in class WorkManagerImpl
-
awaitCompletion
public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException
Description copied from interface: WorkManager
Waits for completion of work in a given queue.
- Specified by:
- awaitCompletion in interface WorkManager
- Overrides:
- awaitCompletion in class WorkManagerImpl
- Parameters:
- queueId - the queue id
- duration - the time to wait
- unit - the timeout unit
- Returns:
- true if all work completed in the queue, or false if there is still some non-completed work after the timeout
- Throws:
- InterruptedException
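A wait with this contract (true when everything completed, false when the timeout elapsed first) is commonly written as a poll against a deadline. The sketch below is one such pattern under stated assumptions: `AwaitCompletionSketch`, the `isDone` supplier and the `pollMillis` parameter are hypothetical, and the real method may wait in a different way.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of waiting for completion with a timeout.
final class AwaitCompletionSketch {

    // Polls isDone until it returns true or the timeout elapses.
    // Returns true on completion, false if work is still pending after the timeout.
    static boolean await(BooleanSupplier isDone, long duration, TimeUnit unit, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(duration);
        while (!isDone.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Hypothetical condition: "work" completes about 100 ms after start.
        boolean completed = await(
                () -> System.currentTimeMillis() - start >= 100, 2, TimeUnit.SECONDS, 10);
        System.out.println("completed: " + completed);
    }
}
```

Comparing `System.nanoTime()` values by subtraction, rather than directly, keeps the deadline check correct even if the counter overflows.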
-
awaitCompletionOnQueue
protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException
- Throws:
InterruptedException
-
awaitCompletionWithWaterMark
@Deprecated public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) throws InterruptedException
Deprecated. since 10.2 because unused
- Throws:
- InterruptedException
-
getLowWaterMark
protected long getLowWaterMark(String queueId)
-
getWorkState
public Work.State getWorkState(String workId)
Description copied from interface: WorkManager
Gets the state in which a work instance is.
- Specified by:
- getWorkState in interface WorkManager
- Overrides:
- getWorkState in class WorkManagerImpl
- Parameters:
- workId - the id of the work to find
- Returns:
- the work state, or null if not found
-
find
public Work find(String s, Work.State state)
Description copied from interface: WorkManager
Finds a work instance.
- Specified by:
- find in interface WorkManager
- Overrides:
- find in class WorkManagerImpl
- Parameters:
- s - the id of the work to find
- state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the found work instance, or null if not found
-
listWork
public List<Work> listWork(String s, Work.State state)
Description copied from interface: WorkManager
Lists the work instances in a given queue in a defined state.
- Specified by:
- listWork in interface WorkManager
- Overrides:
- listWork in class WorkManagerImpl
- Parameters:
- s - the queue id
- state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work instances in the given state
-
listWorkIds
public List<String> listWorkIds(String s, Work.State state)
Description copied from interface: WorkManager
Lists the work ids in a given queue in a defined state.
- Specified by:
- listWorkIds in interface WorkManager
- Overrides:
- listWorkIds in class WorkManagerImpl
- Parameters:
- s - the queue id
- state - the state to look into: SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work ids in the given state
-
scheduleAfterCommit
protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling)
Description copied from class: WorkManagerImpl
Schedules the work after commit. Returns false if this is impossible (no transaction or transaction manager).
- Overrides:
- scheduleAfterCommit in class WorkManagerImpl
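The "returns false if impossible" behaviour can be illustrated with a tiny model in which work is registered as an after-commit hook only when a transaction exists. `AfterCommitSketch` and its nested `Tx` class are hypothetical stand-ins invented for this example; they are not the Nuxeo or JTA transaction APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of deferring work until the current transaction commits.
final class AfterCommitSketch {

    // Hypothetical transaction stand-in that runs registered hooks on commit.
    static final class Tx {
        private final List<Runnable> afterCommit = new ArrayList<>();

        void registerAfterCommit(Runnable hook) {
            afterCommit.add(hook);
        }

        void commit() {
            // Copy first so a hook can register further hooks without disturbing this run.
            List<Runnable> hooks = new ArrayList<>(afterCommit);
            afterCommit.clear();
            for (Runnable hook : hooks) {
                hook.run();
            }
        }
    }

    // Registers the work to run after commit; returns false when there is no transaction.
    static boolean scheduleAfterCommit(Tx currentTx, Runnable work) {
        if (currentTx == null) {
            return false;
        }
        currentTx.registerAfterCommit(work);
        return true;
    }

    public static void main(String[] args) {
        Tx tx = new Tx();
        boolean deferred = scheduleAfterCommit(tx, () -> System.out.println("work runs after commit"));
        System.out.println("deferred: " + deferred);
        tx.commit();
    }
}
```

In this model a caller that gets false would typically fall back to scheduling the work immediately.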
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()
- Specified by:
- supportsProcessingDisabling in interface WorkManager
- Overrides:
- supportsProcessingDisabling in class WorkManagerImpl
- Returns:
- true if the implementation supports processing disabling
-
-