Package org.nuxeo.lib.stream.computation
Class AbstractBatchComputation
- java.lang.Object
  - org.nuxeo.lib.stream.computation.AbstractComputation
    - org.nuxeo.lib.stream.computation.AbstractBatchComputation
- All Implemented Interfaces:
Computation
- Direct Known Subclasses:
StreamAuditWriter.AuditLogWriterComputation
public abstract class AbstractBatchComputation extends AbstractComputation
An abstract Computation that processes records by batch. The batch capacity and threshold are defined in the computation policy.
- Since:
- 10.3
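To illustrate the shape of a subclass, here is a minimal sketch. The local `ComputationContext`, `Record`, and `AbstractBatchComputation` declarations are simplified stand-ins so that the snippet compiles on its own: they copy only the signatures listed on this page and none of the real behavior. The `KeyCollector` class and the `key` field are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// --- Local stand-ins so the snippet compiles without Nuxeo on the classpath. ---
// They mirror only the signatures listed on this page; a real project would
// import the org.nuxeo.lib.stream.computation types instead.
interface ComputationContext { }

class Record {
    final String key; // hypothetical field, for illustration only

    Record(String key) {
        this.key = key;
    }
}

abstract class AbstractBatchComputation {
    public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams) {
        // The stand-in ignores its arguments; the real class does not.
    }

    protected abstract void batchProcess(ComputationContext context, String inputStreamName,
            List<Record> records);

    public abstract void batchFailure(ComputationContext context, String inputStreamName,
            List<Record> records);
}

// Hypothetical subclass: remembers the keys of the records it receives.
class KeyCollector extends AbstractBatchComputation {
    final List<String> processedKeys = new ArrayList<>();
    final List<String> failedKeys = new ArrayList<>();

    KeyCollector(String name) {
        super(name, 1, 0); // one input stream, no output stream
    }

    @Override
    protected void batchProcess(ComputationContext context, String inputStreamName,
            List<Record> records) {
        // Handle the whole batch in one call.
        for (Record record : records) {
            processedKeys.add(record.key);
        }
    }

    @Override
    public void batchFailure(ComputationContext context, String inputStreamName,
            List<Record> records) {
        // Keep track of the records that could not be processed.
        for (Record record : records) {
            failedKeys.add(record.key);
        }
    }
}
```

A subclass therefore only has to supply the two abstract methods; how and when they are invoked is decided by the base class and the computation policy.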
-
-
Field Summary
Fields (Modifier and Type, Field)
- protected List<Record> batchRecords
- protected String currentInputStream
- protected boolean newBatch
- protected boolean removeLastRecordOnRetry
- protected long thresholdMillis
- static String TIMER_BATCH
Fields inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
INPUT_1, INPUT_2, INPUT_3, INPUT_NULL, metadata, OUTPUT_1, OUTPUT_2, OUTPUT_3, OUTPUT_4
-
-
Constructor Summary
Constructors
- AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams): Constructor
-
Method Summary
Methods (Modifier and Type, Method, Description)
- abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records): Called when the retry policy has failed.
- protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records): Called when the batch capacity is reached, the time threshold is reached, or the inputStreamName has changed. If this method raises an exception the retry policy is applied.
- protected void checkpointBatch(ComputationContext context)
- void init(ComputationContext context): Called when the framework has registered the computation successfully.
- void processFailure(ComputationContext context, Throwable failure)
- void processRecord(ComputationContext context, String inputStreamName, Record record): Process an incoming record on one of the computation's input streams.
- void processRetry(ComputationContext context, Throwable failure)
- void processTimer(ComputationContext context, String key, long timestamp): Process a timer callback previously set via ComputationContext.setTimer(String, long).
Methods inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
metadata
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.computation.Computation
destroy, signalStop
-
-
-
-
Field Detail
-
TIMER_BATCH
public static final String TIMER_BATCH
- See Also:
- Constant Field Values
-
currentInputStream
protected String currentInputStream
-
newBatch
protected boolean newBatch
-
thresholdMillis
protected long thresholdMillis
-
removeLastRecordOnRetry
protected boolean removeLastRecordOnRetry
-
-
Constructor Detail
-
AbstractBatchComputation
public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams)
Constructor
- Parameters:
- name - the name of the computation
- nbInputStreams - the number of input streams
- nbOutputStreams - the number of output streams
-
-
Method Detail
-
batchProcess
protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records)
Called when:
- the batch capacity is reached
- the time threshold is reached
- the inputStreamName has changed
If this method raises an exception the retry policy is applied.
- Parameters:
- context - used to send records to output streams; note that the checkpoint is managed automatically.
- inputStreamName - the input stream the records come from
- records - the batch of records
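The three triggers listed for batchProcess can be modeled in a few lines. The sketch below is a simplified, self-contained stand-in, not the Nuxeo implementation: `BatchTriggerSketch`, its `capacity` parameter, the caller-supplied clock values, and the use of plain `String` records are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Simplified model of the batching rules described above.
// NOT the Nuxeo implementation: names, time handling, and the String
// record type are assumptions made for illustration.
class BatchTriggerSketch {
    private final int capacity;          // hypothetical: max records per batch
    private final long thresholdMillis;  // hypothetical: max age of a batch
    private final BiConsumer<String, List<String>> batchProcess;

    private final List<String> batchRecords = new ArrayList<>();
    private String currentInputStream;
    private long batchStartMillis;

    BatchTriggerSketch(int capacity, long thresholdMillis,
            BiConsumer<String, List<String>> batchProcess) {
        this.capacity = capacity;
        this.thresholdMillis = thresholdMillis;
        this.batchProcess = batchProcess;
    }

    // Called for every incoming record; the caller supplies the clock value.
    void processRecord(String inputStreamName, String record, long nowMillis) {
        // Trigger: the input stream name has changed, so flush the pending batch first.
        if (!batchRecords.isEmpty() && !inputStreamName.equals(currentInputStream)) {
            flush();
        }
        if (batchRecords.isEmpty()) {
            // A new batch starts with this record.
            currentInputStream = inputStreamName;
            batchStartMillis = nowMillis;
        }
        batchRecords.add(record);
        // Trigger: the batch capacity is reached.
        if (batchRecords.size() >= capacity) {
            flush();
        }
    }

    // Stands in for a timer callback; the caller supplies the clock value.
    void processTimer(long nowMillis) {
        // Trigger: the time threshold is reached.
        if (!batchRecords.isEmpty() && nowMillis - batchStartMillis >= thresholdMillis) {
            flush();
        }
    }

    private void flush() {
        // Hand over a copy so that clearing the buffer cannot affect the callback's list.
        batchProcess.accept(currentInputStream, new ArrayList<>(batchRecords));
        batchRecords.clear();
    }
}
```

Passing the clock value in as a parameter keeps the sketch deterministic; the real class relies on timers set through the ComputationContext instead.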
-
batchFailure
public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records)
Called when the retry policy has failed.
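The relationship between batchProcess, the retry policy, and batchFailure can be pictured as "retry, then give up". The sketch below is a simplified model under stated assumptions and is not the Nuxeo retry policy: `BatchRetrySketch`, the `maxRetries` parameter, the immediate retries without delay, and the `String` record type are hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

// Simplified model of "retry, then give up".
// NOT the Nuxeo retry policy: maxRetries and both callbacks are
// hypothetical names used for illustration.
class BatchRetrySketch {

    // Returns true if the batch was processed, false if batchFailure was invoked.
    static boolean run(List<String> records, int maxRetries,
            Consumer<List<String>> batchProcess,
            Consumer<List<String>> batchFailure) {
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                batchProcess.accept(records);
                return true; // success, nothing more to do
            } catch (RuntimeException e) {
                // Fall through and try again until the attempts are exhausted.
            }
        }
        // Every attempt failed: the failure hook receives the same batch.
        batchFailure.accept(records);
        return false;
    }
}
```

In this model the failure hook sees exactly the records that batchProcess could not handle, which matches the parameter list of batchFailure above.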
-
init
public void init(ComputationContext context)
Description copied from interface: Computation
Called when the framework has registered the computation successfully. Gives users a first opportunity to schedule timer callbacks and produce records. This method can be called multiple times.
- Specified by:
- init in interface Computation
- Overrides:
- init in class AbstractComputation
- Parameters:
- context - The computation context object provided by the system.
-
processTimer
public void processTimer(ComputationContext context, String key, long timestamp)
Description copied from interface: Computation
Process a timer callback previously set via ComputationContext.setTimer(String, long).
- Specified by:
- processTimer in interface Computation
- Overrides:
- processTimer in class AbstractComputation
- Parameters:
- context - The computation context object provided by the system.
- key - The name of the timer.
- timestamp - The timestamp (in ms) for which the callback was scheduled.
-
processRecord
public void processRecord(ComputationContext context, String inputStreamName, Record record)
Description copied from interface: Computation
Process an incoming record on one of the computation's input streams.
- Parameters:
- context - The computation context object provided by the system.
- inputStreamName - Name of the input stream that provides the record.
- record - The record.
-
checkpointBatch
protected void checkpointBatch(ComputationContext context)
-
processRetry
public void processRetry(ComputationContext context, Throwable failure)
Description copied from interface: Computation
Called after a failure in Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) before retrying.
- Specified by:
- processRetry in interface Computation
- Overrides:
- processRetry in class AbstractComputation
-
processFailure
public void processFailure(ComputationContext context, Throwable failure)
Description copied from interface: Computation
Called when Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) fails and cannot be retried.
- Specified by:
- processFailure in interface Computation
- Overrides:
- processFailure in class AbstractComputation
-
-