Package org.nuxeo.runtime.stream
Class StreamServiceImpl
- java.lang.Object
-
- org.nuxeo.runtime.model.DefaultComponent
-
- org.nuxeo.runtime.stream.StreamServiceImpl
-
- All Implemented Interfaces:
Adaptable
,Component
,Extensible
,TimestampedService
,StreamService
public class StreamServiceImpl extends DefaultComponent implements StreamService
- Since:
- 9.3
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
StreamServiceImpl.ComponentsLifeCycleListener
-
Field Summary
Fields Modifier and Type Field Description static String
DEFAULT_CODEC
protected Boolean
isStreamProcessingDisabled
protected LogManager
logManager
static String
NUXEO_STREAM_DIR_PROP
static String
NUXEO_STREAM_RET_DURATION_PROP
static String
STREAM_PROCESSING_ENABLED
protected StreamManager
streamManager
protected static String
XP_LOG_CONFIG
protected static String
XP_STREAM_PROCESSOR
-
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-
-
Constructor Summary
Constructors Constructor Description StreamServiceImpl()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected LogConfig
createChronicleLogConfig(LogConfigDescriptor desc)
protected LogConfig
createKafkaLogConfig(LogConfigDescriptor desc)
protected void
createLogIfNotExists(LogConfigDescriptor config)
int
getApplicationStartedOrder()
The component notification order forComponent.applicationStarted(org.nuxeo.runtime.model.ComponentContext)
.protected Path
getChroniclePath(String basePath)
protected String
getChronicleRetention(String retention)
protected List<LogConfig>
getLogConfigs()
LogManager
getLogManager()
protected String
getNodeId()
protected Settings
getSettings(StreamProcessorDescriptor descriptor)
StreamManager
getStreamManager()
protected void
initProcessor(StreamProcessorDescriptor descriptor)
protected boolean
isProcessingDisabled()
boolean
restartComputation(Name computation)
Restart the computation thread pool.boolean
setComputationPositionAfterDate(Name computation, Name stream, Instant after)
Moving computation position after a date.boolean
setComputationPositionToBeginning(Name computation, Name stream)
Moving computation position to the beginning of stream.boolean
setComputationPositionToEnd(Name computation, Name stream)
Moving computation position to the end of stream.boolean
setComputationPositionToOffset(Name computation, Name stream, int partition, long offset)
Moving computation position to a specific offset for a partition.void
start(ComponentContext context)
Start the component.boolean
startProcessor(String processorName)
Starts a registered Processor.protected void
startProcessors()
void
stop(ComponentContext context)
Stop the component.boolean
stopComputation(Name computation)
Stop computation thread pool immediately.boolean
stopProcessor(String processorName)
Stop a running processor.void
stopProcessors()
-
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerContribution, registerExtension, setLastModified, setModifiedNow, setName, unregister, unregisterContribution, unregisterExtension
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.runtime.model.Component
applicationStarted
-
Methods inherited from interface org.nuxeo.runtime.stream.StreamService
getLogManager, getStreamManager
-
-
-
-
Field Detail
-
NUXEO_STREAM_DIR_PROP
public static final String NUXEO_STREAM_DIR_PROP
- See Also:
- Constant Field Values
-
NUXEO_STREAM_RET_DURATION_PROP
public static final String NUXEO_STREAM_RET_DURATION_PROP
- See Also:
- Constant Field Values
-
DEFAULT_CODEC
public static final String DEFAULT_CODEC
- See Also:
- Constant Field Values
-
XP_LOG_CONFIG
protected static final String XP_LOG_CONFIG
- See Also:
- Constant Field Values
-
XP_STREAM_PROCESSOR
protected static final String XP_STREAM_PROCESSOR
- See Also:
- Constant Field Values
-
logManager
protected LogManager logManager
-
streamManager
protected StreamManager streamManager
-
STREAM_PROCESSING_ENABLED
public static final String STREAM_PROCESSING_ENABLED
- Since:
- 11.2
- See Also:
- Constant Field Values
-
isStreamProcessingDisabled
protected Boolean isStreamProcessingDisabled
-
-
Method Detail
-
getApplicationStartedOrder
public int getApplicationStartedOrder()
Description copied from interface:Component
The component notification order forComponent.applicationStarted(org.nuxeo.runtime.model.ComponentContext)
.Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
getApplicationStartedOrder
in interfaceComponent
- Returns:
- the order, 1000 by default
-
getLogManager
public LogManager getLogManager()
- Specified by:
getLogManager
in interfaceStreamService
-
getStreamManager
public StreamManager getStreamManager()
- Specified by:
getStreamManager
in interfaceStreamService
-
createLogIfNotExists
protected void createLogIfNotExists(LogConfigDescriptor config)
-
start
public void start(ComponentContext context)
Description copied from interface:Component
Start the component. This method is called after all the components were resolved and activated- Specified by:
start
in interfaceComponent
- Overrides:
start
in classDefaultComponent
-
getNodeId
protected String getNodeId()
-
createKafkaLogConfig
protected LogConfig createKafkaLogConfig(LogConfigDescriptor desc)
-
createChronicleLogConfig
protected LogConfig createChronicleLogConfig(LogConfigDescriptor desc)
-
initProcessor
protected void initProcessor(StreamProcessorDescriptor descriptor)
-
getSettings
protected Settings getSettings(StreamProcessorDescriptor descriptor)
-
stop
public void stop(ComponentContext context) throws InterruptedException
Description copied from interface:Component
Stop the component.- Specified by:
stop
in interfaceComponent
- Overrides:
stop
in classDefaultComponent
- Throws:
InterruptedException
-
startProcessors
protected void startProcessors()
-
stopProcessors
public void stopProcessors()
- Specified by:
stopProcessors
in interfaceStreamService
-
startProcessor
public boolean startProcessor(String processorName)
Description copied from interface:StreamService
Starts a registered Processor. Returns true if the processor has been started.- Specified by:
startProcessor
in interfaceStreamService
-
stopProcessor
public boolean stopProcessor(String processorName)
Description copied from interface:StreamService
Stop a running processor. Returns true if the processor has been stopped.- Specified by:
stopProcessor
in interfaceStreamService
-
stopComputation
public boolean stopComputation(Name computation)
Description copied from interface:StreamService
Stop computation thread pool immediately.- Specified by:
stopComputation
in interfaceStreamService
-
restartComputation
public boolean restartComputation(Name computation)
Description copied from interface:StreamService
Restart the computation thread pool. Do nothing if the computation thread pool is already started.- Specified by:
restartComputation
in interfaceStreamService
-
setComputationPositionToEnd
public boolean setComputationPositionToEnd(Name computation, Name stream)
Description copied from interface:StreamService
Moving computation position to the end of stream. The computation thread pool must be stopped usingStreamService.stopComputation(Name)
before changing its position.- Specified by:
setComputationPositionToEnd
in interfaceStreamService
-
setComputationPositionToBeginning
public boolean setComputationPositionToBeginning(Name computation, Name stream)
Description copied from interface:StreamService
Moving computation position to the beginning of stream. The computation thread pool must be stopped usingStreamService.stopComputation(Name)
before changing its position.- Specified by:
setComputationPositionToBeginning
in interfaceStreamService
-
setComputationPositionToOffset
public boolean setComputationPositionToOffset(Name computation, Name stream, int partition, long offset)
Description copied from interface:StreamService
Moving computation position to a specific offset for a partition. The computation thread pool must be stopped usingStreamService.stopComputation(Name)
before changing its position.- Specified by:
setComputationPositionToOffset
in interfaceStreamService
-
setComputationPositionAfterDate
public boolean setComputationPositionAfterDate(Name computation, Name stream, Instant after)
Description copied from interface:StreamService
Moving computation position after a date. The computation thread pool must be stopped usingStreamService.stopComputation(Name)
before changing its position.- Specified by:
setComputationPositionAfterDate
in interfaceStreamService
-
isProcessingDisabled
protected boolean isProcessingDisabled()
-
-