Class LogStreamManager
- java.lang.Object
-
- org.nuxeo.lib.stream.computation.log.LogStreamManager
-
- All Implemented Interfaces:
- AutoCloseable, StreamManager
public class LogStreamManager extends Object implements StreamManager
StreamManager based on a LogManager.
- Since:
- 11.1
-
-
Field Summary
Fields (Modifier and Type, Field):
- protected Map<Name,RecordFilterChain> filters
- static Codec<Record> INTERNAL_CODEC
- protected LogManager logManager
- static String METRICS_STREAM
- protected Map<String,StreamProcessor> processors
- static String PROCESSORS_STREAM
- protected Map<String,Settings> settings
- protected Set<Name> streams
- protected Map<String,String> systemMetadata
- protected Map<String,Topology> topologies
-
Constructor Summary
Constructors:
- LogStreamManager(String nodeId, LogManager logManager)
- LogStreamManager(LogManager logManager)
-
Method Summary
Methods (Modifier and Type, Method: Description):
- LogOffset append(String streamUrn, Record record): Appends a record to a processor's source stream.
- void close()
- StreamProcessor createStreamProcessor(String processorName): Creates a registered processor without starting it.
- LogTailer<Record> createTailer(Name computationName, Collection<LogPartition> streamPartitions)
- protected Codec<Record> getCodec(Collection<Name> streams)
- RecordFilter getFilter(Name stream)
- LogManager getLogManager()
- protected String getNodeId()
- StreamProcessor getProcessor(String processorName): Gets a processor.
- Set<String> getProcessorNames(): Gets a set of processor names.
- protected void initAppenders(Collection<String> streams, Settings settings)
- protected void initInternalStream(Name stream)
- protected void initInternalStreams()
- protected void initStream(String streamName, Settings settings)
- protected void initStreams(Topology topology, Settings settings)
- void register(String processorName, Topology topology, Settings settings): Registers a processor and initializes the underlying streams. This is needed before creating a processor or appending records to source streams.
- void register(List<String> streams, Settings settings): Registers some source streams without any processors.
- protected void registerFilters(Collection<String> streams, Settings settings)
- LogTailer<Record> subscribe(Name computationName, Collection<Name> streams, RebalanceListener listener)
- boolean supportSubscribe(): Deprecated. Since 2021.34, use supportSubscribe(Name) instead.
- boolean supportSubscribe(Name stream): Returns true if the subscribe(Name, Collection<Name>, RebalanceListener) method is supported for the specific stream.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.computation.StreamManager
registerAndCreateProcessor
-
-
-
-
Field Detail
-
PROCESSORS_STREAM
public static final String PROCESSORS_STREAM
- See Also:
- Constant Field Values
-
METRICS_STREAM
public static final String METRICS_STREAM
- See Also:
- Constant Field Values
-
logManager
protected final LogManager logManager
-
processors
protected final Map<String,StreamProcessor> processors
-
filters
protected final Map<Name,RecordFilterChain> filters
-
-
Constructor Detail
-
LogStreamManager
public LogStreamManager(LogManager logManager)
-
LogStreamManager
public LogStreamManager(String nodeId, LogManager logManager)
-
-
Method Detail
-
initInternalStreams
protected void initInternalStreams()
-
initInternalStream
protected void initInternalStream(Name stream)
-
register
public void register(String processorName, Topology topology, Settings settings)
Description copied from interface: StreamManager
Registers a processor and initializes the underlying streams. This is needed before creating a processor or appending records to source streams.
- Specified by:
- register in interface StreamManager
-
register
public void register(List<String> streams, Settings settings)
Description copied from interface: StreamManager
Registers some source streams without any processors.
- Specified by:
- register in interface StreamManager
-
createStreamProcessor
public StreamProcessor createStreamProcessor(String processorName)
Description copied from interface: StreamManager
Creates a registered processor without starting it.
- Specified by:
- createStreamProcessor in interface StreamManager
-
getNodeId
protected String getNodeId()
-
getLogManager
public LogManager getLogManager()
-
append
public LogOffset append(String streamUrn, Record record)
Description copied from interface: StreamManager
Appends a record to a processor's source stream.
- Specified by:
- append in interface StreamManager
-
getProcessorNames
public Set<String> getProcessorNames()
Description copied from interface: StreamManager
Gets a set of processor names.
- Specified by:
- getProcessorNames in interface StreamManager
-
getProcessor
public StreamProcessor getProcessor(String processorName)
Description copied from interface: StreamManager
Gets a processor.
- Specified by:
- getProcessor in interface StreamManager
- Returns:
- null if the processor doesn't exist
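Because getProcessor returns null for an unknown name, callers may want to wrap the lookup. The following is a minimal sketch, not Nuxeo code: `Processor`, `PROCESSORS`, and `findProcessor` are hypothetical stand-ins that only mirror the documented null-on-missing contract.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ProcessorLookupSketch {
    // Hypothetical stand-in for StreamProcessor (assumption, not the real type).
    interface Processor {
        String name();
    }

    // Hypothetical stand-in for the manager's registry of processors.
    static final Map<String, Processor> PROCESSORS = new HashMap<>();

    // Mirrors the documented contract: null if the processor doesn't exist.
    static Processor getProcessor(String processorName) {
        return PROCESSORS.get(processorName);
    }

    // Wraps the nullable result so callers must handle the missing case.
    static Optional<Processor> findProcessor(String processorName) {
        return Optional.ofNullable(getProcessor(processorName));
    }

    public static void main(String[] args) {
        PROCESSORS.put("myProcessor", () -> "myProcessor");
        System.out.println(findProcessor("myProcessor").map(Processor::name).orElse("<missing>"));
        System.out.println(findProcessor("unknown").map(Processor::name).orElse("<missing>"));
    }
}
```

With the real class, the same wrapping would apply to the value returned by getProcessor(String).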
-
close
public void close()
- Specified by:
- close in interface AutoCloseable
- Specified by:
- close in interface StreamManager
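Since the class implements AutoCloseable and close() declares no checked exception, an instance can be managed with try-with-resources. The sketch below uses a hypothetical `FakeManager` stand-in (an assumption, not the Nuxeo class) to show the pattern without any library dependency.

```java
public class CloseSketch {
    // Hypothetical stand-in for a manager that implements AutoCloseable.
    static class FakeManager implements AutoCloseable {
        boolean closed;

        // Like the documented close(), this override declares no checked exception.
        @Override
        public void close() {
            closed = true;
        }
    }

    // Uses the manager inside try-with-resources and returns it for inspection.
    static FakeManager useAndClose() {
        FakeManager ref;
        try (FakeManager manager = new FakeManager()) {
            ref = manager;
            // Work with the manager here (register, append, and so on).
        }
        // close() has been called by the time control reaches this point.
        return ref;
    }

    public static void main(String[] args) {
        System.out.println("closed = " + useAndClose().closed);
    }
}
```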
-
supportSubscribe
public boolean supportSubscribe(Name stream)
Returns true if the subscribe(Name, Collection<Name>, RebalanceListener) method is supported for the specific stream.
- Since:
- 2021.34
-
supportSubscribe
public boolean supportSubscribe()
Deprecated. Since 2021.34, use supportSubscribe(Name) instead.
Returns true if the subscribe(Name, Collection<Name>, RebalanceListener) method is supported. Now deprecated because some implementations support subscribe only on specific streams.
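One way to migrate away from the deprecated no-argument check is to test each stream individually before subscribing. This is a sketch under stated assumptions: `Manager` is a hypothetical stand-in that takes a plain String where the real method takes a Name, and `canSubscribeAll` is an illustrative helper, not part of the API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class SubscribeCheckSketch {
    // Hypothetical stand-in for the per-stream check (String replaces Name here).
    interface Manager {
        boolean supportSubscribe(String stream);
    }

    // True only when every requested stream supports subscribe.
    static boolean canSubscribeAll(Manager manager, Collection<String> streams) {
        return streams.stream().allMatch(manager::supportSubscribe);
    }

    public static void main(String[] args) {
        // Hypothetical stream names; only "orders" is subscribable in this sketch.
        Set<String> subscribable = new HashSet<>(Arrays.asList("orders"));
        Manager manager = subscribable::contains;

        System.out.println(canSubscribeAll(manager, Arrays.asList("orders")));
        System.out.println(canSubscribeAll(manager, Arrays.asList("orders", "audit")));
    }
}
```

When the check fails for some stream, a caller could fall back to createTailer with explicit partitions, though whether that fits depends on the deployment.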
-
subscribe
public LogTailer<Record> subscribe(Name computationName, Collection<Name> streams, RebalanceListener listener)
-
createTailer
public LogTailer<Record> createTailer(Name computationName, Collection<LogPartition> streamPartitions)
-
getFilter
public RecordFilter getFilter(Name stream)
-
getCodec
protected Codec<Record> getCodec(Collection<Name> streams)
-
initAppenders
protected void initAppenders(Collection<String> streams, Settings settings)
-
registerFilters
protected void registerFilters(Collection<String> streams, Settings settings)
-
-