Class LogStreamProcessor
- java.lang.Object
-
- org.nuxeo.lib.stream.computation.log.LogStreamProcessor
-
- All Implemented Interfaces:
StreamProcessor
public class LogStreamProcessor extends Object implements StreamProcessor
- Since:
- 9.3
-
-
Field Summary
Fields
- protected LogManager manager
- protected boolean needRegister
- protected static com.fasterxml.jackson.databind.ObjectMapper OBJECT_MAPPER
- protected List<ComputationPool> pools
- protected Settings settings
- protected LogStreamManager streamManager
- protected Topology topology
-
Constructor Summary
Constructors
- LogStreamProcessor(LogStreamManager streamManager)
- LogStreamProcessor(LogManager manager): Deprecated.
-
Method Summary
All Methods
- boolean drainAndStop(Duration timeout): Stop computations when input streams are empty.
- protected Codec<Record> getCodecForStreams(String name, Set<String> streams)
- protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta)
- protected String getEdgeName(Topology.Vertex edge)
- Latency getLatency(String computationName): Returns the latency for a computation.
- long getLowWatermark(): Returns the low watermark for all the computations of the topology.
- long getLowWatermark(String computationName): Returns the low watermark for the computation.
- StreamProcessor init(Topology topology, Settings settings): Initialize streams, but don't run the computations.
- protected List<ComputationPool> initPools()
- boolean isDone(long timestamp): Returns true if all messages with a lower timestamp have been processed by the topology.
- boolean isTerminated(): True if there are no active processing threads.
- void shutdown(): Shut down immediately.
- void start(): Run the initialized computations.
- boolean startComputation(Name computation): Start a computation thread pool that has been stopped using StreamProcessor.stopComputation(Name).
- boolean stop(Duration timeout): Try to stop computations gracefully after processing a record or a timer within the timeout duration.
- boolean stopComputation(Name computation): Stop a computation thread pool.
- String toJson(Map<String,String> meta): Returns a JSON representation of the processor, including the list of streams and computations with their settings, along with the topology.
- boolean waitForAssignments(Duration timeout): Wait for the computations to have assigned partitions ready to process records.
-
-
-
Field Detail
-
manager
protected final LogManager manager
-
topology
protected Topology topology
-
settings
protected Settings settings
-
pools
protected List<ComputationPool> pools
-
streamManager
protected LogStreamManager streamManager
-
needRegister
protected final boolean needRegister
-
OBJECT_MAPPER
protected static final com.fasterxml.jackson.databind.ObjectMapper OBJECT_MAPPER
-
-
Constructor Detail
-
LogStreamProcessor
@Deprecated public LogStreamProcessor(LogManager manager)
Deprecated.
-
LogStreamProcessor
public LogStreamProcessor(LogStreamManager streamManager)
-
-
Method Detail
-
init
public StreamProcessor init(Topology topology, Settings settings)
Description copied from interface: StreamProcessor
Initialize streams, but don't run the computations.
- Specified by:
- init in interface StreamProcessor
-
start
public void start()
Description copied from interface: StreamProcessor
Run the initialized computations.
- Specified by:
- start in interface StreamProcessor
-
waitForAssignments
public boolean waitForAssignments(Duration timeout) throws InterruptedException
Description copied from interface: StreamProcessor
Wait for the computations to have assigned partitions ready to process records. The processor must be started. This is useful for writing unit tests. Returns true if all computations have assigned partitions within the timeout.
- Specified by:
- waitForAssignments in interface StreamProcessor
- Throws:
- InterruptedException
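As an illustration of the start-then-wait ordering described above, here is a minimal sketch of a test helper. It does not use the Nuxeo classes directly: `AssignableProcessor` is a hypothetical stand-in that mirrors only the `start()`, `waitForAssignments(Duration)` and `shutdown()` signatures listed on this page, and `AssignmentAwait.startAndAwait` is an invented helper name. Shutting down when assignments do not arrive in time is a design choice of this sketch, not documented behavior.

```java
import java.time.Duration;

/** Hypothetical stand-in mirroring three StreamProcessor signatures from this page. */
interface AssignableProcessor {
    void start();

    boolean waitForAssignments(Duration timeout) throws InterruptedException;

    void shutdown();
}

final class AssignmentAwait {
    private AssignmentAwait() {
    }

    /**
     * Starts the processor, then waits for partition assignments.
     * Returns true if assignments arrived within the timeout; otherwise
     * shuts the processor down (an assumption of this sketch) and returns false.
     */
    static boolean startAndAwait(AssignableProcessor processor, Duration timeout) {
        // The processor must be started before waiting for assignments.
        processor.start();
        try {
            if (processor.waitForAssignments(timeout)) {
                return true;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        processor.shutdown();
        return false;
    }
}
```

In a unit test, a real processor could be adapted to the stand-in interface with a small wrapper, and the test would fail fast when `startAndAwait` returns false.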
-
isTerminated
public boolean isTerminated()
Description copied from interface: StreamProcessor
True if there are no active processing threads.
- Specified by:
- isTerminated in interface StreamProcessor
-
toJson
public String toJson(Map<String,String> meta)
Description copied from interface: StreamProcessor
Returns a JSON representation of the processor, including the list of streams and computations with their settings, along with the topology.
- Specified by:
- toJson in interface StreamProcessor
-
stopComputation
public boolean stopComputation(Name computation)
Description copied from interface: StreamProcessor
Stop a computation thread pool.
- Specified by:
- stopComputation in interface StreamProcessor
- Returns:
- true if the computation thread pool was found and stopped.
-
startComputation
public boolean startComputation(Name computation)
Description copied from interface: StreamProcessor
Start a computation thread pool that has been stopped using StreamProcessor.stopComputation(Name).
- Specified by:
- startComputation in interface StreamProcessor
- Returns:
- true if the computation thread pool has been started.
-
getEdgeName
protected String getEdgeName(Topology.Vertex edge)
-
stop
public boolean stop(Duration timeout)
Description copied from interface: StreamProcessor
Try to stop computations gracefully after processing a record or a timer within the timeout duration. If this cannot be done within the timeout, shut down and return false.
- Specified by:
- stop in interface StreamProcessor
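The "graceful stop, then forced shutdown on timeout" contract described above follows a common pattern. The sketch below shows that pattern on a plain `java.util.concurrent.ExecutorService`; it is an analogy only and not the implementation used by LogStreamProcessor. The class name `GracefulStop` is hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulStop {
    private GracefulStop() {
    }

    /**
     * Lets already submitted work finish within the timeout.
     * Returns true on a clean stop; otherwise forces a shutdown and returns false.
     */
    static boolean stop(ExecutorService pool, Duration timeout) {
        // Stop accepting new tasks; running tasks may complete.
        pool.shutdown();
        try {
            if (pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and fall through to the forced shutdown.
            Thread.currentThread().interrupt();
        }
        // Timeout elapsed (or the wait was interrupted): interrupt the workers.
        pool.shutdownNow();
        return false;
    }
}
```

The boolean result lets the caller distinguish a clean stop from a forced one, which mirrors the return value documented for `stop(Duration)`.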
-
drainAndStop
public boolean drainAndStop(Duration timeout)
Description copied from interface: StreamProcessor
Stop computations when input streams are empty. The timeout is applied to each computation, so the total duration can be up to the number of computations multiplied by the timeout. Returns true if computations are stopped within the timeout.
- Specified by:
- drainAndStop in interface StreamProcessor
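Because the timeout applies per computation, the worst-case wait grows with the size of the topology. A small sketch of that arithmetic, using a hypothetical helper class `DrainBudget` that is not part of the library:

```java
import java.time.Duration;

final class DrainBudget {
    private DrainBudget() {
    }

    /** Upper bound on the total drain time: number of computations times the per-computation timeout. */
    static Duration worstCase(int computationCount, Duration perComputationTimeout) {
        if (computationCount < 0) {
            throw new IllegalArgumentException("computationCount must not be negative");
        }
        return perComputationTimeout.multipliedBy(computationCount);
    }
}
```

For example, a topology with 4 computations and a 30 second timeout can block the caller for up to 2 minutes, which is worth keeping in mind when choosing test timeouts.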
-
shutdown
public void shutdown()
Description copied from interface: StreamProcessor
Shut down immediately.
- Specified by:
- shutdown in interface StreamProcessor
-
getLowWatermark
public long getLowWatermark()
Description copied from interface: StreamProcessor
Returns the low watermark for all the computations of the topology. Any message with an offset below the low watermark has been processed. The returned watermark is local to this processing node.
- Specified by:
- getLowWatermark in interface StreamProcessor
-
getLatency
public Latency getLatency(String computationName)
Description copied from interface: StreamProcessor
Returns the latency for a computation. This also works for distributed computations.
- Specified by:
- getLatency in interface StreamProcessor
-
getLowWatermark
public long getLowWatermark(String computationName)
Description copied from interface: StreamProcessor
Returns the low watermark for the computation. Any message with an offset below the low watermark has been processed by this computation and its ancestors. The returned watermark is local to this processing node; if the computation is distributed, the global low watermark is the minimum of all nodes' low watermarks.
- Specified by:
- getLowWatermark in interface StreamProcessor
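Since the value returned here is local to one node, a caller that needs the global low watermark of a distributed computation has to combine the per-node values itself. A minimal sketch of that reduction follows; the class `GlobalLowWatermark` is hypothetical, and how the per-node values are collected is left out as it is not covered by this page.

```java
import java.util.Collection;

final class GlobalLowWatermark {
    private GlobalLowWatermark() {
    }

    /** Returns the minimum of the low watermarks reported by each node. */
    static long of(Collection<Long> perNodeLowWatermarks) {
        if (perNodeLowWatermarks.isEmpty()) {
            throw new IllegalArgumentException("at least one node watermark is required");
        }
        long min = Long.MAX_VALUE;
        for (long watermark : perNodeLowWatermarks) {
            // The slowest node bounds what has been processed everywhere.
            min = Math.min(min, watermark);
        }
        return min;
    }
}
```

Taking the minimum is the conservative choice: a message is only known to be processed globally once every node has passed it.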
-
isDone
public boolean isDone(long timestamp)
Description copied from interface: StreamProcessor
Returns true if all messages with a lower timestamp have been processed by the topology.
- Specified by:
- isDone in interface StreamProcessor
-
initPools
protected List<ComputationPool> initPools()
-
getDefaultAssignments
protected List<List<LogPartition>> getDefaultAssignments(ComputationMetadataMapping meta)
-
-