Package org.nuxeo.ecm.core.work
Class StreamWorkManager
java.lang.Object
org.nuxeo.runtime.model.DefaultComponent
org.nuxeo.ecm.core.work.WorkManagerImpl
org.nuxeo.ecm.core.work.StreamWorkManager
- All Implemented Interfaces:
WorkManager, Adaptable, Component, Extensible, TimestampedService
WorkManager implementation that appends works to a Log. Works are therefore immutable (no state update) and, for performance reasons, cannot be listed.
- Since:
- 9.3
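The append-only model described above can be illustrated with a minimal, self-contained sketch. It is not Nuxeo code: `AppendOnlyWorkLog`, `append`, `poll` and `lag` are hypothetical names, and an in-memory list stands in for the real Log. The sketch only shows why such a design has no per-state listing: a record is never updated once appended, and a consumer merely advances an offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a log of serialized works (not a Nuxeo class). */
final class AppendOnlyWorkLog {
    private final List<String> records = new ArrayList<>(); // appended records, never modified
    private int consumerOffset = 0;                          // position of the single consumer

    /** Appends a work id and returns its offset in the log. */
    synchronized int append(String workId) {
        records.add(workId);
        return records.size() - 1;
    }

    /** Reads the next unread record, if any, and advances the consumer offset. */
    synchronized Optional<String> poll() {
        if (consumerOffset >= records.size()) {
            return Optional.empty();
        }
        return Optional.of(records.get(consumerOffset++));
    }

    /** Number of records appended but not yet consumed. */
    synchronized int lag() {
        return records.size() - consumerOffset;
    }
}
```

In this model the only cheap question to answer is "how far behind is the consumer", which is why completion is naturally expressed in terms of lag rather than by enumerating works.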
-
Nested Class Summary
Nested classes/interfaces inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
WorkManagerImpl.WorkCompletionSynchronizer, WorkManagerImpl.WorkThreadPoolExecutor
Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.api.WorkManager
WorkManager.Scheduling
-
Field Summary
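Modifier and Type  Field  Description
protected long  CACHE_LAST_METRIC_DURATION_MS
static final String  COMPUTATION_FILTER_CLASS_KEY
static final String  COMPUTATION_FILTER_PREFIX_KEY
static final String  COMPUTATION_FILTER_STORE_KEY
static final String  COMPUTATION_FILTER_STORE_TTL_KEY
static final String  COMPUTATION_FILTER_THRESHOLD_SIZE_KEY
static final int  DEFAULT_CONCURRENCY
static final String  DEFAULT_WORK_CODEC
static final String  DEFAULT_WORK_LOG_CONFIG
    Deprecated.
static final String  DEFAULT_WORK_OVER_PROVISIONING
protected WorkQueueMetrics  lastMetrics
protected long  lastMetricTime
protected LogManager  logManager
static final String  NAMESPACE_PREFIX
protected Settings  settings
protected long  stateTTL
static final long  STATETTL_DEFAULT_VALUE
static final String  STATETTL_KEY
protected boolean  storeState
static final String  STORESTATE_KEY
protected StreamManager  streamManager
protected StreamProcessor  streamProcessor
protected Topology  topology
protected Topology  topologyDisabled
static final String  WORK_CODEC_PROP
static final String  WORK_LOG_CONFIG_PROP
static final String  WORK_OVER_PROVISIONING_PROP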
Fields inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
active, categoryToQueueId, completionSynchronizer, DEAD_LETTER_QUEUE, DEAD_LETTER_QUEUE_CODEC, DEFAULT_CATEGORY, DEFAULT_QUEUE_ID, executors, GLOBAL_METRIC_PREFIX, IMPL_EP, NAME, QUEUES_EP, queuing, registry, SHUTDOWN_DELAY_MS_KEY, shutdownInProgress, started, THREAD_PREFIX, WORKMANAGER_PROCESSING_DISABLE, WORKMANAGER_PROCESSING_ENABLED
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-
Constructor Summary
-
Method Summary
Modifier and Type  Method  Description
protected void  activateQueueMetrics(String queueId)
boolean  awaitCompletion(String queueId, long duration, TimeUnit unit)
    Waits for completion of work in a given queue.
protected boolean  awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit)
boolean  awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
    Deprecated. since 10.2 because unused
protected void  deactivateQueueMetrics(String queueId)
find(String s, Work.State state)
    Finds a work instance.
int  getApplicationStartedOrder()
    The component notification order for Component.start(ComponentContext).
getCodec()
protected String  getCodecName()
protected String  getLogConfig()
    Deprecated.
protected LogManager  getLogManager()
protected long  getLowWaterMark(String queueId)
getMetrics(String queueId)
    Gets the metrics for the queueId.
protected WorkQueueMetrics  getMetricsWithNuxeoClassLoader(String queueId)
protected int  getOverProvisioningFactor()
protected int  getPartitions(int maxThreads)
protected RecordFilterChain  getRecordFilter()
protected String  getRecordFilterClass()
getRecordFilterOptions()
protected StreamManager  getStreamManager()
getWorkState(String workId)
    Gets the state in which a work instance is.
void  init()
    Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
protected void  initTopology()
boolean  isProcessingEnabled(String queueId)
    Is processing enabled for a given queue id.
listWork(String s, Work.State state)
    Lists the work instances in a given queue in a defined state.
listWorkIds(String s, Work.State state)
    Lists the work ids in a given queue in a defined state.
void  schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit)
    Schedules work for execution at a later time, with a specific scheduling policy.
protected boolean  scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling)
    Schedule after commit.
boolean  shutdown(long timeout, TimeUnit timeUnit)
    Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
boolean  shutdownQueue(String queueId, long timeout, TimeUnit unit)
    Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
void  start(ComponentContext context)
    Start the component.
boolean  supportsProcessingDisabling()
Methods inherited from class org.nuxeo.ecm.core.work.WorkManagerImpl
awaitCompletion, enableProcessing, enableProcessing, getCategoryQueueId, getExecutor, getTimestampAfter, getWorkQueueDescriptor, getWorkQueueIds, hasWorkInState, index, initDeadLetterQueueStream, isProcessingDisabled, isProcessingEnabled, isQueuingEnabled, isStarted, noScheduledOrRunningWork, registerContribution, remainingMillis, removeExecutor, schedule, schedule, schedule, setName, shutdownExecutors
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerExtension, setLastModified, setModifiedNow, stop, unregister, unregisterContribution, unregisterExtension
-
Field Details
-
WORK_LOG_CONFIG_PROP
-
DEFAULT_WORK_LOG_CONFIG
Deprecated.
-
WORK_CODEC_PROP
-
DEFAULT_WORK_CODEC
-
WORK_OVER_PROVISIONING_PROP
-
DEFAULT_WORK_OVER_PROVISIONING
-
DEFAULT_CONCURRENCY
public static final int DEFAULT_CONCURRENCY
-
lastMetrics
-
lastMetricTime
protected long lastMetricTime
-
CACHE_LAST_METRIC_DURATION_MS
protected long CACHE_LAST_METRIC_DURATION_MS
-
NAMESPACE_PREFIX
-
STATETTL_KEY
- Since:
- 10.2
-
STORESTATE_KEY
- Since:
- 10.2
-
STATETTL_DEFAULT_VALUE
public static final long STATETTL_DEFAULT_VALUE
- Since:
- 10.2
-
COMPUTATION_FILTER_CLASS_KEY
- Since:
- 11.1
-
COMPUTATION_FILTER_STORE_KEY
- Since:
- 11.1
-
COMPUTATION_FILTER_STORE_TTL_KEY
- Since:
- 11.1
-
COMPUTATION_FILTER_THRESHOLD_SIZE_KEY
- Since:
- 11.1
-
COMPUTATION_FILTER_PREFIX_KEY
- Since:
- 11.1
-
topology
-
topologyDisabled
-
settings
-
streamProcessor
-
logManager
-
streamManager
-
storeState
protected boolean storeState
-
stateTTL
protected long stateTTL
-
-
Constructor Details
-
StreamWorkManager
public StreamWorkManager()
-
-
Method Details
-
getOverProvisioningFactor
protected int getOverProvisioningFactor()
-
getCodecName
-
getCodec
-
schedule
Description copied from interface: WorkManager
Schedules work for execution at a later time, with a specific scheduling policy.
- Specified by:
schedule in interface WorkManager
- Overrides:
schedule in class WorkManagerImpl
- Parameters:
work - the work to execute
scheduling - the scheduling policy
afterCommit - if true and the work is scheduled, it will only be run after the current transaction (if any) has committed
-
getApplicationStartedOrder
public int getApplicationStartedOrder()
Description copied from interface: Component
The component notification order for Component.start(ComponentContext).
Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
getApplicationStartedOrder in interface Component
- Overrides:
getApplicationStartedOrder in class WorkManagerImpl
- Returns:
- the order, 1000 by default
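To illustrate what "notified in increasing order" means in practice, the following sketch sorts a few hypothetical components by their order value. The `Named` class and the component names are invented for the example; the values 100 and 1000 come from the description above, and the value 1200 is arbitrary.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class StartOrderSketch {
    /** Hypothetical component descriptor: a name plus its application-started order. */
    static final class Named {
        final String name;
        final int order;

        Named(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns component names in the sequence they would be notified (increasing order value). */
    static List<String> notificationSequence(List<Named> components) {
        List<Named> sorted = new ArrayList<>(components);
        // List.sort is stable, so components sharing an order keep their relative position.
        sorted.sort(Comparator.comparingInt((Named c) -> c.order));
        List<String> names = new ArrayList<>();
        for (Named c : sorted) {
            names.add(c.name);
        }
        return names;
    }
}
```

A component that overrides the default with a value above 1000 is therefore started after components that keep the default, and after repository initialization at order 100.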
-
start
Description copied from interface: Component
Start the component. This method is called after all the components were resolved and activated.
- Specified by:
start in interface Component
- Overrides:
start in class WorkManagerImpl
-
getRecordFilter
-
getRecordFilterOptions
-
getRecordFilterClass
-
init
public void init()
Description copied from interface: WorkManager
Starts up this WorkManager and attempts to resume work previously suspended and saved at WorkManager.shutdown(long, java.util.concurrent.TimeUnit) time.
- Specified by:
init in interface WorkManager
- Overrides:
init in class WorkManagerImpl
-
getLogManager
-
getStreamManager
-
getLogConfig
Deprecated.
-
isProcessingEnabled
Description copied from interface: WorkManager
Is processing enabled for a given queue id.
- Specified by:
isProcessingEnabled in interface WorkManager
- Overrides:
isProcessingEnabled in class WorkManagerImpl
-
initTopology
protected void initTopology()
-
getPartitions
protected int getPartitions(int maxThreads)
-
activateQueueMetrics
-
deactivateQueueMetrics
-
shutdownQueue
Description copied from interface: WorkManager
Shuts down a work queue and attempts to suspend and save the running and scheduled work instances.
- Specified by:
shutdownQueue in interface WorkManager
- Overrides:
shutdownQueue in class WorkManagerImpl
- Parameters:
queueId - the queue id
timeout - the time to wait
unit - the timeout unit
- Returns:
- true if shutdown is done, false if there are still some threads executing after the timeout
-
shutdown
Description copied from interface: WorkManager
Shuts down this WorkManager and attempts to suspend and save the running and scheduled work instances.
- Specified by:
shutdown in interface WorkManager
- Overrides:
shutdown in class WorkManagerImpl
- Parameters:
timeout - the time to wait
timeUnit - the timeout unit
- Returns:
- true if shutdown is done, false if there are still some threads executing after the timeout
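The true/false contract of shutdown and shutdownQueue mirrors the standard java.util.concurrent shutdown pattern. The sketch below shows that pattern with a plain ExecutorService. It is an analogy only, not the StreamWorkManager implementation, and `shutdownWithin` is a hypothetical helper name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {
    /**
     * Stops accepting new tasks, then waits up to the timeout for submitted tasks to finish.
     * Returns true if everything terminated, false if threads are still executing.
     */
    static boolean shutdownWithin(ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException {
        executor.shutdown(); // no new tasks; already submitted tasks keep running
        return executor.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> { /* a short task */ });
        System.out.println("terminated: " + shutdownWithin(executor, 5, TimeUnit.SECONDS));
    }
}
```

A caller that receives false can decide whether to wait longer or to proceed while some work is still running.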
-
getMetricsWithNuxeoClassLoader
-
getMetrics
Description copied from interface: WorkManager
Gets the metrics for the queueId.
- Specified by:
getMetrics in interface WorkManager
- Overrides:
getMetrics in class WorkManagerImpl
-
awaitCompletion
public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException
Description copied from interface: WorkManager
Waits for completion of work in a given queue.
- Specified by:
awaitCompletion in interface WorkManager
- Overrides:
awaitCompletion in class WorkManagerImpl
- Parameters:
queueId - the queue id
duration - the time to wait
unit - the timeout unit
- Returns:
- true if all work completed in the queue, or false if there is still some non-completed work after the timeout
- Throws:
- InterruptedException
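A bounded wait of this kind is commonly written as a polling loop against a deadline. The sketch below shows that general technique under stated assumptions: `pendingCount` is a hypothetical supplier standing in for whatever measure of remaining work is available, and the 10 ms polling interval is arbitrary. This page does not say how StreamWorkManager itself implements the wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class AwaitCompletionSketch {
    /**
     * Polls pendingCount until it reaches zero or the duration elapses.
     * Returns true if no work is pending, false if some work remains after the timeout.
     */
    static boolean awaitCompletion(LongSupplier pendingCount, long duration, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(duration);
        while (pendingCount.getAsLong() > 0) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false; // timed out with work still pending
            }
            // Sleep briefly between polls, capped at 10 ms (an arbitrary interval).
            Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remaining) + 1, 10));
        }
        return true;
    }
}
```

Comparing against a deadline computed once, rather than counting sleeps, keeps the total wait close to the requested duration even when individual polls are slow.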
-
awaitCompletionOnQueue
protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException
- Throws:
- InterruptedException
-
awaitCompletionWithWaterMark
@Deprecated public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) throws InterruptedException
Deprecated. since 10.2 because unused
- Throws:
- InterruptedException
-
getLowWaterMark
-
getWorkState
Description copied from interface: WorkManager
Gets the state in which a work instance is.
- Specified by:
getWorkState in interface WorkManager
- Overrides:
getWorkState in class WorkManagerImpl
- Parameters:
workId - the id of the work to find
- Returns:
- the work state, or null if not found
-
find
Description copied from interface: WorkManager
Finds a work instance.
- Specified by:
find in interface WorkManager
- Overrides:
find in class WorkManagerImpl
- Parameters:
s - the id of the work to find
state - the state defining the state to look into, SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the found work instance, or null if not found
-
listWork
Description copied from interface: WorkManager
Lists the work instances in a given queue in a defined state.
- Specified by:
listWork in interface WorkManager
- Overrides:
listWork in class WorkManagerImpl
- Parameters:
s - the queue id
state - the state defining the state to look into, SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work instances in the given state
-
listWorkIds
Description copied from interface: WorkManager
Lists the work ids in a given queue in a defined state.
- Specified by:
listWorkIds in interface WorkManager
- Overrides:
listWorkIds in class WorkManagerImpl
- Parameters:
s - the queue id
state - the state defining the state to look into, SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work ids in the given state
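The state parameter shared by find, listWork and listWorkIds accepts SCHEDULED, RUNNING, or null for non-completed. The sketch below shows one way to express that filter over an in-memory map. It illustrates the WorkManager contract only: as the class description notes, StreamWorkManager cannot list works. The `State` enum is a local stand-in for Work.State, and its `COMPLETED` constant is an assumption added so that the null case has something to exclude.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class StateFilterSketch {
    /** Local stand-in for Work.State; COMPLETED is hypothetical and exists only for the example. */
    enum State { SCHEDULED, RUNNING, COMPLETED }

    /** Returns ids whose state matches; a null state selects every non-completed work. */
    static List<String> listWorkIds(Map<String, State> works, State state) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, State> e : works.entrySet()) {
            boolean match = (state == null)
                    ? e.getValue() != State.COMPLETED
                    : e.getValue() == state;
            if (match) {
                ids.add(e.getKey());
            }
        }
        return ids;
    }
}
```

Such a filter needs the current state of every work to be readable, which is exactly what an append-only log without state updates does not provide.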
-
scheduleAfterCommit
Description copied from class: WorkManagerImpl
Schedule after commit. Returns false if impossible (no transaction or transaction manager).
- Overrides:
scheduleAfterCommit in class WorkManagerImpl
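The "returns false if impossible" contract can be sketched with a hypothetical transaction object that collects callbacks to run on commit. `Tx`, `afterCommit` and `commit` are invented for this example; a real implementation would register a synchronization with the transaction manager instead.

```java
import java.util.ArrayList;
import java.util.List;

final class AfterCommitSketch {
    /** Hypothetical transaction that runs registered callbacks once it commits. */
    static final class Tx {
        private final List<Runnable> callbacks = new ArrayList<>();

        void afterCommit(Runnable callback) {
            callbacks.add(callback);
        }

        void commit() {
            for (Runnable r : callbacks) {
                r.run();
            }
            callbacks.clear();
        }
    }

    /**
     * Defers the work until the transaction commits.
     * Returns false when there is no transaction, so the caller can schedule immediately instead.
     */
    static boolean scheduleAfterCommit(Tx currentTx, Runnable work) {
        if (currentTx == null) {
            return false;
        }
        currentTx.afterCommit(work);
        return true;
    }
}
```

Returning a boolean rather than throwing lets the caller fall back to immediate scheduling when no transaction is active.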
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()
- Specified by:
supportsProcessingDisabling in interface WorkManager
- Overrides:
supportsProcessingDisabling in class WorkManagerImpl
- Returns:
- true if the implementation supports processing disabling
-