Package org.nuxeo.lib.stream.computation
Class AbstractBatchComputation
- java.lang.Object
  - org.nuxeo.lib.stream.computation.AbstractComputation
    - org.nuxeo.lib.stream.computation.AbstractBatchComputation
- All Implemented Interfaces:
  Computation
- Direct Known Subclasses:
  StreamAuditWriter.AuditLogWriterComputation
public abstract class AbstractBatchComputation extends AbstractComputation
An abstract Computation that processes records by batch. The batch capacity and threshold are defined in the computation policy.
- Since:
- 10.3
-
-
Field Summary
Fields (Modifier and Type, Field):
protected List<Record> batchRecords
protected String currentInputStream
protected boolean newBatch
protected boolean removeLastRecordOnRetry
protected long thresholdMillis
static String TIMER_BATCH
-
Fields inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
INPUT_1, INPUT_2, INPUT_3, INPUT_NULL, metadata, OUTPUT_1, OUTPUT_2, OUTPUT_3, OUTPUT_4
-
-
Constructor Summary
Constructors (Constructor, Description):
AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams)
    Constructor
-
Method Summary
Methods (Modifier and Type, Method, Description):
abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records)
    Called when the retry policy has failed.
protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records)
    Called when the batch capacity is reached, the time threshold is reached, or the inputStreamName has changed. If this method raises an exception, the retry policy is applied.
protected void checkpointBatch(ComputationContext context)
void init(ComputationContext context)
    Called when the framework has registered the computation successfully.
void processFailure(ComputationContext context, Throwable failure)
void processRecord(ComputationContext context, String inputStreamName, Record record)
    Process an incoming record on one of the computation's input streams.
void processRetry(ComputationContext context, Throwable failure)
void processTimer(ComputationContext context, String key, long timestamp)
    Process a timer callback previously set via ComputationContext.setTimer(String, long).
-
Methods inherited from class org.nuxeo.lib.stream.computation.AbstractComputation
metadata
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.computation.Computation
destroy, signalStop
-
-
-
-
Field Detail
-
TIMER_BATCH
public static final String TIMER_BATCH
- See Also:
- Constant Field Values
-
currentInputStream
protected String currentInputStream
-
newBatch
protected boolean newBatch
-
thresholdMillis
protected long thresholdMillis
-
removeLastRecordOnRetry
protected boolean removeLastRecordOnRetry
-
-
Constructor Detail
-
AbstractBatchComputation
public AbstractBatchComputation(String name, int nbInputStreams, int nbOutputStreams)
Constructor
- Parameters:
name - the name of the computation
nbInputStreams - the number of input streams
nbOutputStreams - the number of output streams
-
-
Method Detail
-
batchProcess
protected abstract void batchProcess(ComputationContext context, String inputStreamName, List<Record> records)
Called when:
- the batch capacity is reached
- the time threshold is reached
- the inputStreamName has changed
If this method raises an exception, the retry policy is applied.
- Parameters:
context - used to send records to output streams; note that the checkpoint is managed automatically.
inputStreamName - the input stream where the records are coming from
records - the batch of records
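The three triggers listed for batchProcess can be illustrated with a small self-contained model. This is only a sketch under stated assumptions: BatchingSketch, onRecord, onTimer and flush are hypothetical stand-ins written for this example, not Nuxeo classes or methods, and the real computation may order its checks differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, NOT the Nuxeo class: buffers records and flushes them in batches.
class BatchingSketch {
    final int capacity;                               // stand-in for the policy's batch capacity
    final List<String> batch = new ArrayList<>();     // records waiting to be processed
    final List<String> flushed = new ArrayList<>();   // log of "stream:[records]" per flushed batch
    String currentStream;

    BatchingSketch(int capacity) {
        this.capacity = capacity;
    }

    // Models an incoming record: flush on stream change, append, then flush on capacity.
    void onRecord(String stream, String record) {
        if (!batch.isEmpty() && !stream.equals(currentStream)) {
            flush(); // trigger: the input stream name has changed
        }
        currentStream = stream;
        batch.add(record);
        if (batch.size() >= capacity) {
            flush(); // trigger: the batch capacity is reached
        }
    }

    // Models the time threshold elapsing while records are pending.
    void onTimer() {
        if (!batch.isEmpty()) {
            flush(); // trigger: the time threshold is reached
        }
    }

    // Stand-in for handing the batch to batchProcess.
    void flush() {
        flushed.add(currentStream + ":" + batch);
        batch.clear();
    }

    static List<String> demo() {
        BatchingSketch c = new BatchingSketch(3);
        c.onRecord("i1", "a");
        c.onRecord("i1", "b");
        c.onRecord("i1", "c"); // capacity of 3 reached
        c.onRecord("i1", "d");
        c.onRecord("i2", "e"); // stream changed while "d" was pending
        c.onTimer();           // threshold elapsed while "e" was pending
        return c.flushed;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model each flushed batch contains records from a single input stream, which matches the single inputStreamName argument that batchProcess receives.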
-
batchFailure
public abstract void batchFailure(ComputationContext context, String inputStreamName, List<Record> records)
Called when the retry policy has failed.
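The relationship between batchProcess, the retry policy and batchFailure can be sketched as follows. The names RetrySketch, runWithRetry and maxRetries are hypothetical and exist only in this example; the real retry policy is configured in the computation policy and may behave differently (for instance with delays between attempts).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in, NOT the Nuxeo retry mechanism.
class RetrySketch {

    // Calls process up to maxRetries + 1 times; if every attempt throws,
    // calls onFailure once with the same batch. Returns the number of attempts made.
    static <T> int runWithRetry(List<T> batch, int maxRetries,
                                Consumer<List<T>> process, Consumer<List<T>> onFailure) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                process.accept(batch); // stand-in for batchProcess
                return attempts;
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    onFailure.accept(batch); // stand-in for batchFailure
                    return attempts;
                }
                // otherwise fall through and retry
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<String> batch = List.of("r1", "r2");
        int attempts = runWithRetry(batch, 2,
                records -> {
                    log.add("process");
                    throw new IllegalStateException("sink unavailable");
                },
                records -> log.add("failure:" + records.size()));
        log.add("attempts:" + attempts);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With two retries allowed and a process step that always throws, the model makes three attempts before invoking the failure callback with the two-record batch.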
-
init
public void init(ComputationContext context)
Description copied from interface: Computation
Called when the framework has registered the computation successfully. Gives users a first opportunity to schedule timer callbacks and produce records. This method can be called multiple times.
- Specified by:
init in interface Computation
- Overrides:
init in class AbstractComputation
- Parameters:
context - The computation context object provided by the system.
-
processTimer
public void processTimer(ComputationContext context, String key, long timestamp)
Description copied from interface: Computation
Process a timer callback previously set via ComputationContext.setTimer(String, long).
- Specified by:
processTimer in interface Computation
- Overrides:
processTimer in class AbstractComputation
- Parameters:
context - The computation context object provided by the system.
key - The name of the timer.
timestamp - The timestamp (in ms) for which the callback was scheduled.
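One way a recurring timer could enforce a time threshold is for the callback to re-arm itself each time it fires. The sketch below models that idea with hypothetical stand-ins: TimerSketch, advanceTo and the "batch" key value are assumptions made for this example, and the page does not state how AbstractBatchComputation actually schedules TIMER_BATCH.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, NOT the Nuxeo ComputationContext or computation.
class TimerSketch {
    static final String TIMER_BATCH = "batch";        // assumed value, for illustration only
    final Map<String, Long> timers = new HashMap<>(); // timer name -> scheduled timestamp (ms)
    final List<String> fired = new ArrayList<>();     // log of "key@timestamp" callbacks
    final long thresholdMillis;

    TimerSketch(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    // Stand-in for ComputationContext.setTimer(String, long).
    void setTimer(String key, long timestamp) {
        timers.put(key, timestamp);
    }

    // Stand-in for processTimer: ignore unknown timers, record the callback, then re-arm.
    void processTimer(String key, long timestamp) {
        if (!TIMER_BATCH.equals(key)) {
            return;
        }
        fired.add(key + "@" + timestamp);
        setTimer(TIMER_BATCH, timestamp + thresholdMillis);
    }

    // Fires every timer scheduled at or before 'now', including timers re-armed along the way.
    void advanceTo(long now) {
        boolean firedOne = true;
        while (firedOne) {
            firedOne = false;
            for (String key : new ArrayList<>(timers.keySet())) {
                long ts = timers.get(key);
                if (ts <= now) {
                    timers.remove(key);
                    processTimer(key, ts);
                    firedOne = true;
                }
            }
        }
    }

    static TimerSketch demo() {
        TimerSketch c = new TimerSketch(100);
        c.setTimer(TIMER_BATCH, 100);
        c.advanceTo(350);
        return c;
    }

    public static void main(String[] args) {
        System.out.println(demo().fired);
    }
}
```

In this model, advancing the clock to 350 ms with a 100 ms threshold fires the callback at 100, 200 and 300 ms and leaves the next one scheduled for 400 ms.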
-
processRecord
public void processRecord(ComputationContext context, String inputStreamName, Record record)
Description copied from interface: Computation
Process an incoming record on one of the computation's input streams.
- Parameters:
context - The computation context object provided by the system.
inputStreamName - Name of the input stream that provides the record.
record - The record.
-
checkpointBatch
protected void checkpointBatch(ComputationContext context)
-
processRetry
public void processRetry(ComputationContext context, Throwable failure)
Description copied from interface: Computation
Called after a failure in Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) before retrying.
- Specified by:
processRetry in interface Computation
- Overrides:
processRetry in class AbstractComputation
-
processFailure
public void processFailure(ComputationContext context, Throwable failure)
Description copied from interface: Computation
Called when Computation.processRecord(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, org.nuxeo.lib.stream.computation.Record) or Computation.processTimer(org.nuxeo.lib.stream.computation.ComputationContext, java.lang.String, long) fails and cannot be retried.
- Specified by:
processFailure in interface Computation
- Overrides:
processFailure in class AbstractComputation
-
-