Class RedisWorkQueuing
- java.lang.Object
-
- org.nuxeo.ecm.core.redis.contribs.RedisWorkQueuing
-
- All Implemented Interfaces:
WorkQueuing
public class RedisWorkQueuing extends Object implements WorkQueuing
Implementation of a WorkQueuing storing Work instances in Redis.
- Since:
- 5.8
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.WorkQueuing
WorkQueuing.Listener
-
-
Field Summary
Fields
Modifier and Type Field - Description
protected Map<String,NuxeoBlockingQueue> allQueued
protected byte[] cancelledRunningWorkSha
protected byte[] cancelledScheduledWorkSha
protected byte[] completedWorkSha
protected byte[] initWorkQueueSha
protected static byte[] KEY_CANCELED
protected static String KEY_CANCELED_PREFIX
protected static byte[] KEY_COMPLETED
protected static String KEY_COMPLETED_PREFIX - Per-queue set of counters.
protected static String KEY_COUNT_PREFIX
protected static String KEY_DATA - Global hash of Work instance id -> serialized Work instance.
protected static byte[] KEY_QUEUE
protected static String KEY_QUEUE_PREFIX - Per-queue list of scheduled Work instance ids.
protected static byte[] KEY_RUNNING
protected static String KEY_RUNNING_PREFIX - Per-queue set of running Work instance ids.
protected static byte[] KEY_SCHEDULED
protected static String KEY_SCHEDULED_PREFIX - Per-queue set of scheduled Work instance ids.
protected static String KEY_STATE - Global hash of Work instance id -> Work state.
protected static byte[] KEY_SUSPENDED
protected static String KEY_SUSPENDED_PREFIX - Per-queue list of suspended Work instance ids.
protected WorkQueuing.Listener listener
protected byte[] metricsWorkQueueSha
protected byte[] popWorkSha
protected String redisNamespace
protected byte[] runningWorkSha
protected byte[] schedulingWorkSha
protected static byte[] STATE_RUNNING
protected static byte STATE_RUNNING_B
protected static byte STATE_RUNNING_C
protected static byte[] STATE_SCHEDULED
protected static byte STATE_SCHEDULED_B
protected static byte[] STATE_UNKNOWN
-
Constructor Summary
Constructors
Constructor - Description
RedisWorkQueuing(WorkQueuing.Listener listener)
-
Method Summary
All Methods
Modifier and Type Method - Description
protected List<byte[]> args(String workId)
protected List<byte[]> args(Work work, boolean serialize)
protected byte[] bytes(String string)
protected byte[] bytes(Work.State state)
protected byte[] canceledKey(String queueId)
protected static Number[] coerceNullToZero(Number[] counters)
protected static Number[] coerceNullToZero(List<Number> numbers)
protected byte[] completedKey(String queueId)
long count(String queueId, Work.State state) - Gets the number of work instances in the given state in a given queue.
protected byte[] countKey(String queueId)
protected byte[] dataKey()
protected Work deserializeWork(byte[] workBytes)
Work find(String workId, Work.State state) - Finds a work instance in the scheduled, running, or completed sets.
NuxeoBlockingQueue getQueue(String queueId) - Gets the blocking queue of work used by the ThreadPoolExecutor.
protected Set<String> getQueueIds(String queuePrefix) - Finds which queues have work for a given state prefix.
protected Set<String> getRunningQueueIds()
protected Set<String> getScheduledQueueIds()
protected Set<String> getSuspendedQueueIds() - Finds which queues have suspended work.
protected Work getWork(byte[] workIdBytes)
protected Work getWorkData(byte[] workIdBytes)
protected Work getWorkFromQueue(String queueId) - Removes the first work from the work queue.
Work.State getWorkState(String workId) - Gets the state in which a work instance is.
protected Work.State getWorkStateInfo(String workId) - Gets the work state.
NuxeoBlockingQueue init(WorkQueueDescriptor config) - Starts up this WorkQueuing and attempts to resume work previously suspended and saved at shutdown time.
boolean isWorkInState(String workId, Work.State state) - Checks if a work instance with the given id is in the given state.
protected static String key(String... names)
protected byte[] keyBytes(String prefix)
protected byte[] keyBytes(String prefix, String queueId)
protected List<byte[]> keys(String queueid)
void listen(WorkQueuing.Listener listener) - Sets the callback for debugging purposes.
protected List<String> listNonCompletedIds(String queueId)
protected List<Work> listRunning(String queueId)
protected List<String> listRunningIds(String queueId)
protected List<Work> listScheduled(String queueId)
protected List<String> listScheduledIds(String queueId)
List<Work> listWork(String queueId, Work.State state) - Lists the work instances in a given queue in a defined state.
List<String> listWorkIds(String queueId, Work.State state) - Lists the work ids in a given queue in a defined state.
protected List<String> listWorkIdsList(byte[] queueBytes)
protected List<String> listWorkIdsSet(byte[] queueBytes)
protected List<Work> listWorkList(byte[] queueBytes)
protected List<Work> listWorkSet(byte[] queueBytes)
WorkQueueMetrics metrics(String queueId) - Returns the current metrics of the queue identified by the queueId.
protected byte[] queuedKey(String queueId)
void removeScheduled(String queueId, String workId) - Finds a scheduled work instance and removes it from the scheduled queue.
protected void removeScheduledWork(String queueId, String workId) - Removes a given work from the queue, moving it from the scheduled set to the completed set.
protected byte[] runningKey(String queueId)
protected byte[] scheduledKey(String queueId)
int scheduleSuspendedWork(String queueId) - Resumes all suspended work instances by moving them to the scheduled queue.
protected byte[] serializeWork(Work work)
void setActive(String queueId, boolean value) - Enables or disables processing of this queueId.
protected byte[] stateKey()
protected String string(byte[] bytes)
boolean supportsProcessingDisabling()
protected byte[] suspendedKey(String queueId)
int suspendScheduledWork(String queueId) - Suspends all scheduled work instances by moving them to the suspended queue.
void workCanceled(String queueId, Work work) - Removes a work instance from the scheduled set.
void workCompleted(String queueId, Work work) - Moves a work instance from the running set to the completed set.
protected byte[] workId(String id)
protected byte[] workId(Work work)
void workReschedule(String queueId, Work work) - Moves a work instance back from the running set to the scheduled set.
void workRunning(String queueId, Work work) - Puts the work instance into the running set.
void workSchedule(String queueId, Work work) - Submits a work to the ThreadPoolExecutor and puts it in the scheduled set.
protected void workSetCancelledScheduled(String queueId, Work work) - Switches a work to state completed, and saves its new state.
protected void workSetCompleted(String queueId, Work work) - Switches a work to state completed, and saves its new state.
protected void workSetReschedule(String queueId, Work work) - Switches a work to state canceled, and saves its new state.
protected void workSetRunning(String queueId, Work work) - Switches a work to state running.
void workSetScheduled(String queueId, Work work) - Persists a work instance and adds it to the scheduled queue.
-
-
-
Field Detail
-
KEY_DATA
protected static final String KEY_DATA
Global hash of Work instance id -> serialized Work instance.
- See Also:
- Constant Field Values
-
KEY_STATE
protected static final String KEY_STATE
Global hash of Work instance id -> Work state. The completed state is followed by a completion time in milliseconds.
- See Also:
- Constant Field Values
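The state value layout described for KEY_STATE can be pictured with a minimal sketch. It assumes a one-character state marker followed, for the completed state only, by the completion time in decimal milliseconds. The marker character 'C' and the class name StateValueSketch are hypothetical and are not taken from this class.

```java
/** Hypothetical sketch of a "state marker + completion time" value; not the Nuxeo encoding. */
final class StateValueSketch {
    // Assumed marker for the completed state, chosen for illustration only.
    static final char COMPLETED = 'C';

    /** Encodes a completed state as the marker followed by the completion time in milliseconds. */
    static String encodeCompleted(long completionMillis) {
        return COMPLETED + Long.toString(completionMillis);
    }

    /** Returns the completion time carried by a state value, or -1 if the value is not a completed state. */
    static long completionTime(String stateValue) {
        if (stateValue == null || stateValue.isEmpty() || stateValue.charAt(0) != COMPLETED) {
            return -1L;
        }
        return Long.parseLong(stateValue.substring(1));
    }

    public static void main(String[] args) {
        String value = encodeCompleted(System.currentTimeMillis());
        System.out.println(value + " -> " + completionTime(value));
    }
}
```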
-
KEY_SUSPENDED_PREFIX
protected static final String KEY_SUSPENDED_PREFIX
Per-queue list of suspended Work instance ids.
- See Also:
- Constant Field Values
-
KEY_SUSPENDED
protected static final byte[] KEY_SUSPENDED
-
KEY_QUEUE_PREFIX
protected static final String KEY_QUEUE_PREFIX
Per-queue list of scheduled Work instance ids.
- See Also:
- Constant Field Values
-
KEY_QUEUE
protected static final byte[] KEY_QUEUE
-
KEY_SCHEDULED_PREFIX
protected static final String KEY_SCHEDULED_PREFIX
Per-queue set of scheduled Work instance ids.
- See Also:
- Constant Field Values
-
KEY_SCHEDULED
protected static final byte[] KEY_SCHEDULED
-
KEY_RUNNING_PREFIX
protected static final String KEY_RUNNING_PREFIX
Per-queue set of running Work instance ids.
- See Also:
- Constant Field Values
-
KEY_RUNNING
protected static final byte[] KEY_RUNNING
-
KEY_COMPLETED_PREFIX
protected static final String KEY_COMPLETED_PREFIX
Per-queue set of counters.
- See Also:
- Constant Field Values
-
KEY_COMPLETED
protected static final byte[] KEY_COMPLETED
-
KEY_CANCELED_PREFIX
protected static final String KEY_CANCELED_PREFIX
- See Also:
- Constant Field Values
-
KEY_CANCELED
protected static final byte[] KEY_CANCELED
-
KEY_COUNT_PREFIX
protected static final String KEY_COUNT_PREFIX
- See Also:
- Constant Field Values
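The KEY_*_PREFIX constants are String prefixes, and the per-queue key methods return byte[] keys. A minimal sketch of how such keys could be composed follows, assuming a key is the namespace, then the prefix, then the queue id, encoded as UTF-8. The constant values and the class name RedisKeySketch are hypothetical; the real values are listed under Constant Field Values.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of per-queue Redis key naming; not the actual Nuxeo implementation. */
final class RedisKeySketch {
    // Assumed values for illustration only.
    static final String NAMESPACE = "nuxeo:work:";
    static final String SCHEDULED_PREFIX = "sched:";
    static final String RUNNING_PREFIX = "run:";

    /** Builds namespace + prefix + queueId as a String key. */
    static String key(String prefix, String queueId) {
        return NAMESPACE + prefix + queueId;
    }

    /** Encodes the key as UTF-8 bytes, the form a binary Redis client expects. */
    static byte[] keyBytes(String prefix, String queueId) {
        return key(prefix, queueId).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(key(SCHEDULED_PREFIX, "default"));
        System.out.println(keyBytes(RUNNING_PREFIX, "default").length + " bytes");
    }
}
```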
-
STATE_SCHEDULED_B
protected static final byte STATE_SCHEDULED_B
- See Also:
- Constant Field Values
-
STATE_RUNNING_B
protected static final byte STATE_RUNNING_B
- See Also:
- Constant Field Values
-
STATE_RUNNING_C
protected static final byte STATE_RUNNING_C
- See Also:
- Constant Field Values
-
STATE_SCHEDULED
protected static final byte[] STATE_SCHEDULED
-
STATE_RUNNING
protected static final byte[] STATE_RUNNING
-
STATE_UNKNOWN
protected static final byte[] STATE_UNKNOWN
-
listener
protected WorkQueuing.Listener listener
-
allQueued
protected final Map<String,NuxeoBlockingQueue> allQueued
-
redisNamespace
protected String redisNamespace
-
initWorkQueueSha
protected byte[] initWorkQueueSha
-
metricsWorkQueueSha
protected byte[] metricsWorkQueueSha
-
schedulingWorkSha
protected byte[] schedulingWorkSha
-
popWorkSha
protected byte[] popWorkSha
-
runningWorkSha
protected byte[] runningWorkSha
-
cancelledScheduledWorkSha
protected byte[] cancelledScheduledWorkSha
-
completedWorkSha
protected byte[] completedWorkSha
-
cancelledRunningWorkSha
protected byte[] cancelledRunningWorkSha
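The *Sha fields are undocumented here. Their names suggest that they hold the digests of server-side Lua scripts, which is an assumption. Redis identifies a cached script by the SHA-1 hex digest of its body, so a digest of that kind can be computed as in the sketch below. The class name ScriptShaSketch and the sample script are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: computes the SHA-1 hex digest that Redis uses to identify a cached script. */
final class ScriptShaSketch {

    /** Returns the lowercase hex SHA-1 digest of a script body. */
    static String sha1Hex(String script) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            byte[] digest = md.digest(script.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a standard JDK algorithm, so this is not expected in practice.
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Illustrative script body, not one of the scripts used by this class.
        System.out.println(sha1Hex("return redis.call('LLEN', KEYS[1])"));
    }
}
```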
-
-
Constructor Detail
-
RedisWorkQueuing
public RedisWorkQueuing(WorkQueuing.Listener listener)
-
-
Method Detail
-
init
public NuxeoBlockingQueue init(WorkQueueDescriptor config)
Description copied from interface: WorkQueuing
Starts up this WorkQueuing and attempts to resume work previously suspended and saved at shutdown time.
- Specified by:
init in interface WorkQueuing
-
getQueue
public NuxeoBlockingQueue getQueue(String queueId)
Description copied from interface: WorkQueuing
Gets the blocking queue of work used by the ThreadPoolExecutor.
- Specified by:
getQueue in interface WorkQueuing
-
workSchedule
public void workSchedule(String queueId, Work work)
Description copied from interface: WorkQueuing
Submits a work to the ThreadPoolExecutor and puts it in the scheduled set.
- Specified by:
workSchedule in interface WorkQueuing
- Parameters:
queueId - the queue id
work - the work instance
-
workRunning
public void workRunning(String queueId, Work work)
Description copied from interface: WorkQueuing
Puts the work instance into the running set.
- Specified by:
workRunning in interface WorkQueuing
- Parameters:
queueId - the queue id
work - the work instance
-
workCanceled
public void workCanceled(String queueId, Work work)
Description copied from interface: WorkQueuing
Removes a work instance from the scheduled set.
- Specified by:
workCanceled in interface WorkQueuing
-
workCompleted
public void workCompleted(String queueId, Work work)
Description copied from interface: WorkQueuing
Moves a work instance from the running set to the completed set.
- Specified by:
workCompleted in interface WorkQueuing
- Parameters:
queueId - the queue id
work - the work instance
-
workReschedule
public void workReschedule(String queueId, Work work)
Description copied from interface: WorkQueuing
Moves a work instance back from the running set to the scheduled set.
- Specified by:
workReschedule in interface WorkQueuing
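The transitions described for workSchedule, workRunning, workCanceled, workCompleted, and workReschedule can be modelled in memory as moves between three sets. The sketch below is only an illustration of those descriptions. It uses plain Java sets instead of Redis, tracks ids instead of Work instances, and assumes that workRunning also removes the id from the scheduled set. The class name WorkLifecycleSketch is hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** In-memory model of the scheduled / running / completed sets of a single queue. */
final class WorkLifecycleSketch {
    final Set<String> scheduled = new LinkedHashSet<>();
    final Set<String> running = new LinkedHashSet<>();
    final Set<String> completed = new LinkedHashSet<>();

    /** Puts the work id in the scheduled set. */
    void workSchedule(String workId) {
        scheduled.add(workId);
    }

    /** Puts the work id into the running set (assumption: it leaves the scheduled set). */
    void workRunning(String workId) {
        scheduled.remove(workId);
        running.add(workId);
    }

    /** Removes the work id from the scheduled set. */
    void workCanceled(String workId) {
        scheduled.remove(workId);
    }

    /** Moves the work id from the running set to the completed set. */
    void workCompleted(String workId) {
        running.remove(workId);
        completed.add(workId);
    }

    /** Moves the work id back from the running set to the scheduled set. */
    void workReschedule(String workId) {
        running.remove(workId);
        scheduled.add(workId);
    }

    public static void main(String[] args) {
        WorkLifecycleSketch queue = new WorkLifecycleSketch();
        queue.workSchedule("w1");
        queue.workRunning("w1");
        queue.workCompleted("w1");
        System.out.println("completed: " + queue.completed);
    }
}
```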
-
listWork
public List<Work> listWork(String queueId, Work.State state)
Description copied from interface: WorkQueuing
Lists the work instances in a given queue in a defined state.
Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as COMPLETED could be found FAILED.
- Specified by:
listWork in interface WorkQueuing
- Parameters:
queueId - the queue id
state - the state to look into, SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work instances in the given state
-
listWorkIds
public List<String> listWorkIds(String queueId, Work.State state)
Description copied from interface: WorkQueuing
Lists the work ids in a given queue in a defined state.
- Specified by:
listWorkIds in interface WorkQueuing
- Parameters:
queueId - the queue id
state - the state to look into, SCHEDULED, RUNNING, or null for non-completed
- Returns:
- the list of work ids in the given state
-
count
public long count(String queueId, Work.State state)
Description copied from interface: WorkQueuing
Gets the number of work instances in the given state in a given queue.
- Specified by:
count in interface WorkQueuing
- Parameters:
queueId - the queue id
state - the state, SCHEDULED or RUNNING
- Returns:
- the number of scheduled work instances in the queue
-
find
public Work find(String workId, Work.State state)
Description copied from interface: WorkQueuing
Finds a work instance in the scheduled, running, or completed sets.
- Specified by:
find in interface WorkQueuing
- Parameters:
workId - the id of the work to find
state - the state to look into, SCHEDULED, RUNNING, or null for SCHEDULED or RUNNING
- Returns:
- the found work instance, or null if not found
-
isWorkInState
public boolean isWorkInState(String workId, Work.State state)
Description copied from interface: WorkQueuing
Checks if a work instance with the given id is in the given state.
- Specified by:
isWorkInState in interface WorkQueuing
- Parameters:
workId - the work id
state - the state, SCHEDULED, RUNNING, or null for non-completed
- Returns:
true if a work instance with the given id is in the given state
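The null convention of the state parameter can be illustrated with a small sketch. It assumes that "non-completed" means SCHEDULED or RUNNING, as the find parameter description states. The local State enum stands in for Work.State, and the class name StateMatchSketch is hypothetical.

```java
/** Sketch of the "null means non-completed" matching rule for a requested state. */
final class StateMatchSketch {
    // Local stand-in for Work.State, reduced to the values named on this page.
    enum State { SCHEDULED, RUNNING, UNKNOWN }

    /**
     * Returns true if the actual state satisfies the requested one.
     * A null request matches any non-completed state (SCHEDULED or RUNNING).
     */
    static boolean matches(State actual, State requested) {
        if (actual == null) {
            return false; // unknown work id: nothing to match
        }
        if (requested == null) {
            return actual == State.SCHEDULED || actual == State.RUNNING;
        }
        return actual == requested;
    }

    public static void main(String[] args) {
        System.out.println(matches(State.RUNNING, null));
        System.out.println(matches(State.SCHEDULED, State.RUNNING));
    }
}
```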
-
removeScheduled
public void removeScheduled(String queueId, String workId)
Description copied from interface: WorkQueuing
Finds a scheduled work instance and removes it from the scheduled queue.
- Specified by:
removeScheduled in interface WorkQueuing
- Parameters:
queueId - the queue id
workId - the id of the work to find
-
getWorkState
public Work.State getWorkState(String workId)
Description copied from interface: WorkQueuing
Gets the state in which a work instance is.
This can be Work.State.SCHEDULED or Work.State.RUNNING.
- Specified by:
getWorkState in interface WorkQueuing
- Parameters:
workId - the id of the work to find
- Returns:
- the work state, or null if not found
-
setActive
public void setActive(String queueId, boolean value)
Description copied from interface: WorkQueuing
Enables or disables processing of this queueId.
- Specified by:
setActive in interface WorkQueuing
-
string
protected String string(byte[] bytes)
-
bytes
protected byte[] bytes(String string)
-
bytes
protected byte[] bytes(Work.State state)
-
keyBytes
protected byte[] keyBytes(String prefix)
-
workId
protected byte[] workId(Work work)
-
workId
protected byte[] workId(String id)
-
suspendedKey
protected byte[] suspendedKey(String queueId)
-
queuedKey
protected byte[] queuedKey(String queueId)
-
countKey
protected byte[] countKey(String queueId)
-
runningKey
protected byte[] runningKey(String queueId)
-
scheduledKey
protected byte[] scheduledKey(String queueId)
-
completedKey
protected byte[] completedKey(String queueId)
-
canceledKey
protected byte[] canceledKey(String queueId)
-
stateKey
protected byte[] stateKey()
-
dataKey
protected byte[] dataKey()
-
serializeWork
protected byte[] serializeWork(Work work) throws IOException
- Throws:
IOException
-
deserializeWork
protected Work deserializeWork(byte[] workBytes)
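serializeWork and deserializeWork convert between a Work instance and the byte[] stored in the KEY_DATA hash. The mechanism is not documented on this page. The sketch below assumes standard Java object serialization and uses a hypothetical DemoWork class in place of a real Work.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Sketch of a byte[] round trip using Java serialization; DemoWork is a hypothetical stand-in for Work. */
final class WorkSerializationSketch {

    static final class DemoWork implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String title;

        DemoWork(String id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    /** Serializes an object to bytes suitable for storing as a hash value. */
    static byte[] serialize(Serializable work) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(work);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rebuilds the object from its serialized bytes. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized work not found", e);
        }
    }

    public static void main(String[] args) {
        DemoWork copy = (DemoWork) deserialize(serialize(new DemoWork("w1", "Reindex")));
        System.out.println(copy.id + " " + copy.title);
    }
}
```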
-
getSuspendedQueueIds
protected Set<String> getSuspendedQueueIds() throws IOException
Finds which queues have suspended work.
- Returns:
- a set of queue ids
- Throws:
IOException
- Since:
- 5.8
-
getQueueIds
protected Set<String> getQueueIds(String queuePrefix)
Finds which queues have work for a given state prefix.
- Returns:
- a set of queue ids
- Since:
- 5.8
-
scheduleSuspendedWork
public int scheduleSuspendedWork(String queueId) throws IOException
Resumes all suspended work instances by moving them to the scheduled queue.
- Parameters:
queueId - the queue id
- Returns:
- the number of work instances scheduled
- Throws:
IOException
- Since:
- 5.8
-
suspendScheduledWork
public int suspendScheduledWork(String queueId) throws IOException
Suspends all scheduled work instances by moving them to the suspended queue.
- Parameters:
queueId - the queue id
- Returns:
- the number of work instances suspended
- Throws:
IOException
- Since:
- 5.8
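suspendScheduledWork and scheduleSuspendedWork both move every id from one per-queue list to another and return the number moved. A minimal in-memory sketch of that contract follows, using Java deques in place of Redis lists. Preserving order while moving is an assumption, and the class name SuspendResumeSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of suspending and resuming the scheduled work of a single queue. */
final class SuspendResumeSketch {
    final Deque<String> scheduled = new ArrayDeque<>();
    final Deque<String> suspended = new ArrayDeque<>();

    /** Moves all scheduled ids to the suspended list; returns how many were moved. */
    int suspendScheduledWork() {
        return moveAll(scheduled, suspended);
    }

    /** Moves all suspended ids back to the scheduled list; returns how many were moved. */
    int scheduleSuspendedWork() {
        return moveAll(suspended, scheduled);
    }

    /** Drains 'from' head-first and appends to 'to', so relative order is kept. */
    private static int moveAll(Deque<String> from, Deque<String> to) {
        int moved = 0;
        String id;
        while ((id = from.pollFirst()) != null) {
            to.addLast(id);
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        SuspendResumeSketch queue = new SuspendResumeSketch();
        queue.scheduled.add("w1");
        queue.scheduled.add("w2");
        System.out.println("suspended: " + queue.suspendScheduledWork());
        System.out.println("resumed: " + queue.scheduleSuspendedWork());
    }
}
```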
-
metrics
public WorkQueueMetrics metrics(String queueId)
Description copied from interface: WorkQueuing
Returns the current metrics of the queue identified by the queueId.
- Specified by:
metrics in interface WorkQueuing
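The Method Summary lists two coerceNullToZero helpers that take counters and return a Number[]. Their behavior is not documented here. The sketch below assumes what the name suggests, that missing (null) counters are replaced by zero before being used, for example when building metrics. The class name CountersSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of replacing null counters with zero; behavior inferred from the method name only. */
final class CountersSketch {

    /** Returns a copy of the array in which every null entry is replaced by 0. */
    static Number[] coerceNullToZero(Number[] counters) {
        Number[] result = new Number[counters.length];
        for (int i = 0; i < counters.length; i++) {
            if (counters[i] == null) {
                result[i] = Long.valueOf(0L);
            } else {
                result[i] = counters[i];
            }
        }
        return result;
    }

    /** List variant that delegates to the array variant. */
    static Number[] coerceNullToZero(List<Number> numbers) {
        return coerceNullToZero(numbers.toArray(new Number[0]));
    }

    public static void main(String[] args) {
        List<Number> raw = Arrays.asList(4L, null, 7L);
        System.out.println(Arrays.toString(coerceNullToZero(raw)));
    }
}
```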
-
workSetScheduled
public void workSetScheduled(String queueId, Work work) throws IOException
Persists a work instance and adds it to the scheduled queue.
- Parameters:
queueId - the queue id
work - the work instance
- Throws:
IOException
-
workSetCancelledScheduled
protected void workSetCancelledScheduled(String queueId, Work work) throws IOException
Switches a work to state completed, and saves its new state.
- Throws:
IOException
-
workSetRunning
protected void workSetRunning(String queueId, Work work) throws IOException
Switches a work to state running.
- Parameters:
queueId - the queue id
work - the work
- Throws:
IOException
-
workSetCompleted
protected void workSetCompleted(String queueId, Work work) throws IOException
Switches a work to state completed, and saves its new state.
- Throws:
IOException
-
workSetReschedule
protected void workSetReschedule(String queueId, Work work) throws IOException
Switches a work to state canceled, and saves its new state.
- Throws:
IOException
-
args
protected List<byte[]> args(String workId) throws IOException
- Throws:
IOException
-
args
protected List<byte[]> args(Work work, boolean serialize) throws IOException
- Throws:
IOException
-
getWorkStateInfo
protected Work.State getWorkStateInfo(String workId)
Gets the work state.
- Parameters:
workId - the work id
- Returns:
- the state, or null if not found
-
listWorkIdsList
protected List<String> listWorkIdsList(byte[] queueBytes) throws IOException
- Throws:
IOException
-
listWorkIdsSet
protected List<String> listWorkIdsSet(byte[] queueBytes) throws IOException
- Throws:
IOException
-
listWorkList
protected List<Work> listWorkList(byte[] queueBytes) throws IOException
- Throws:
IOException
-
listWorkSet
protected List<Work> listWorkSet(byte[] queueBytes) throws IOException
- Throws:
IOException
-
getWork
protected Work getWork(byte[] workIdBytes)
-
getWorkData
protected Work getWorkData(byte[] workIdBytes) throws IOException
- Throws:
IOException
-
getWorkFromQueue
protected Work getWorkFromQueue(String queueId) throws IOException
Removes the first work from the work queue.
- Parameters:
queueId - the queue id
- Returns:
- the work, or null if the scheduled queue is empty
- Throws:
IOException
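The getWorkFromQueue contract (take the first id from the queue, return the corresponding work, or null when the queue is empty) can be sketched in memory as follows. A deque stands in for the per-queue list of scheduled ids and a map for the global data hash. The String payload and the class name PopWorkSketch are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** In-memory model of popping the first scheduled work of a single queue. */
final class PopWorkSketch {
    // Stand-in for the per-queue list of scheduled work ids.
    final Deque<String> queue = new ArrayDeque<>();
    // Stand-in for the global hash of work id -> serialized work.
    final Map<String, String> data = new HashMap<>();

    /** Stores the payload and appends the id to the queue. */
    void schedule(String workId, String payload) {
        data.put(workId, payload);
        queue.addLast(workId);
    }

    /** Removes the first id from the queue and returns its payload, or null if the queue is empty. */
    String getWorkFromQueue() {
        String workId = queue.pollFirst();
        return workId == null ? null : data.get(workId);
    }

    public static void main(String[] args) {
        PopWorkSketch sketch = new PopWorkSketch();
        sketch.schedule("w1", "first");
        System.out.println(sketch.getWorkFromQueue());
        System.out.println(sketch.getWorkFromQueue());
    }
}
```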
-
removeScheduledWork
protected void removeScheduledWork(String queueId, String workId) throws IOException
Removes a given work from the queue, moving it from the scheduled set to the completed set.
- Throws:
IOException
-
listen
public void listen(WorkQueuing.Listener listener)
Description copied from interface: WorkQueuing
Sets the callback for debugging purposes.
- Specified by:
listen in interface WorkQueuing
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()
- Specified by:
supportsProcessingDisabling in interface WorkQueuing
- Returns:
- true if the implementation supports processing disabling
-
-