Package org.nuxeo.lib.stream.computation
Class AbstractBatchComputation
java.lang.Object
org.nuxeo.lib.stream.computation.AbstractComputation
org.nuxeo.lib.stream.computation.AbstractBatchComputation
- All Implemented Interfaces:
Computation
- Direct Known Subclasses:
StreamAuditWriter.AuditLogWriterComputation
An abstract
Computation
that processes records by batch.
The batch capacity and threshold are defined in the computation policy.
- Since:
- 10.3
-
Field Summary
Modifier and TypeFieldDescriptionprotected String
protected boolean
protected boolean
protected long
static final String
Fields inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
INPUT_1, INPUT_2, INPUT_3, INPUT_NULL, metadata, OUTPUT_1, OUTPUT_2, OUTPUT_3, OUTPUT_4
-
Constructor Summary
ConstructorDescriptionAbstractBatchComputation
(String name, int nbInputStreams, int nbOutputStreams) Constructor -
Method Summary
Modifier and TypeMethodDescriptionabstract void
batchFailure
(ComputationContext context, String inputStreamName, List<Record> records) Called when the retry policy has failed.protected abstract void
batchProcess
(ComputationContext context, String inputStreamName, List<Record> records) Called when: the batch capacity is reached the time threshold is reached the inputStreamName has changed If this method raises an exception the retry policy is applied.protected void
checkpointBatch
(ComputationContext context) void
init
(ComputationContext context) Called when the framework has registered the computation successfully.void
processFailure
(ComputationContext context, Throwable failure) void
processRecord
(ComputationContext context, String inputStreamName, Record record) Process an incoming record on one of the computation's input streams.void
processRetry
(ComputationContext context, Throwable failure) void
processTimer
(ComputationContext context, String key, long timestamp) Process a timer callback previously set viaComputationContext.setTimer(String, long)
.Methods inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
metadata
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.nuxeo.lib.stream.computation.Computation
destroy, signalStop
-
Field Details
-
TIMER_BATCH
- See Also:
-
batchRecords
-
currentInputStream
-
newBatch
protected boolean newBatch -
thresholdMillis
protected long thresholdMillis -
removeLastRecordOnRetry
protected boolean removeLastRecordOnRetry
-
-
Constructor Details
-
AbstractBatchComputation
Constructor- Parameters:
name
- the name of the computationnbInputStreams
- the number of input streamsnbOutputStreams
- the number of output streams
-
-
Method Details
-
batchProcess
protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records) Called when:- the batch capacity is reached
- the time threshold is reached
- the inputStreamName has changed
- Parameters:
context
- used to send records to output streams, note that the checkpoint is managed automatically.inputStreamName
- the input streams where the records are coming fromrecords
- the batch of records
-
batchFailure
public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records) Called when the retry policy has failed. -
init
Description copied from interface:Computation
Called when the framework has registered the computation successfully. Gives users a first opportunity to schedule timer callbacks and produce records. This method can be called multiple times.- Specified by:
init
in interfaceComputation
- Overrides:
init
in classAbstractComputation
- Parameters:
context
- The computation context object provided by the system.
-
processTimer
Description copied from interface:Computation
Process a timer callback previously set viaComputationContext.setTimer(String, long)
.- Specified by:
processTimer
in interfaceComputation
- Overrides:
processTimer
in classAbstractComputation
- Parameters:
context
- The computation context object provided by the system.key
- The name of the timer.timestamp
- The timestamp (in ms) for which the callback was scheduled.
-
processRecord
Description copied from interface:Computation
Process an incoming record on one of the computation's input streams.- Parameters:
context
- The computation context object provided by the system.inputStreamName
- Name of the input stream that provides the record.record
- The record.
-
checkpointBatch
-
processRetry
Description copied from interface:Computation
Called after a failure inComputation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record)
orComputation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long)
before retrying.- Specified by:
processRetry
in interfaceComputation
- Overrides:
processRetry
in classAbstractComputation
-
processFailure
Description copied from interface:Computation
Called whenComputation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record)
orComputation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long)
fails and cannot be retried.- Specified by:
processFailure
in interfaceComputation
- Overrides:
processFailure
in classAbstractComputation
-