Class RedisWorkQueuing
- java.lang.Object
-
- org.nuxeo.ecm.core.redis.contribs.RedisWorkQueuing
-
- All Implemented Interfaces:
WorkQueuing
public class RedisWorkQueuing extends Object implements WorkQueuing
Implementation of aWorkQueuing
storingWork
instances in Redis.- Since:
- 5.8
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.nuxeo.ecm.core.work.WorkQueuing
WorkQueuing.Listener
-
-
Field Summary
Fields Modifier and Type Field Description protected Map<String,NuxeoBlockingQueue>
allQueued
protected byte[]
cancelledRunningWorkSha
protected byte[]
cancelledScheduledWorkSha
protected byte[]
completedWorkSha
protected byte[]
initWorkQueueSha
protected static byte[]
KEY_CANCELED
protected static String
KEY_CANCELED_PREFIX
protected static byte[]
KEY_COMPLETED
protected static String
KEY_COMPLETED_PREFIX
Per-queue set of counters.protected static String
KEY_COUNT_PREFIX
protected static String
KEY_DATA
Global hash of Work instance id -> serialized Work instance.protected static byte[]
KEY_QUEUE
protected static String
KEY_QUEUE_PREFIX
Per-queue list of scheduled Work instance ids.protected static byte[]
KEY_RUNNING
protected static String
KEY_RUNNING_PREFIX
Per-queue set of running Work instance ids.protected static byte[]
KEY_SCHEDULED
protected static String
KEY_SCHEDULED_PREFIX
Per-queue set of scheduled Work instance ids.protected static String
KEY_STATE
Global hash of Work instance id -> Work state.protected static byte[]
KEY_SUSPENDED
protected static String
KEY_SUSPENDED_PREFIX
Per-queue list of suspended Work instance ids.protected WorkQueuing.Listener
listener
protected byte[]
metricsWorkQueueSha
protected byte[]
popWorkSha
protected String
redisNamespace
protected byte[]
runningWorkSha
protected byte[]
schedulingWorkSha
protected static byte[]
STATE_RUNNING
protected static byte
STATE_RUNNING_B
protected static byte
STATE_RUNNING_C
protected static byte[]
STATE_SCHEDULED
protected static byte
STATE_SCHEDULED_B
protected static byte[]
STATE_UNKNOWN
-
Constructor Summary
Constructors Constructor Description RedisWorkQueuing(WorkQueuing.Listener listener)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected List<byte[]>
args(String workId)
protected List<byte[]>
args(Work work, boolean serialize)
protected byte[]
bytes(String string)
protected byte[]
bytes(Work.State state)
protected byte[]
canceledKey(String queueId)
protected static Number[]
coerceNullToZero(Number[] counters)
protected static Number[]
coerceNullToZero(List<Number> numbers)
protected byte[]
completedKey(String queueId)
long
count(String queueId, Work.State state)
Gets the number of work instances in the given state in a given queue.protected byte[]
countKey(String queueId)
protected byte[]
dataKey()
protected Work
deserializeWork(byte[] workBytes)
Work
find(String workId, Work.State state)
Finds a work instance in the scheduled or running or completed sets.NuxeoBlockingQueue
getQueue(String queueId)
Gets the blocking queue of work used by theThreadPoolExecutor
.protected Set<String>
getQueueIds(String queuePrefix)
Finds which queues have work for a given state prefix.protected Set<String>
getRunningQueueIds()
protected Set<String>
getScheduledQueueIds()
protected Set<String>
getSuspendedQueueIds()
Finds which queues have suspended work.protected Work
getWork(byte[] workIdBytes)
protected Work
getWorkData(byte[] workIdBytes)
protected Work
getWorkFromQueue(String queueId)
Removes first work from work queue.Work.State
getWorkState(String workId)
Gets the state in which a work instance is.protected Work.State
getWorkStateInfo(String workId)
Gets the work state.NuxeoBlockingQueue
init(WorkQueueDescriptor config)
Starts up thisWorkQueuing
and attempts to resume work previously suspended and saved at shutdown time.boolean
isWorkInState(String workId, Work.State state)
Checks if a work instance with the given id is in the given state.protected static String
key(String... names)
protected byte[]
keyBytes(String prefix)
protected byte[]
keyBytes(String prefix, String queueId)
protected List<byte[]>
keys(String queueid)
void
listen(WorkQueuing.Listener listener)
Set the callback for debugging purposeprotected List<String>
listNonCompletedIds(String queueId)
protected List<Work>
listRunning(String queueId)
protected List<String>
listRunningIds(String queueId)
protected List<Work>
listScheduled(String queueId)
protected List<String>
listScheduledIds(String queueId)
List<Work>
listWork(String queueId, Work.State state)
Lists the work instances in a given queue in a defined state.List<String>
listWorkIds(String queueId, Work.State state)
Lists the work ids in a given queue in a defined state.protected List<String>
listWorkIdsList(byte[] queueBytes)
protected List<String>
listWorkIdsSet(byte[] queueBytes)
protected List<Work>
listWorkList(byte[] queueBytes)
protected List<Work>
listWorkSet(byte[] queueBytes)
WorkQueueMetrics
metrics(String queueId)
Returns current metrics of queue identified by thequeueId
protected byte[]
queuedKey(String queueId)
void
removeScheduled(String queueId, String workId)
Finds a scheduled work instance and removes it from the scheduled queue.protected void
removeScheduledWork(String queueId, String workId)
Removes a given work from queue, move the work from scheduled to completed set.protected byte[]
runningKey(String queueId)
protected byte[]
scheduledKey(String queueId)
int
scheduleSuspendedWork(String queueId)
Resumes all suspended work instances by moving them to the scheduled queue.protected byte[]
serializeWork(Work work)
void
setActive(String queueId, boolean value)
Enable/disable thisqueueId
processingprotected byte[]
stateKey()
protected String
string(byte[] bytes)
boolean
supportsProcessingDisabling()
protected byte[]
suspendedKey(String queueId)
int
suspendScheduledWork(String queueId)
Suspends all scheduled work instances by moving them to the suspended queue.void
workCanceled(String queueId, Work work)
Removes a work instance from scheduled set.void
workCompleted(String queueId, Work work)
Moves a work instance from the running set to the completed set.protected byte[]
workId(String id)
protected byte[]
workId(Work work)
void
workReschedule(String queueId, Work work)
Moves back a work instance from running set to the scheduled set.void
workRunning(String queueId, Work work)
Put the work instance into the running set.void
workSchedule(String queueId, Work work)
Submit a work to theThreadPoolExecutor
and put it in the scheduled set.protected void
workSetCancelledScheduled(String queueId, Work work)
Switches a work to state completed, and saves its new state.protected void
workSetCompleted(String queueId, Work work)
Switches a work to state completed, and saves its new state.protected void
workSetReschedule(String queueId, Work work)
Switches a work to state canceled, and saves its new state.protected void
workSetRunning(String queueId, Work work)
Switches a work to state running.void
workSetScheduled(String queueId, Work work)
Persists a work instance and adds it to the scheduled queue.
-
-
-
Field Detail
-
KEY_DATA
protected static final String KEY_DATA
Global hash of Work instance id -> serialized Work instance.- See Also:
- Constant Field Values
-
KEY_STATE
protected static final String KEY_STATE
Global hash of Work instance id -> Work state. The completed state is followed by a completion time in milliseconds.- See Also:
- Constant Field Values
-
KEY_SUSPENDED_PREFIX
protected static final String KEY_SUSPENDED_PREFIX
Per-queue list of suspended Work instance ids.- See Also:
- Constant Field Values
-
KEY_SUSPENDED
protected static final byte[] KEY_SUSPENDED
-
KEY_QUEUE_PREFIX
protected static final String KEY_QUEUE_PREFIX
Per-queue list of scheduled Work instance ids.- See Also:
- Constant Field Values
-
KEY_QUEUE
protected static final byte[] KEY_QUEUE
-
KEY_SCHEDULED_PREFIX
protected static final String KEY_SCHEDULED_PREFIX
Per-queue set of scheduled Work instance ids.- See Also:
- Constant Field Values
-
KEY_SCHEDULED
protected static final byte[] KEY_SCHEDULED
-
KEY_RUNNING_PREFIX
protected static final String KEY_RUNNING_PREFIX
Per-queue set of running Work instance ids.- See Also:
- Constant Field Values
-
KEY_RUNNING
protected static final byte[] KEY_RUNNING
-
KEY_COMPLETED_PREFIX
protected static final String KEY_COMPLETED_PREFIX
Per-queue set of counters.- See Also:
- Constant Field Values
-
KEY_COMPLETED
protected static final byte[] KEY_COMPLETED
-
KEY_CANCELED_PREFIX
protected static final String KEY_CANCELED_PREFIX
- See Also:
- Constant Field Values
-
KEY_CANCELED
protected static final byte[] KEY_CANCELED
-
KEY_COUNT_PREFIX
protected static final String KEY_COUNT_PREFIX
- See Also:
- Constant Field Values
-
STATE_SCHEDULED_B
protected static final byte STATE_SCHEDULED_B
- See Also:
- Constant Field Values
-
STATE_RUNNING_B
protected static final byte STATE_RUNNING_B
- See Also:
- Constant Field Values
-
STATE_RUNNING_C
protected static final byte STATE_RUNNING_C
- See Also:
- Constant Field Values
-
STATE_SCHEDULED
protected static final byte[] STATE_SCHEDULED
-
STATE_RUNNING
protected static final byte[] STATE_RUNNING
-
STATE_UNKNOWN
protected static final byte[] STATE_UNKNOWN
-
listener
protected WorkQueuing.Listener listener
-
allQueued
protected final Map<String,NuxeoBlockingQueue> allQueued
-
redisNamespace
protected String redisNamespace
-
initWorkQueueSha
protected byte[] initWorkQueueSha
-
metricsWorkQueueSha
protected byte[] metricsWorkQueueSha
-
schedulingWorkSha
protected byte[] schedulingWorkSha
-
popWorkSha
protected byte[] popWorkSha
-
runningWorkSha
protected byte[] runningWorkSha
-
cancelledScheduledWorkSha
protected byte[] cancelledScheduledWorkSha
-
completedWorkSha
protected byte[] completedWorkSha
-
cancelledRunningWorkSha
protected byte[] cancelledRunningWorkSha
-
-
Constructor Detail
-
RedisWorkQueuing
public RedisWorkQueuing(WorkQueuing.Listener listener)
-
-
Method Detail
-
init
public NuxeoBlockingQueue init(WorkQueueDescriptor config)
Description copied from interface:WorkQueuing
Starts up thisWorkQueuing
and attempts to resume work previously suspended and saved at shutdown time.- Specified by:
init
in interfaceWorkQueuing
-
getQueue
public NuxeoBlockingQueue getQueue(String queueId)
Description copied from interface:WorkQueuing
Gets the blocking queue of work used by theThreadPoolExecutor
.- Specified by:
getQueue
in interfaceWorkQueuing
-
workSchedule
public void workSchedule(String queueId, Work work)
Description copied from interface:WorkQueuing
Submit a work to theThreadPoolExecutor
and put it in the scheduled set.- Specified by:
workSchedule
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idwork
- the work instance
-
workRunning
public void workRunning(String queueId, Work work)
Description copied from interface:WorkQueuing
Put the work instance into the running set.- Specified by:
workRunning
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idwork
- the work instance
-
workCanceled
public void workCanceled(String queueId, Work work)
Description copied from interface:WorkQueuing
Removes a work instance from scheduled set.- Specified by:
workCanceled
in interfaceWorkQueuing
-
workCompleted
public void workCompleted(String queueId, Work work)
Description copied from interface:WorkQueuing
Moves a work instance from the running set to the completed set.- Specified by:
workCompleted
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idwork
- the work instance
-
workReschedule
public void workReschedule(String queueId, Work work)
Description copied from interface:WorkQueuing
Moves back a work instance from running set to the scheduled set.- Specified by:
workReschedule
in interfaceWorkQueuing
-
listWork
public List<Work> listWork(String queueId, Work.State state)
Description copied from interface:WorkQueuing
Lists the work instances in a given queue in a defined state.Note that an instance requested as RUNNING could be found SUSPENDING or SUSPENDED, and an instance requested as COMPLETED could be found FAILED.
- Specified by:
listWork
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idstate
- the state defining the state to look into,SCHEDULED
,RUNNING
, ornull
for non-completed- Returns:
- the list of work instances in the given state
-
listWorkIds
public List<String> listWorkIds(String queueId, Work.State state)
Description copied from interface:WorkQueuing
Lists the work ids in a given queue in a defined state.- Specified by:
listWorkIds
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idstate
- the state defining the state to look into,SCHEDULED
,RUNNING
, ornull
for non-completed- Returns:
- the list of work ids in the given state
-
count
public long count(String queueId, Work.State state)
Description copied from interface:WorkQueuing
Gets the number of work instances in the given state in a given queue.- Specified by:
count
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idstate
- the state,SCHEDULED
orRUNNING
- Returns:
- the number of scheduled work instances in the queue
-
find
public Work find(String workId, Work.State state)
Description copied from interface:WorkQueuing
Finds a work instance in the scheduled or running or completed sets.- Specified by:
find
in interfaceWorkQueuing
- Parameters:
workId
- the id of the work to findstate
- the state defining the state to look into,SCHEDULED
,RUNNING
, ornull
for SCHEDULED or RUNNING- Returns:
- the found work instance, or
null
if not found
-
isWorkInState
public boolean isWorkInState(String workId, Work.State state)
Description copied from interface:WorkQueuing
Checks if a work instance with the given id is in the given state.- Specified by:
isWorkInState
in interfaceWorkQueuing
- Parameters:
workId
- the work idstate
- the state,SCHEDULED
,RUNNING
, ornull
for non-completed- Returns:
true
if a work instance with the given id is in the given state
-
removeScheduled
public void removeScheduled(String queueId, String workId)
Description copied from interface:WorkQueuing
Finds a scheduled work instance and removes it from the scheduled queue.- Specified by:
removeScheduled
in interfaceWorkQueuing
- Parameters:
queueId
- the queue idworkId
- the id of the work to find
-
getWorkState
public Work.State getWorkState(String workId)
Description copied from interface:WorkQueuing
Gets the state in which a work instance is.This can be
Work.State.SCHEDULED
orWork.State.RUNNING
.- Specified by:
getWorkState
in interfaceWorkQueuing
- Parameters:
workId
- the id of the work to find- Returns:
- the work state, or
null
if not found
-
setActive
public void setActive(String queueId, boolean value)
Description copied from interface:WorkQueuing
Enable/disable thisqueueId
processing- Specified by:
setActive
in interfaceWorkQueuing
-
string
protected String string(byte[] bytes)
-
bytes
protected byte[] bytes(String string)
-
bytes
protected byte[] bytes(Work.State state)
-
keyBytes
protected byte[] keyBytes(String prefix)
-
workId
protected byte[] workId(Work work)
-
workId
protected byte[] workId(String id)
-
suspendedKey
protected byte[] suspendedKey(String queueId)
-
queuedKey
protected byte[] queuedKey(String queueId)
-
countKey
protected byte[] countKey(String queueId)
-
runningKey
protected byte[] runningKey(String queueId)
-
scheduledKey
protected byte[] scheduledKey(String queueId)
-
completedKey
protected byte[] completedKey(String queueId)
-
canceledKey
protected byte[] canceledKey(String queueId)
-
stateKey
protected byte[] stateKey()
-
dataKey
protected byte[] dataKey()
-
serializeWork
protected byte[] serializeWork(Work work) throws IOException
- Throws:
IOException
-
deserializeWork
protected Work deserializeWork(byte[] workBytes)
-
getSuspendedQueueIds
protected Set<String> getSuspendedQueueIds() throws IOException
Finds which queues have suspended work.- Returns:
- a set of queue ids
- Throws:
IOException
- Since:
- 5.8
-
getQueueIds
protected Set<String> getQueueIds(String queuePrefix)
Finds which queues have work for a given state prefix.- Returns:
- a set of queue ids
- Since:
- 5.8
-
scheduleSuspendedWork
public int scheduleSuspendedWork(String queueId) throws IOException
Resumes all suspended work instances by moving them to the scheduled queue.- Parameters:
queueId
- the queue id- Returns:
- the number of work instances scheduled
- Throws:
IOException
- Since:
- 5.8
-
suspendScheduledWork
public int suspendScheduledWork(String queueId) throws IOException
Suspends all scheduled work instances by moving them to the suspended queue.- Parameters:
queueId
- the queue id- Returns:
- the number of work instances suspended
- Throws:
IOException
- Since:
- 5.8
-
metrics
public WorkQueueMetrics metrics(String queueId)
Description copied from interface:WorkQueuing
Returns current metrics of queue identified by thequeueId
- Specified by:
metrics
in interfaceWorkQueuing
-
workSetScheduled
public void workSetScheduled(String queueId, Work work) throws IOException
Persists a work instance and adds it to the scheduled queue.- Parameters:
queueId
- the queue idwork
- the work instance- Throws:
IOException
-
workSetCancelledScheduled
protected void workSetCancelledScheduled(String queueId, Work work) throws IOException
Switches a work to state completed, and saves its new state.- Throws:
IOException
-
workSetRunning
protected void workSetRunning(String queueId, Work work) throws IOException
Switches a work to state running.- Parameters:
queueId
- the queue idwork
- the work- Throws:
IOException
-
workSetCompleted
protected void workSetCompleted(String queueId, Work work) throws IOException
Switches a work to state completed, and saves its new state.- Throws:
IOException
-
workSetReschedule
protected void workSetReschedule(String queueId, Work work) throws IOException
Switches a work to state canceled, and saves its new state.- Throws:
IOException
-
args
protected List<byte[]> args(String workId) throws IOException
- Throws:
IOException
-
args
protected List<byte[]> args(Work work, boolean serialize) throws IOException
- Throws:
IOException
-
getWorkStateInfo
protected Work.State getWorkStateInfo(String workId)
Gets the work state.- Parameters:
workId
- the work id- Returns:
- the state, or
null
if not found
-
listWorkIdsList
protected List<String> listWorkIdsList(byte[] queueBytes) throws IOException
- Throws:
IOException
-
listWorkIdsSet
protected List<String> listWorkIdsSet(byte[] queueBytes) throws IOException
- Throws:
IOException
-
listWorkList
protected List<Work> listWorkList(byte[] queueBytes) throws IOException
- Throws:
IOException
-
listWorkSet
protected List<Work> listWorkSet(byte[] queueBytes) throws IOException
- Throws:
IOException
-
getWork
protected Work getWork(byte[] workIdBytes)
-
getWorkData
protected Work getWorkData(byte[] workIdBytes) throws IOException
- Throws:
IOException
-
getWorkFromQueue
protected Work getWorkFromQueue(String queueId) throws IOException
Removes first work from work queue.- Parameters:
queueId
- the queue id- Returns:
- the work, or
null
if the scheduled queue is empty - Throws:
IOException
-
removeScheduledWork
protected void removeScheduledWork(String queueId, String workId) throws IOException
Removes a given work from queue, move the work from scheduled to completed set.- Throws:
IOException
-
listen
public void listen(WorkQueuing.Listener listener)
Description copied from interface:WorkQueuing
Set the callback for debugging purpose- Specified by:
listen
in interfaceWorkQueuing
-
supportsProcessingDisabling
public boolean supportsProcessingDisabling()
- Specified by:
supportsProcessingDisabling
in interfaceWorkQueuing
- Returns:
- true if the implementation supports processing disabling
-
-