Package org.nuxeo.lib.stream.log.kafka
Class KafkaLogManager
- java.lang.Object
-
- org.nuxeo.lib.stream.log.internals.AbstractLogManager
-
- org.nuxeo.lib.stream.log.kafka.KafkaLogManager
-
- All Implemented Interfaces:
AutoCloseable, LogManager
public class KafkaLogManager extends AbstractLogManager
- Since:
- 9.3
-
-
Field Summary
Fields
Modifier and Type                          Field
protected List<KafkaLogConfig>             configs
protected KafkaLogConfig                   defaultConfig
static String                              DISABLE_SUBSCRIBE_PROP
protected Map<KafkaLogConfig,KafkaUtils>   kUtils
-
Fields inherited from class org.nuxeo.lib.stream.log.internals.AbstractLogManager
ADMIN_GROUP, appenders, tailers, tailersAssignments
-
-
Constructor Summary
Constructors
KafkaLogManager(String prefix, Properties producerProperties, Properties consumerProperties)
KafkaLogManager(List<KafkaLogConfig> kafkaConfigs)
-
Method Summary
Modifier and Type, Method, Description
protected void checkValidPartition(LogPartition partition)
void close()
void create(Name name, int size)
<M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec)
boolean delete(Name name)
    Tries to delete a Log.
void deleteConsumers()
    Remove all existing consumers and their committed positions.
void deleteRecords(Name name)
    Delete all records of a stream by moving the first offsets to end of each partition.
protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, Name group, Codec<M> codec)
protected <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names, RebalanceListener listener, Codec<M> codec)
boolean exists(Name name)
    Returns true if a Log with this name exists.
protected String filterDisplayedProperties(Properties properties)
protected KafkaLogConfig findDefaultConfig()
protected KafkaLogConfig getConfig(Name name)
protected KafkaLogConfig getConfig(Name name, Name group)
List<LogLag> getLagPerPartition(Name name, Name group)
    Returns the lag between consumer group and the producers for each partition.
protected int getSize(Name name)
List<Name> listAllNames()
    Returns all the Log names.
List<Name> listConsumerGroups(Name name)
    List the consumer groups for a Log.
boolean supportSubscribe()
boolean supportSubscribe(Name stream)
    Returns true if the LogManager.subscribe(org.nuxeo.lib.stream.log.Name, java.util.Collection<org.nuxeo.lib.stream.log.Name>, org.nuxeo.lib.stream.log.RebalanceListener, org.nuxeo.lib.stream.codec.Codec<M>) method is supported for the specific Log.
String toString()
-
Methods inherited from class org.nuxeo.lib.stream.log.internals.AbstractLogManager
checkInvalidAssignment, checkInvalidCodec, cleanTailers, createIfNotExists, createTailer, getAppender, getLatencyPerPartition, guessCodec, sameCodec, size, subscribe
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.nuxeo.lib.stream.log.LogManager
createIfNotExists, createTailer, createTailer, createTailer, createTailer, createTailer, createTailer, createTailer, createTailer, createTailer, createTailer, createTailer, delete, exists, getAppender, getAppender, getAppender, getLag, getLag, getLagPerPartition, getLatency, getLatency, getLatencyPerPartition, listAll, listConsumerGroups, size, subscribe, subscribe, subscribe
-
-
-
-
Field Detail
-
DISABLE_SUBSCRIBE_PROP
public static final String DISABLE_SUBSCRIBE_PROP
- See Also:
- Constant Field Values
-
configs
protected final List<KafkaLogConfig> configs
-
defaultConfig
protected final KafkaLogConfig defaultConfig
-
kUtils
protected final Map<KafkaLogConfig,KafkaUtils> kUtils
-
-
Constructor Detail
-
KafkaLogManager
public KafkaLogManager(String prefix, Properties producerProperties, Properties consumerProperties)
- Since:
- 10.2
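As an illustration of the two Properties arguments, the sketch below builds them with plain java.util.Properties. The keys bootstrap.servers and max.poll.records are standard Kafka client settings. The broker address localhost:9092, the "nuxeo-" prefix value, and the helper class KafkaLogManagerProps are assumptions made for this example and do not come from this page. The constructor call appears only as a comment because it needs the Nuxeo stream library on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Nuxeo API.
class KafkaLogManagerProps {

    // Producer-side settings; only the broker list is set here.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    // Consumer-side settings, kept separate so each side can be tuned on its own.
    static Properties consumerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Standard Kafka consumer key; the value is an arbitrary example.
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties producer = producerProperties("localhost:9092");
        Properties consumer = consumerProperties("localhost:9092");
        // With the Nuxeo stream library available, these would be passed as:
        // LogManager manager = new KafkaLogManager("nuxeo-", producer, consumer);
        System.out.println(producer.getProperty("bootstrap.servers"));
        System.out.println(consumer.getProperty("max.poll.records"));
    }
}
```

Because KafkaLogManager implements AutoCloseable, a real caller would typically create it in a try-with-resources statement so that close() is always invoked.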
-
KafkaLogManager
public KafkaLogManager(List<KafkaLogConfig> kafkaConfigs)
- Since:
- 11.1
-
-
Method Detail
-
findDefaultConfig
protected KafkaLogConfig findDefaultConfig()
-
getConfig
protected KafkaLogConfig getConfig(Name name)
-
getConfig
protected KafkaLogConfig getConfig(Name name, Name group)
-
create
public void create(Name name, int size)
- Specified by:
create in class AbstractLogManager
-
getSize
protected int getSize(Name name)
- Specified by:
getSize in class AbstractLogManager
-
exists
public boolean exists(Name name)
Description copied from interface: LogManager
Returns true if a Log with this name exists.
-
createAppender
public <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec)
- Specified by:
createAppender in class AbstractLogManager
-
doCreateTailer
protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, Name group, Codec<M> codec)
- Specified by:
doCreateTailer in class AbstractLogManager
-
checkValidPartition
protected void checkValidPartition(LogPartition partition)
-
close
public void close()
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface LogManager
- Overrides:
close in class AbstractLogManager
-
supportSubscribe
public boolean supportSubscribe()
Description copied from interface: LogManager
Returns true if the LogManager.subscribe(org.nuxeo.lib.stream.log.Name, java.util.Collection<org.nuxeo.lib.stream.log.Name>, org.nuxeo.lib.stream.log.RebalanceListener, org.nuxeo.lib.stream.codec.Codec<M>) method is supported. Now deprecated because some implementations support subscribe only on specific Log names.
- Specified by:
supportSubscribe in interface LogManager
- Overrides:
supportSubscribe in class AbstractLogManager
-
supportSubscribe
public boolean supportSubscribe(Name stream)
Description copied from interface: LogManager
Returns true if the LogManager.subscribe(org.nuxeo.lib.stream.log.Name, java.util.Collection<org.nuxeo.lib.stream.log.Name>, org.nuxeo.lib.stream.log.RebalanceListener, org.nuxeo.lib.stream.codec.Codec<M>) method is supported for the specific Log.
- Specified by:
supportSubscribe in interface LogManager
- Overrides:
supportSubscribe in class AbstractLogManager
-
doSubscribe
protected <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names, RebalanceListener listener, Codec<M> codec)
- Specified by:
doSubscribe in class AbstractLogManager
-
getLagPerPartition
public List<LogLag> getLagPerPartition(Name name, Name group)
Description copied from interface: LogManager
Returns the lag between consumer group and the producers for each partition. The result list is ordered by partition: for instance, index 0 is the lag for partition 0.
- Specified by:
getLagPerPartition in interface LogManager
- Specified by:
getLagPerPartition in class AbstractLogManager
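Since the list returned by getLagPerPartition is indexed by partition number, per-partition results can be aggregated or scanned by index. The sketch below works on plain numbers rather than on LogLag objects, so it runs without the Nuxeo library. The class LagByPartition and its methods are hypothetical helpers, and extracting one numeric lag per LogLag entry is left to the caller.

```java
import java.util.List;

// Hypothetical helper operating on one numeric lag per partition.
class LagByPartition {

    // Total lag over all partitions.
    static long totalLag(List<Long> lagPerPartition) {
        long total = 0;
        for (long lag : lagPerPartition) {
            total += lag;
        }
        return total;
    }

    // Partition number with the largest lag, or -1 for an empty list.
    // The list index is the partition number; ties keep the lowest index.
    static int mostLaggingPartition(List<Long> lagPerPartition) {
        int worst = -1;
        long max = Long.MIN_VALUE;
        for (int partition = 0; partition < lagPerPartition.size(); partition++) {
            long lag = lagPerPartition.get(partition);
            if (worst == -1 || lag > max) {
                worst = partition;
                max = lag;
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        // Example values only: partition 0 is caught up, partition 1 lags by 5.
        List<Long> lags = List.of(0L, 5L, 2L);
        System.out.println("total=" + totalLag(lags)
                + " worst=" + mostLaggingPartition(lags)); // prints "total=7 worst=1"
    }
}
```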
-
listAllNames
public List<Name> listAllNames()
Description copied from interface: LogManager
Returns all the Log names.
-
filterDisplayedProperties
protected String filterDisplayedProperties(Properties properties)
-
listConsumerGroups
public List<Name> listConsumerGroups(Name name)
Description copied from interface: LogManager
List the consumer groups for a Log.
-
delete
public boolean delete(Name name)
Description copied from interface: LogManager
Tries to delete a Log. Returns true if the Log was successfully deleted; deletion might not be possible depending on the implementation.
- Specified by:
delete in interface LogManager
- Overrides:
delete in class AbstractLogManager
-
deleteRecords
public void deleteRecords(Name name)
Description copied from interface: LogManager
Delete all records of a stream by moving the first offset to the end of each partition. Exposed for testing purposes; might not be implemented by all implementations.
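To illustrate what moving the first offset to the end of a partition means, here is a toy model of a single partition. It is not the Kafka or Nuxeo implementation: the class PartitionOffsets and all of its members are hypothetical and exist only to show the offset arithmetic.

```java
// Toy model of one partition's offsets; hypothetical, for illustration only.
class PartitionOffsets {
    private long firstOffset; // offset of the oldest retained record
    private long endOffset;   // offset the next appended record will receive

    // Appending a record advances the end offset by one.
    void append() {
        endOffset++;
    }

    // Records still readable are those in [firstOffset, endOffset).
    long retained() {
        return endOffset - firstOffset;
    }

    // "Delete all records" by moving the first offset to the end.
    void deleteRecords() {
        firstOffset = endOffset;
    }

    long firstOffset() {
        return firstOffset;
    }

    long endOffset() {
        return endOffset;
    }

    public static void main(String[] args) {
        PartitionOffsets partition = new PartitionOffsets();
        partition.append();
        partition.append();
        partition.append();
        System.out.println(partition.retained()); // prints 3
        partition.deleteRecords();
        System.out.println(partition.retained()); // prints 0
    }
}
```

In this model the end offset is never reset, so records appended after the deletion continue from the previous end offset.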
-
deleteConsumers
public void deleteConsumers()
Description copied from interface: LogManager
Remove all existing consumers and their committed positions. Exposed for testing purposes; might not be implemented by all implementations.
-
-