Package org.nuxeo.runtime.stream
Class StreamServiceImpl
- java.lang.Object
-
- org.nuxeo.runtime.model.DefaultComponent
-
- org.nuxeo.runtime.stream.StreamServiceImpl
-
- All Implemented Interfaces:
Adaptable,Component,Extensible,TimestampedService,StreamService
public class StreamServiceImpl extends DefaultComponent implements StreamService
- Since:
- 9.3
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected classStreamServiceImpl.ComponentsLifeCycleListener
-
Field Summary
Fields Modifier and Type Field Description static StringDEFAULT_CODECprotected BooleanisStreamProcessingDisabledprotected LogManagerlogManagerstatic StringNUXEO_STREAM_DIR_PROPstatic StringNUXEO_STREAM_RET_DURATION_PROPstatic StringSTREAM_PROCESSING_ENABLEDprotected StreamManagerstreamManagerprotected static StringXP_LOG_CONFIGprotected static StringXP_STREAM_PROCESSOR-
Fields inherited from class org.nuxeo.runtime.model.DefaultComponent
lastModified, name
-
-
Constructor Summary
Constructors Constructor Description StreamServiceImpl()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected LogConfigcreateChronicleLogConfig(LogConfigDescriptor desc)protected LogConfigcreateKafkaLogConfig(LogConfigDescriptor desc)protected voidcreateLogIfNotExists(LogConfigDescriptor config)intgetApplicationStartedOrder()The component notification order forComponent.applicationStarted(org.nuxeo.runtime.model.ComponentContext).protected PathgetChroniclePath(String basePath)protected StringgetChronicleRetention(String retention)protected List<LogConfig>getLogConfigs()LogManagergetLogManager()protected StringgetNodeId()protected SettingsgetSettings(StreamProcessorDescriptor descriptor)StreamManagergetStreamManager()protected voidinitProcessor(StreamProcessorDescriptor descriptor)protected booleanisProcessingDisabled()booleanrestartComputation(Name computation)Restart the computation thread pool.booleansetComputationPositionAfterDate(Name computation, Name stream, Instant after)Moving computation position after a date.booleansetComputationPositionToBeginning(Name computation, Name stream)Moving computation position to the beginning of stream.booleansetComputationPositionToEnd(Name computation, Name stream)Moving computation position to the end of stream.booleansetComputationPositionToOffset(Name computation, Name stream, int partition, long offset)Moving computation position to a specific offset for a partition.voidstart(ComponentContext context)Start the component.booleanstartProcessor(String processorName)Starts a registered Processor.protected voidstartProcessors()voidstop(ComponentContext context)Stop the component.booleanstopComputation(Name computation)Stop computation thread pool immediately.booleanstopProcessor(String processorName)Stop a running processor.voidstopProcessors()-
Methods inherited from class org.nuxeo.runtime.model.DefaultComponent
activate, addRuntimeMessage, addRuntimeMessage, deactivate, getAdapter, getDescriptor, getDescriptors, getLastModified, getRegistry, register, registerContribution, registerExtension, setLastModified, setModifiedNow, setName, unregister, unregisterContribution, unregisterExtension
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.nuxeo.runtime.model.Component
applicationStarted
-
Methods inherited from interface org.nuxeo.runtime.stream.StreamService
getLogManager, getStreamManager
-
-
-
-
Field Detail
-
NUXEO_STREAM_DIR_PROP
public static final String NUXEO_STREAM_DIR_PROP
- See Also:
- Constant Field Values
-
NUXEO_STREAM_RET_DURATION_PROP
public static final String NUXEO_STREAM_RET_DURATION_PROP
- See Also:
- Constant Field Values
-
DEFAULT_CODEC
public static final String DEFAULT_CODEC
- See Also:
- Constant Field Values
-
XP_LOG_CONFIG
protected static final String XP_LOG_CONFIG
- See Also:
- Constant Field Values
-
XP_STREAM_PROCESSOR
protected static final String XP_STREAM_PROCESSOR
- See Also:
- Constant Field Values
-
logManager
protected LogManager logManager
-
streamManager
protected StreamManager streamManager
-
STREAM_PROCESSING_ENABLED
public static final String STREAM_PROCESSING_ENABLED
- Since:
- 11.2
- See Also:
- Constant Field Values
-
isStreamProcessingDisabled
protected Boolean isStreamProcessingDisabled
-
-
Method Detail
-
getApplicationStartedOrder
public int getApplicationStartedOrder()
Description copied from interface:ComponentThe component notification order forComponent.applicationStarted(org.nuxeo.runtime.model.ComponentContext).Components are notified in increasing order. Order 1000 is the default order for components that don't care. Order 100 is the repository initialization.
- Specified by:
getApplicationStartedOrderin interfaceComponent- Returns:
- the order, 1000 by default
-
getLogManager
public LogManager getLogManager()
- Specified by:
getLogManagerin interfaceStreamService
-
getStreamManager
public StreamManager getStreamManager()
- Specified by:
getStreamManagerin interfaceStreamService
-
createLogIfNotExists
protected void createLogIfNotExists(LogConfigDescriptor config)
-
start
public void start(ComponentContext context)
Description copied from interface:ComponentStart the component. This method is called after all the components were resolved and activated- Specified by:
startin interfaceComponent- Overrides:
startin classDefaultComponent
-
getNodeId
protected String getNodeId()
-
createKafkaLogConfig
protected LogConfig createKafkaLogConfig(LogConfigDescriptor desc)
-
createChronicleLogConfig
protected LogConfig createChronicleLogConfig(LogConfigDescriptor desc)
-
initProcessor
protected void initProcessor(StreamProcessorDescriptor descriptor)
-
getSettings
protected Settings getSettings(StreamProcessorDescriptor descriptor)
-
stop
public void stop(ComponentContext context) throws InterruptedException
Description copied from interface:ComponentStop the component.- Specified by:
stopin interfaceComponent- Overrides:
stopin classDefaultComponent- Throws:
InterruptedException
-
startProcessors
protected void startProcessors()
-
stopProcessors
public void stopProcessors()
- Specified by:
stopProcessorsin interfaceStreamService
-
startProcessor
public boolean startProcessor(String processorName)
Description copied from interface:StreamServiceStarts a registered Processor. Returns true if the processor has been started.- Specified by:
startProcessorin interfaceStreamService
-
stopProcessor
public boolean stopProcessor(String processorName)
Description copied from interface:StreamServiceStop a running processor. Returns true if the processor has been stopped.- Specified by:
stopProcessorin interfaceStreamService
-
stopComputation
public boolean stopComputation(Name computation)
Description copied from interface:StreamServiceStop computation thread pool immediately.- Specified by:
stopComputationin interfaceStreamService
-
restartComputation
public boolean restartComputation(Name computation)
Description copied from interface:StreamServiceRestart the computation thread pool. Do nothing if the computation thread pool is already started.- Specified by:
restartComputationin interfaceStreamService
-
setComputationPositionToEnd
public boolean setComputationPositionToEnd(Name computation, Name stream)
Description copied from interface:StreamServiceMoving computation position to the end of stream. The computation thread pool must be stopped usingStreamService.stopComputation(Name)before changing its position.- Specified by:
setComputationPositionToEndin interfaceStreamService
-
setComputationPositionToBeginning
public boolean setComputationPositionToBeginning(Name computation, Name stream)
Description copied from interface:StreamServiceMoving computation position to the beginning of stream. The computation thread pool must be stopped usingStreamService.stopComputation(Name)before changing its position.- Specified by:
setComputationPositionToBeginningin interfaceStreamService
-
setComputationPositionToOffset
public boolean setComputationPositionToOffset(Name computation, Name stream, int partition, long offset)
Description copied from interface:StreamServiceMoving computation position to a specific offset for a partition. The computation thread pool must be stopped usingStreamService.stopComputation(Name)before changing its position.- Specified by:
setComputationPositionToOffsetin interfaceStreamService
-
setComputationPositionAfterDate
public boolean setComputationPositionAfterDate(Name computation, Name stream, Instant after)
Description copied from interface:StreamServiceMoving computation position after a date. The computation thread pool must be stopped usingStreamService.stopComputation(Name)before changing its position.- Specified by:
setComputationPositionAfterDatein interfaceStreamService
-
isProcessingDisabled
protected boolean isProcessingDisabled()
-
-