Package org.nuxeo.lib.stream.log.kafka
Class KafkaUtils
- java.lang.Object
-
- org.nuxeo.lib.stream.log.kafka.KafkaUtils
-
- All Implemented Interfaces:
AutoCloseable
public class KafkaUtils extends Object implements AutoCloseable
Misc Kafka Utils.
- Since:
- 9.3
-
-
Field Summary
Fields
Modifier and Type, Field, Description
protected static long
ADMIN_CLIENT_CLOSE_TIMEOUT_S
protected org.apache.kafka.clients.admin.AdminClient
adminClient
protected static long
ALL_CONSUMERS_CACHE_TIMEOUT_MS
protected List<String>
allConsumers
protected long
allConsumersTime
static String
BOOTSTRAP_SERVERS_PROP
static String
DEFAULT_BOOTSTRAP_SERVERS
-
Constructor Summary
Constructors
Constructor, Description
KafkaUtils()
KafkaUtils(Properties adminProperties)
-
Method Summary
All Methods, Static Methods, Instance Methods, Concrete Methods
Modifier and Type, Method, Description
protected static List<List<LogPartition>>
assignments(org.apache.kafka.clients.consumer.ConsumerPartitionAssignor assignor, int threads, Map<String,Integer> streams)
void
close()
void
createTopic(String topic)
Creates a topic with partitions and replications that default to broker configuration.
void
createTopic(String topic, int partitions)
Creates a topic with replication factor that defaults to broker configuration.
void
createTopic(String topic, int partitions, short replicationFactor)
Creates a topic with the given partitions and replication.
void
createTopicWithoutReplication(String topic, int partitions)
boolean
delete(String topic)
void
deleteConsumers()
Remove all existing consumers and their committed positions.
void
deleteRecords(String topic)
Delete all records of a topic by moving the first offset of each partition to its end.
static String
getBootstrapServers()
protected List<String>
getConsumerTopics(String group)
static Properties
getDefaultAdminProperties()
int
getNumberOfPartitions(String topic)
protected static Collection<org.apache.kafka.common.PartitionInfo>
getPartsFor(String topic, int partitions)
static boolean
kafkaDetected()
List<String>
listAllConsumers()
List<String>
listConsumers(String topic)
Set<String>
listTopics()
int
partitions(String topic)
static List<List<LogPartition>>
rangeAssignments(int threads, Map<String,Integer> streams)
static List<List<LogPartition>>
roundRobinAssignments(int threads, Map<String,Integer> streams)
boolean
topicExists(String topic)
boolean
topicReady(String topic)
protected void
waitForTopicCreation(String topic, Duration timeout)
-
-
-
Field Detail
-
BOOTSTRAP_SERVERS_PROP
public static final String BOOTSTRAP_SERVERS_PROP
- See Also:
- Constant Field Values
-
DEFAULT_BOOTSTRAP_SERVERS
public static final String DEFAULT_BOOTSTRAP_SERVERS
- See Also:
- Constant Field Values
-
adminClient
protected final org.apache.kafka.clients.admin.AdminClient adminClient
-
allConsumersTime
protected volatile long allConsumersTime
-
ALL_CONSUMERS_CACHE_TIMEOUT_MS
protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS
- See Also:
- Constant Field Values
-
ADMIN_CLIENT_CLOSE_TIMEOUT_S
protected static final long ADMIN_CLIENT_CLOSE_TIMEOUT_S
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
KafkaUtils
public KafkaUtils()
-
KafkaUtils
public KafkaUtils(Properties adminProperties)
-
-
Method Detail
-
getDefaultAdminProperties
public static Properties getDefaultAdminProperties()
-
getBootstrapServers
public static String getBootstrapServers()
-
kafkaDetected
public static boolean kafkaDetected()
-
rangeAssignments
public static List<List<LogPartition>> rangeAssignments(int threads, Map<String,Integer> streams)
-
roundRobinAssignments
public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String,Integer> streams)
-
assignments
protected static List<List<LogPartition>> assignments(org.apache.kafka.clients.consumer.ConsumerPartitionAssignor assignor, int threads, Map<String,Integer> streams)
-
getPartsFor
protected static Collection<org.apache.kafka.common.PartitionInfo> getPartsFor(String topic, int partitions)
-
createTopic
public void createTopic(String topic)
Creates a topic with partitions and replications that default to broker configuration.
- Since:
- 2021.33
-
createTopic
public void createTopic(String topic, int partitions)
Creates a topic with replication factor that defaults to broker configuration.
- Since:
- 2021.33
-
createTopicWithoutReplication
public void createTopicWithoutReplication(String topic, int partitions)
-
createTopic
public void createTopic(String topic, int partitions, short replicationFactor)
Creates a topic with the given partitions and replication. Since 2021.33, a partitions (or replication factor) value below 1 defaults to the broker configuration.
-
topicExists
public boolean topicExists(String topic)
-
topicReady
public boolean topicReady(String topic)
-
partitions
public int partitions(String topic)
-
getNumberOfPartitions
public int getNumberOfPartitions(String topic)
-
close
public void close()
- Specified by:
close
in interface AutoCloseable
-
delete
public boolean delete(String topic)
-
deleteRecords
public void deleteRecords(String topic)
Delete all records of a topic by moving the first offset of each partition to its end.
- Since:
- 2021.43
-
deleteConsumers
public void deleteConsumers()
Remove all existing consumers and their committed positions. For testing purposes.
- Since:
- 2021.43
-
-