Package org.nuxeo.lib.stream.log.kafka
Class KafkaLogTailer<M extends Externalizable>
- java.lang.Object
-
- org.nuxeo.lib.stream.log.kafka.KafkaLogTailer<M>
-
- All Implemented Interfaces:
AutoCloseable, org.apache.kafka.clients.consumer.ConsumerRebalanceListener, LogTailer<M>
public class KafkaLogTailer<M extends Externalizable> extends Object implements LogTailer<M>, org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- Since:
- 9.3
-
-
Field Summary
Fields: Modifier and Type, Field
protected boolean closed
protected Codec<M> codec
protected org.apache.kafka.clients.consumer.KafkaConsumer<String,org.apache.kafka.common.utils.Bytes> consumer
protected static AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE
protected boolean consumerMoved
protected Codec<M> decodeCodec
protected Name group
protected String id
protected boolean isLost
protected boolean isRebalanced
protected boolean isRevoked
protected Map<org.apache.kafka.common.TopicPartition,Long> lastCommittedOffsets
protected Map<org.apache.kafka.common.TopicPartition,Long> lastOffsets
protected int lastPollSize
protected long lastPollTimestamp
protected RebalanceListener listener
protected Collection<Name> names
protected Collection<LogPartition> partitions
protected Queue<org.apache.kafka.clients.consumer.ConsumerRecord<String,org.apache.kafka.common.utils.Bytes>> records
protected NameResolver resolver
protected Collection<org.apache.kafka.common.TopicPartition> topicPartitions
-
Constructor Summary
Constructors: Modifier, Constructor
protected KafkaLogTailer(Codec<M> codec, NameResolver resolver, Name group, Properties consumerProps)
-
Method Summary
Methods: Modifier and Type, Method, Description
Collection<LogPartition> assignments()
Returns the list of (Log name, partition) tuples currently assigned to this tailer.
protected static String buildId(Name group, Collection<LogPartition> partitions)
protected static String buildSubscribeId(Name group, Collection<Name> names)
protected void cleanDuringRebalancing()
void close()
boolean closed()
Returns true if the tailer has been closed.
void commit()
Commit current positions for all partitions (last message offset returned by read).
LogOffset commit(LogPartition partition)
Commit current position for the partition.
static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, NameResolver resolver, Collection<LogPartition> partitions, Name group, Properties consumerProps)
static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, NameResolver resolver, Collection<Name> names, Name group, Properties consumerProps, RebalanceListener listener)
protected void forceCommit()
Commits the consumer at its current position regardless of lastOffsets or lastCommittedOffsets.
Codec<M> getCodec()
Returns the codec used to read the records.
Name group()
Returns the consumer group.
LogOffset offsetForTimestamp(LogPartition partition, long timestamp)
Look up the offset for the given partition by timestamp.
void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> newPartitions)
void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions)
void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
protected int poll(Duration timeout)
LogRecord<M> read(Duration timeout)
Read a message from assigned partitions within the timeout.
void reset()
Reset all committed positions for this group; the next read will be done from the beginning.
void reset(LogPartition partition)
Reset the committed position for this group on this partition; the next read for this partition will be done from the beginning.
void seek(LogOffset offset)
Set the current position for a single partition.
void toEnd()
Set the current positions to the end of all partitions.
void toLastCommitted()
Set the current positions to previously committed positions.
protected long toLastCommitted(org.apache.kafka.common.TopicPartition topicPartition)
void toStart()
Set the current positions to the beginning of all partitions.
String toString()
-
-
-
Field Detail
-
group
protected final Name group
-
lastCommittedOffsets
protected final Map<org.apache.kafka.common.TopicPartition,Long> lastCommittedOffsets
-
records
protected final Queue<org.apache.kafka.clients.consumer.ConsumerRecord<String,org.apache.kafka.common.utils.Bytes>> records
-
codec
protected final Codec<M> codec
-
decodeCodec
protected final Codec<M> decodeCodec
-
resolver
protected final NameResolver resolver
-
consumer
protected org.apache.kafka.clients.consumer.KafkaConsumer<String,org.apache.kafka.common.utils.Bytes> consumer
-
id
protected String id
-
topicPartitions
protected Collection<org.apache.kafka.common.TopicPartition> topicPartitions
-
partitions
protected Collection<LogPartition> partitions
-
closed
protected boolean closed
-
names
protected Collection<Name> names
-
listener
protected RebalanceListener listener
-
isRebalanced
protected boolean isRebalanced
-
isRevoked
protected boolean isRevoked
-
isLost
protected boolean isLost
-
consumerMoved
protected boolean consumerMoved
-
CONSUMER_CLIENT_ID_SEQUENCE
protected static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE
-
lastPollTimestamp
protected long lastPollTimestamp
-
lastPollSize
protected int lastPollSize
-
-
Constructor Detail
-
KafkaLogTailer
protected KafkaLogTailer(Codec<M> codec, NameResolver resolver, Name group, Properties consumerProps)
-
-
Method Detail
-
createAndAssign
public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, NameResolver resolver, Collection<LogPartition> partitions, Name group, Properties consumerProps)
-
createAndSubscribe
public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, NameResolver resolver, Collection<Name> names, Name group, Properties consumerProps, RebalanceListener listener)
-
buildId
protected static String buildId(Name group, Collection<LogPartition> partitions)
-
buildSubscribeId
protected static String buildSubscribeId(Name group, Collection<Name> names)
-
read
public LogRecord<M> read(Duration timeout) throws InterruptedException
Description copied from interface: LogTailer
Read a message from assigned partitions within the timeout.
- Specified by:
read in interface LogTailer<M extends Externalizable>
- Returns:
- null if there is no message in the queue after the timeout.
- Throws:
InterruptedException
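The contract above (return a record, or null once the timeout elapses with nothing to read) usually leads to a polling loop on the caller side. The following is a minimal sketch of that loop using only the JDK: `ReadLoopSketch`, its `read` and `drain` methods, and the `BlockingQueue<String>` standing in for the assigned partitions are all hypothetical and are not part of the Nuxeo API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a tailer: only the read-with-timeout contract is modeled.
class ReadLoopSketch {

    /** Returns the next message, or null if none arrives within the timeout. */
    static String read(BlockingQueue<String> source, Duration timeout) throws InterruptedException {
        return source.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Collects messages until a read times out (returns null) or the thread is interrupted. */
    static List<String> drain(BlockingQueue<String> source, Duration timeout) {
        List<String> seen = new ArrayList<>();
        try {
            String message;
            while ((message = read(source, timeout)) != null) {
                seen.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        System.out.println(drain(source, Duration.ofMillis(50))); // prints [a, b, c]
    }
}
```

With a real tailer the same loop shape would apply, with the null check deciding whether to keep waiting or stop.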
-
poll
protected int poll(Duration timeout) throws InterruptedException
- Throws:
InterruptedException
-
toEnd
public void toEnd()
Description copied from interface: LogTailer
Set the current positions to the end of all partitions.
- Specified by:
toEnd in interface LogTailer<M extends Externalizable>
-
toStart
public void toStart()
Description copied from interface: LogTailer
Set the current positions to the beginning of all partitions.
- Specified by:
toStart in interface LogTailer<M extends Externalizable>
-
toLastCommitted
public void toLastCommitted()
Description copied from interface: LogTailer
Set the current positions to previously committed positions.
- Specified by:
toLastCommitted in interface LogTailer<M extends Externalizable>
-
toLastCommitted
protected long toLastCommitted(org.apache.kafka.common.TopicPartition topicPartition)
-
seek
public void seek(LogOffset offset)
Description copied from interface: LogTailer
Set the current position for a single partition. Does not change the positions of other partitions.
- Specified by:
seek in interface LogTailer<M extends Externalizable>
-
reset
public void reset()
Description copied from interface: LogTailer
Reset all committed positions for this group; the next read will be done from the beginning.
- Specified by:
reset in interface LogTailer<M extends Externalizable>
-
reset
public void reset(LogPartition partition)
Description copied from interface: LogTailer
Reset the committed position for this group on this partition; the next read for this partition will be done from the beginning.
- Specified by:
reset in interface LogTailer<M extends Externalizable>
-
offsetForTimestamp
public LogOffset offsetForTimestamp(LogPartition partition, long timestamp)
Description copied from interface: LogTailer
Look up the offset for the given partition by timestamp. The position is the earliest offset whose timestamp is greater than or equal to the given timestamp.
The timestamp used depends on the implementation; for Kafka this is the LogAppendTime. Returns null if no record offset is found with an appropriate timestamp.
- Specified by:
offsetForTimestamp in interface LogTailer<M extends Externalizable>
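The lookup rule (earliest offset whose timestamp is greater than or equal to the requested one, null when there is none) can be illustrated with a small self-contained model. This is only a sketch of the documented semantics: `OffsetForTimestampSketch` and the array of per-offset timestamps are hypothetical, and the real method delegates the lookup to Kafka rather than scanning records.

```java
// Hypothetical model of one partition: timestamps[i] is the timestamp of the record at offset i.
class OffsetForTimestampSketch {

    /**
     * Returns the earliest offset whose timestamp is >= the given timestamp,
     * or null if no record qualifies.
     */
    static Long offsetForTimestamp(long[] timestamps, long timestamp) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= timestamp) {
                return (long) offset;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 1000L, 2000L, 3500L};
        System.out.println(offsetForTimestamp(timestamps, 1500L)); // prints 2
        System.out.println(offsetForTimestamp(timestamps, 1000L)); // prints 0
        System.out.println(offsetForTimestamp(timestamps, 4000L)); // prints null
    }
}
```

Callers should expect the null case, for example when the requested timestamp is later than every record in the partition.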
-
commit
public void commit()
Description copied from interface: LogTailer
Commit current positions for all partitions (last message offset returned by read).
- Specified by:
commit in interface LogTailer<M extends Externalizable>
-
forceCommit
protected void forceCommit()
Commits the consumer at its current position regardless of lastOffsets or lastCommittedOffsets.
-
commit
public LogOffset commit(LogPartition partition)
Description copied from interface: LogTailer
Commit current position for the partition.
- Specified by:
commit in interface LogTailer<M extends Externalizable>
- Returns:
- the committed offset; can be null if no read was previously done on this partition.
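The null return value is easiest to see in a small bookkeeping model. The sketch below is an assumption-laden illustration, not the class's implementation: `CommitSketch`, `recordRead`, and the integer partition ids are hypothetical, and the choice to commit the position after the last read record follows the general Kafka convention that a committed offset names the next record to consume.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition bookkeeping; partitions are identified by plain ints here.
class CommitSketch {
    // Offset of the last record returned by a read, per partition.
    private final Map<Integer, Long> lastOffsets = new HashMap<>();
    // Position most recently committed, per partition.
    private final Map<Integer, Long> lastCommittedOffsets = new HashMap<>();

    /** Remembers that a read returned the record at the given offset. */
    void recordRead(int partition, long offset) {
        lastOffsets.put(partition, offset);
    }

    /** Commits the position after the last read record; returns null if nothing was read yet. */
    Long commit(int partition) {
        Long last = lastOffsets.get(partition);
        if (last == null) {
            return null; // no previous read on this partition, nothing to commit
        }
        long next = last + 1; // assumption: commit the next offset to consume
        lastCommittedOffsets.put(partition, next);
        return next;
    }

    public static void main(String[] args) {
        CommitSketch tailer = new CommitSketch();
        System.out.println(tailer.commit(0)); // prints null
        tailer.recordRead(0, 41L);
        System.out.println(tailer.commit(0)); // prints 42
    }
}
```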
-
assignments
public Collection<LogPartition> assignments()
Description copied from interface: LogTailer
Returns the list of (Log name, partition) tuples currently assigned to this tailer. Assignments can change only if the tailer has been created using LogManager.subscribe(org.nuxeo.lib.stream.log.Name, java.util.Collection<org.nuxeo.lib.stream.log.Name>, org.nuxeo.lib.stream.log.RebalanceListener, org.nuxeo.lib.stream.codec.Codec<M>).
- Specified by:
assignments in interface LogTailer<M extends Externalizable>
-
group
public Name group()
Description copied from interface: LogTailer
Returns the consumer group.
- Specified by:
group in interface LogTailer<M extends Externalizable>
-
closed
public boolean closed()
Description copied from interface: LogTailer
Returns true if the tailer has been closed.
- Specified by:
closed in interface LogTailer<M extends Externalizable>
-
getCodec
public Codec<M> getCodec()
Description copied from interface: LogTailer
Returns the codec used to read the records. A null codec is the default legacy encoding.
- Specified by:
getCodec in interface LogTailer<M extends Externalizable>
-
close
public void close()
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface LogTailer<M extends Externalizable>
-
onPartitionsRevoked
public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
- Specified by:
onPartitionsRevoked in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
onPartitionsAssigned
public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> newPartitions)
- Specified by:
onPartitionsAssigned in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
onPartitionsLost
public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions)
- Specified by:
onPartitionsLost in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
cleanDuringRebalancing
protected void cleanDuringRebalancing()
-
-