Class ConsumerRunner<M extends Message>
- java.lang.Object
-
- org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner<M>
-
- All Implemented Interfaces:
Callable<ConsumerStatus>, RebalanceListener
public class ConsumerRunner<M extends Message> extends Object implements Callable<ConsumerStatus>, RebalanceListener
Read messages from a tailer and drive a consumer according to its policy.
- Since:
- 9.1
-
-
Field Summary
Fields Modifier and Type Field Description protected long
acceptCounter
protected boolean
alreadySalted
protected long
batchCommitCounter
protected long
batchFailureCounter
protected long
committedCounter
protected Consumer<M>
consumer
protected String
consumerId
protected BatchPolicy
currentBatchPolicy
protected ConsumerFactory<M>
factory
protected io.dropwizard.metrics5.Timer
globalAcceptTimer
protected io.dropwizard.metrics5.Timer
globalBatchCommitTimer
protected io.dropwizard.metrics5.Counter
globalBatchFailureCounter
protected io.dropwizard.metrics5.Counter
globalCommittedCounter
protected io.dropwizard.metrics5.Counter
globalConsumersCounter
static String
NUXEO_METRICS_REGISTRY_NAME
protected ConsumerPolicy
policy
protected io.dropwizard.metrics5.MetricRegistry
registry
protected LogTailer<M>
tailer
protected String
threadName
protected boolean
usingSubscribe
-
Constructor Summary
Constructors Constructor Description ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, List<LogPartition> defaultAssignments)
Deprecated. Since 11.1, due to a serialization issue with Java 11; use ConsumerRunner(ConsumerFactory, ConsumerPolicy, LogManager, Codec, List), which allows giving a codec to the tailer.
ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, Codec<M> codec, List<LogPartition> defaultAssignments)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected BatchState
acceptBatch()
protected void
addSalt()
protected void
beginBatch()
ConsumerStatus
call()
protected void
commitBatch(BatchState state)
protected void
consumerLoop()
protected LogTailer<M>
createTailer(LogManager manager, Codec<M> codec, List<LogPartition> defaultAssignments)
void
onPartitionsAssigned(Collection<LogPartition> partitions)
void
onPartitionsLost(Collection<LogPartition> partitions)
void
onPartitionsRevoked(Collection<LogPartition> partitions)
protected boolean
processBatch()
protected boolean
processBatchWithRetry(net.jodah.failsafe.Execution execution)
protected void
restoreBatchPolicy()
protected void
rollbackBatch(Exception e)
protected void
setBatchRetryPolicy()
protected void
setMetrics()
protected void
setTailerPosition(LogManager manager)
protected void
setThreadName(String message)
-
-
-
Field Detail
-
NUXEO_METRICS_REGISTRY_NAME
public static final String NUXEO_METRICS_REGISTRY_NAME
- See Also:
- Constant Field Values
-
factory
protected final ConsumerFactory<M extends Message> factory
-
policy
protected final ConsumerPolicy policy
-
consumerId
protected String consumerId
-
currentBatchPolicy
protected BatchPolicy currentBatchPolicy
-
threadName
protected String threadName
-
registry
protected final io.dropwizard.metrics5.MetricRegistry registry
-
acceptCounter
protected long acceptCounter
-
committedCounter
protected long committedCounter
-
batchCommitCounter
protected long batchCommitCounter
-
batchFailureCounter
protected long batchFailureCounter
-
alreadySalted
protected boolean alreadySalted
-
globalAcceptTimer
protected io.dropwizard.metrics5.Timer globalAcceptTimer
-
globalCommittedCounter
protected io.dropwizard.metrics5.Counter globalCommittedCounter
-
globalBatchCommitTimer
protected io.dropwizard.metrics5.Timer globalBatchCommitTimer
-
globalBatchFailureCounter
protected io.dropwizard.metrics5.Counter globalBatchFailureCounter
-
globalConsumersCounter
protected io.dropwizard.metrics5.Counter globalConsumersCounter
-
usingSubscribe
protected boolean usingSubscribe
-
-
Constructor Detail
-
ConsumerRunner
@Deprecated public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, List<LogPartition> defaultAssignments)
Deprecated. Since 11.1, due to a serialization issue with Java 11; use ConsumerRunner(ConsumerFactory, ConsumerPolicy, LogManager, Codec, List), which allows giving a codec to the tailer.
-
ConsumerRunner
public ConsumerRunner(ConsumerFactory<M> factory, ConsumerPolicy policy, LogManager manager, Codec<M> codec, List<LogPartition> defaultAssignments)
-
-
Method Detail
-
createTailer
protected LogTailer<M> createTailer(LogManager manager, Codec<M> codec, List<LogPartition> defaultAssignments)
-
call
public ConsumerStatus call() throws Exception
-
setMetrics
protected void setMetrics()
-
addSalt
protected void addSalt() throws InterruptedException
- Throws:
InterruptedException
-
setTailerPosition
protected void setTailerPosition(LogManager manager)
-
consumerLoop
protected void consumerLoop() throws InterruptedException
- Throws:
InterruptedException
-
processBatchWithRetry
protected boolean processBatchWithRetry(net.jodah.failsafe.Execution execution) throws InterruptedException
- Throws:
InterruptedException
-
setBatchRetryPolicy
protected void setBatchRetryPolicy()
-
restoreBatchPolicy
protected void restoreBatchPolicy()
-
processBatch
protected boolean processBatch() throws InterruptedException
- Throws:
InterruptedException
-
beginBatch
protected void beginBatch()
-
commitBatch
protected void commitBatch(BatchState state)
-
rollbackBatch
protected void rollbackBatch(Exception e)
-
acceptBatch
protected BatchState acceptBatch() throws InterruptedException
- Throws:
InterruptedException
-
setThreadName
protected void setThreadName(String message)
-
onPartitionsRevoked
public void onPartitionsRevoked(Collection<LogPartition> partitions)
- Specified by:
onPartitionsRevoked
in interface RebalanceListener
-
onPartitionsLost
public void onPartitionsLost(Collection<LogPartition> partitions)
- Specified by:
onPartitionsLost
in interface RebalanceListener
-
onPartitionsAssigned
public void onPartitionsAssigned(Collection<LogPartition> partitions)
- Specified by:
onPartitionsAssigned
in interface RebalanceListener
-
-