Package org.nuxeo.lib.stream.computation
Class AbstractBatchComputation
java.lang.Object
org.nuxeo.lib.stream.computation.AbstractComputation
org.nuxeo.lib.stream.computation.AbstractBatchComputation
- All Implemented Interfaces:
Computation
- Direct Known Subclasses:
IndexingProcessor.AbstractIndexingComputation, StreamAuditWriter.AuditLogWriterComputation
An abstract Computation that processes records by batch.
The batch capacity and threshold are defined in the computation policy.
- Since:
- 10.3
-
Field Summary
Fields
Modifier and Type | Field | Description
protected String | currentInputStream
protected boolean | newBatch
protected boolean | removeLastRecordOnRetry
protected long | thresholdMillis
static final String | TIMER_BATCH
Fields inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
INPUT_1, INPUT_2, INPUT_3, INPUT_NULL, metadata, OUTPUT_1, OUTPUT_2, OUTPUT_3, OUTPUT_4
-
Constructor Summary
Constructors
Constructor | Description
AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams) | Constructor
-
Method Summary
Modifier and Type | Method | Description
abstract void | batchFailure(ComputationContext context, String inputStreamName, List<Record> records) | Called when the retry policy has failed.
protected abstract void | batchProcess(ComputationContext context, String inputStreamName, List<Record> records) | Called when the batch capacity is reached, the time threshold is reached, or the inputStreamName has changed. If this method raises an exception, the retry policy is applied.
protected void | checkpointBatch(ComputationContext context)
void | init(ComputationContext context) | Called when the framework has registered the computation successfully.
void | processFailure(ComputationContext context, Throwable failure)
void | processRecord(ComputationContext context, String inputStreamName, Record record) | Process an incoming record on one of the computation's input streams.
void | processRetry(ComputationContext context, Throwable failure)
void | processTimer(ComputationContext context, String key, long timestamp) | Process a timer callback previously set via ComputationContext.setTimer(String, long).
Methods inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
metadata
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.nuxeo.lib.stream.computation.Computation
destroy, signalStop
-
Field Details
-
TIMER_BATCH
- See Also:
-
batchRecords
-
currentInputStream
-
newBatch
protected boolean newBatch
-
thresholdMillis
protected long thresholdMillis
-
removeLastRecordOnRetry
protected boolean removeLastRecordOnRetry
-
-
Constructor Details
-
AbstractBatchComputation
Constructor
- Parameters:
name - the name of the computation
nbInputStreams - the number of input streams
nbOutputStreams - the number of output streams
-
-
Method Details
-
batchProcess
protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records)
Called when:
- the batch capacity is reached
- the time threshold is reached
- the inputStreamName has changed
If this method raises an exception, the retry policy is applied.
- Parameters:
context - used to send records to output streams; note that the checkpoint is managed automatically.
inputStreamName - the input stream the records come from
records - the batch of records
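The three conditions that trigger batchProcess can be illustrated with a small, self-contained model. This is a sketch only, not the Nuxeo implementation: BatchBuffer, onRecord, onTimer and the flush log are hypothetical stand-ins, and the capacity value is assumed to come from the computation policy.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the batch triggers; NOT the Nuxeo implementation. */
public class BatchTriggerSketch {

    /** Hypothetical stand-in for the batch state kept by a batch computation. */
    public static class BatchBuffer {
        private final int capacity; // assumed to come from the computation policy
        private final List<String> batch = new ArrayList<>();
        private String currentStream; // stream the buffered records came from
        public final List<String> flushes = new ArrayList<>(); // "reason:stream:size"

        public BatchBuffer(int capacity) {
            this.capacity = capacity;
        }

        /** Buffers one record, flushing first if the input stream changed. */
        public void onRecord(String stream, String record) {
            if (!batch.isEmpty() && !stream.equals(currentStream)) {
                flush("stream-changed");
            }
            currentStream = stream;
            batch.add(record);
            if (batch.size() >= capacity) {
                flush("capacity");
            }
        }

        /** Models the time threshold elapsing: flushes a partial batch, if any. */
        public void onTimer() {
            if (!batch.isEmpty()) {
                flush("threshold");
            }
        }

        private void flush(String reason) {
            // A real computation would hand the batch to batchProcess here.
            flushes.add(reason + ":" + currentStream + ":" + batch.size());
            batch.clear();
        }
    }

    /** Runs a small scenario and returns the flush log. */
    public static List<String> demo() {
        BatchBuffer buffer = new BatchBuffer(3);
        buffer.onRecord("i1", "a");
        buffer.onRecord("i1", "b");
        buffer.onRecord("i1", "c"); // third record: capacity reached
        buffer.onRecord("i1", "d");
        buffer.onRecord("i2", "e"); // different stream: "d" is flushed first
        buffer.onTimer();           // threshold elapsed: "e" is flushed
        return buffer.flushes;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model each flush carries records from a single input stream, which is why a change of stream forces a flush before the new record is buffered.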
-
batchFailure
public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records)
Called when the retry policy has failed.
-
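The relationship between batchProcess, the retry policy and batchFailure can be sketched as a plain retry loop. This is an illustrative model under stated assumptions, not the Nuxeo retry policy: the run helper, the maxRetries parameter and the Outcome type are hypothetical, and the real policy may add delays or backoff that are not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of "retry, then batchFailure"; NOT the Nuxeo retry policy. */
public class BatchRetrySketch {

    /** Records what happened during a run. */
    public static final class Outcome {
        public int attempts;          // how many times the batch was processed
        public boolean failureCalled; // whether the failure callback ran
    }

    /**
     * Processes a batch, retrying up to maxRetries times (hypothetical parameter).
     * When every attempt throws, the failure callback receives the same batch.
     */
    public static <T> Outcome run(List<T> batch, int maxRetries,
            Consumer<List<T>> batchProcess, Consumer<List<T>> batchFailure) {
        Outcome outcome = new Outcome();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            outcome.attempts++;
            try {
                batchProcess.accept(batch);
                return outcome; // success: the failure callback is never called
            } catch (RuntimeException e) {
                // Swallow the error and retry until the budget is exhausted.
            }
        }
        outcome.failureCalled = true;
        batchFailure.accept(batch);
        return outcome;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b");
        List<String> deadLetters = new ArrayList<>();
        Outcome outcome = run(batch, 2,
                records -> { throw new IllegalStateException("downstream unavailable"); },
                deadLetters::addAll);
        System.out.println(outcome.attempts + " attempts, dead letters: " + deadLetters);
    }
}
```

A failure callback of this kind is a natural place to log the batch or route it to a dead-letter destination, since it receives the records that could not be processed.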
init
Description copied from interface: Computation
Called when the framework has registered the computation successfully. Gives users a first opportunity to schedule timer callbacks and produce records. This method can be called multiple times.
- Specified by:
init in interface Computation
- Overrides:
init in class AbstractComputation
- Parameters:
context - The computation context object provided by the system.
-
processTimer
Description copied from interface: Computation
Process a timer callback previously set via ComputationContext.setTimer(String, long).
- Specified by:
processTimer in interface Computation
- Overrides:
processTimer in class AbstractComputation
- Parameters:
context - The computation context object provided by the system.
key - The name of the timer.
timestamp - The timestamp (in ms) for which the callback was scheduled.
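How a timer callback could drive the batch time threshold can be sketched with a tiny in-memory timer registry. Everything here is an assumption made for illustration: the Timers class, its fireDue method, the "batch" timer key and the 100 ms threshold are hypothetical, and the sketch does not claim to reproduce how AbstractBatchComputation uses TIMER_BATCH or thresholdMillis internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Simplified model of timer callbacks; NOT the Nuxeo scheduler. */
public class TimerSketch {

    /** Hypothetical stand-in for the timer part of a computation context. */
    public static class Timers {
        private final Map<String, Long> pending = new TreeMap<>();

        /** Schedules (or reschedules) the timer named key for timestampMillis. */
        public void setTimer(String key, long timestampMillis) {
            pending.put(key, timestampMillis);
        }

        /** Invokes the callback for every timer due at nowMillis, removing it first. */
        public void fireDue(long nowMillis, BiConsumer<String, Long> processTimer) {
            // Iterate over a snapshot so the callback may call setTimer safely.
            for (Map.Entry<String, Long> e : new TreeMap<>(pending).entrySet()) {
                if (e.getValue() <= nowMillis) {
                    pending.remove(e.getKey());
                    processTimer.accept(e.getKey(), e.getValue());
                }
            }
        }
    }

    /** Advances a fake clock from 0 to 250 ms in 50 ms ticks; returns the firing timestamps. */
    public static List<Long> demo() {
        final long thresholdMillis = 100; // assumed batch time threshold
        Timers timers = new Timers();
        List<Long> fired = new ArrayList<>();
        timers.setTimer("batch", thresholdMillis); // what an init step might do
        for (long now = 0; now <= 250; now += 50) {
            timers.fireDue(now, (key, timestamp) -> {
                fired.add(timestamp); // a pending batch would be flushed here
                timers.setTimer(key, timestamp + thresholdMillis); // re-arm the timer
            });
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Re-arming the timer inside the callback keeps the threshold check periodic, so a partially filled batch is never held longer than roughly one threshold interval in this model.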
-
processRecord
Description copied from interface: Computation
Process an incoming record on one of the computation's input streams.
- Parameters:
context - The computation context object provided by the system.
inputStreamName - Name of the input stream that provides the record.
record - The record.
-
checkpointBatch
-
processRetry
Description copied from interface: Computation
Called after a failure in Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) before retrying.
- Specified by:
processRetry in interface Computation
- Overrides:
processRetry in class AbstractComputation
-
processFailure
Description copied from interface: Computation
Called when Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) fails and cannot be retried.
- Specified by:
processFailure in interface Computation
- Overrides:
processFailure in class AbstractComputation
-