Class LogStreamManager
- java.lang.Object
-
- org.nuxeo.lib.stream.computation.log.LogStreamManager
-
- All Implemented Interfaces:
AutoCloseable, StreamManager
public class LogStreamManager extends Object implements StreamManager
StreamManager based on a LogManager.
- Since:
- 11.1
-
-
Field Summary
Modifier and Type, Field, Description
protected Map<Name,RecordFilterChain>
filters
static Codec<Record>
INTERNAL_CODEC
protected LogManager
logManager
static String
METRICS_STREAM
protected Map<String,StreamProcessor>
processors
static String
PROCESSORS_STREAM
protected Map<String,Settings>
settings
protected Set<Name>
streams
protected Map<String,String>
systemMetadata
protected Map<String,Topology>
topologies
-
Constructor Summary
Constructor, Description
LogStreamManager(String nodeId, LogManager logManager)
LogStreamManager(LogManager logManager)
-
Method Summary
Modifier and Type, Method, Description
LogOffset
append(String streamUrn, Record record)
Appends a record to a processor's source stream.
void
close()
StreamProcessor
createStreamProcessor(String processorName)
Creates a registered processor without starting it.
LogTailer<Record>
createTailer(Name computationName, Collection<LogPartition> streamPartitions)
protected Codec<Record>
getCodec(Collection<Name> streams)
RecordFilter
getFilter(Name stream)
LogManager
getLogManager()
protected String
getNodeId()
StreamProcessor
getProcessor(String processorName)
Gets a processor.
Set<String>
getProcessorNames()
Gets a set of processor names.
protected void
initAppenders(Collection<String> streams, Settings settings)
protected void
initInternalStream(Name stream)
protected void
initInternalStreams()
protected void
initStream(String streamName, Settings settings)
protected void
initStreams(Topology topology, Settings settings)
void
register(String processorName, Topology topology, Settings settings)
Registers a processor and initializes the underlying streams; this is required before creating a processor or appending records to source streams.
void
register(List<String> streams, Settings settings)
Registers some source Streams without any processors.
protected void
registerFilters(Collection<String> streams, Settings settings)
LogTailer<Record>
subscribe(Name computationName, Collection<Name> streams, RebalanceListener listener)
boolean
supportSubscribe()
Deprecated. Since 2021.34, use supportSubscribe(Name) instead.
boolean
supportSubscribe(Name stream)
Returns true if the subscribe(Name, Collection<Name>, RebalanceListener) method is supported for the specific stream.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.computation.StreamManager
registerAndCreateProcessor
-
-
-
-
Field Detail
-
PROCESSORS_STREAM
public static final String PROCESSORS_STREAM
- See Also:
- Constant Field Values
-
METRICS_STREAM
public static final String METRICS_STREAM
- See Also:
- Constant Field Values
-
logManager
protected final LogManager logManager
-
processors
protected final Map<String,StreamProcessor> processors
-
filters
protected final Map<Name,RecordFilterChain> filters
-
-
Constructor Detail
-
LogStreamManager
public LogStreamManager(LogManager logManager)
-
LogStreamManager
public LogStreamManager(String nodeId, LogManager logManager)
-
-
Method Detail
-
initInternalStreams
protected void initInternalStreams()
-
initInternalStream
protected void initInternalStream(Name stream)
-
register
public void register(String processorName, Topology topology, Settings settings)
Description copied from interface: StreamManager
Registers a processor and initializes the underlying streams; this is required before creating a processor or appending records to source streams.
- Specified by:
- register in interface StreamManager
-
register
public void register(List<String> streams, Settings settings)
Description copied from interface: StreamManager
Registers some source Streams without any processors.
- Specified by:
- register in interface StreamManager
-
createStreamProcessor
public StreamProcessor createStreamProcessor(String processorName)
Description copied from interface: StreamManager
Creates a registered processor without starting it.
- Specified by:
- createStreamProcessor in interface StreamManager
-
getNodeId
protected String getNodeId()
-
getLogManager
public LogManager getLogManager()
-
append
public LogOffset append(String streamUrn, Record record)
Description copied from interface: StreamManager
Appends a record to a processor's source stream.
- Specified by:
- append in interface StreamManager
-
getProcessorNames
public Set<String> getProcessorNames()
Description copied from interface: StreamManager
Gets a set of processor names.
- Specified by:
- getProcessorNames in interface StreamManager
-
getProcessor
public StreamProcessor getProcessor(String processorName)
Description copied from interface: StreamManager
Gets a processor.
- Specified by:
- getProcessor in interface StreamManager
- Returns:
- null if the processor doesn't exist
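Because getProcessor returns null for an unknown name, callers may prefer to wrap the lookup rather than dereference the result directly. The following is a minimal sketch only. ProcessorLookup, SafeLookup and SafeLookupDemo are hypothetical stand-ins that mirror nothing but the getProcessor(String) signature shown above. Object replaces the real StreamProcessor return type so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in mirroring only getProcessor(String) from this page.
interface ProcessorLookup {
    Object getProcessor(String processorName); // returns null when the name is unknown
}

// Hypothetical helper, not part of the Nuxeo stream library.
final class SafeLookup {
    private SafeLookup() {
    }

    // Converts the null-returning lookup into an Optional.
    static Optional<Object> find(ProcessorLookup manager, String processorName) {
        return Optional.ofNullable(manager.getProcessor(processorName));
    }
}

final class SafeLookupDemo {
    public static void main(String[] args) {
        Map<String, Object> registered = new HashMap<>();
        registered.put("myProcessor", new Object());
        // Map.get also returns null for an absent key, so it models the contract.
        ProcessorLookup manager = registered::get;

        System.out.println(SafeLookup.find(manager, "myProcessor").isPresent());
        System.out.println(SafeLookup.find(manager, "unknown").isPresent());
    }
}
```

With the real class, the same Optional.ofNullable wrapping could be applied to the StreamProcessor returned by getProcessor.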
-
close
public void close()
- Specified by:
- close in interface AutoCloseable
- Specified by:
- close in interface StreamManager
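Since the class implements AutoCloseable and close() declares no checked exception, an instance can be managed with try-with-resources. The sketch below shows the pattern only. FakeManager and CloseDemo are hypothetical stand-ins, not library classes, and what the real close() releases is not described on this page.

```java
// Hypothetical stand-in for a manager that implements AutoCloseable,
// as LogStreamManager does according to this page.
final class FakeManager implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() { // no checked exception, matching "public void close()"
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class CloseDemo {
    private CloseDemo() {
    }

    // Uses the manager inside try-with-resources and returns it for inspection.
    static FakeManager useAndClose() {
        FakeManager manager = new FakeManager();
        try (FakeManager m = manager) {
            // Work with the manager here; it is still open at this point.
            if (m.isClosed()) {
                throw new IllegalStateException("closed too early");
            }
        }
        // close() has been called automatically on leaving the try block.
        return manager;
    }

    public static void main(String[] args) {
        System.out.println(useAndClose().isClosed());
    }
}
```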
-
supportSubscribe
public boolean supportSubscribe(Name stream)
Returns true if the subscribe(Name, Collection<Name>, RebalanceListener) method is supported for the specific stream.
- Since:
- 2021.34
-
supportSubscribe
public boolean supportSubscribe()
Deprecated. Since 2021.34, use supportSubscribe(Name) instead.
Returns true if the subscribe(Name, Collection<Name>, RebalanceListener) method is supported. Now deprecated because some implementations support subscribe only on specific streams.
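The deprecation note implies that support should be checked stream by stream. One way to picture this is to split a list of streams by the result of the per-stream check. This is a sketch under stated assumptions: SubscribeSupport and SubscribeCheck are hypothetical stand-ins, String replaces the library's Name type, and the "internal/" rule in main is invented purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in mirroring supportSubscribe(Name); String replaces Name here.
interface SubscribeSupport {
    boolean supportSubscribe(String stream);
}

// Hypothetical helper, not part of the Nuxeo stream library.
final class SubscribeCheck {
    private SubscribeCheck() {
    }

    // Splits streams into those that support subscribe (key true)
    // and those that do not (key false), preserving input order.
    static Map<Boolean, List<String>> partition(SubscribeSupport manager, List<String> streams) {
        return streams.stream().collect(Collectors.partitioningBy(manager::supportSubscribe));
    }

    public static void main(String[] args) {
        // Illustrative rule only: pretend streams prefixed "internal/" cannot be subscribed to.
        SubscribeSupport manager = stream -> !stream.startsWith("internal/");
        List<String> streams = Arrays.asList("source", "internal/metrics", "output");

        Map<Boolean, List<String>> result = partition(manager, streams);
        System.out.println("subscribe: " + result.get(true));
        System.out.println("other: " + result.get(false));
    }
}
```

How a caller should handle streams that do not support subscribe is not stated on this page, so the sketch only separates the two groups.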
-
subscribe
public LogTailer<Record> subscribe(Name computationName, Collection<Name> streams, RebalanceListener listener)
-
createTailer
public LogTailer<Record> createTailer(Name computationName, Collection<LogPartition> streamPartitions)
-
getFilter
public RecordFilter getFilter(Name stream)
-
getCodec
protected Codec<Record> getCodec(Collection<Name> streams)
-
initAppenders
protected void initAppenders(Collection<String> streams, Settings settings)
-
registerFilters
protected void registerFilters(Collection<String> streams, Settings settings)
-
-