Kafka configuration and integration with Nuxeo.

When to Use Kafka?

Since Nuxeo 10.10, it is highly recommended to use Kafka when running Nuxeo in cluster mode (a minimal enabling sketch follows the list below):

  • Nuxeo Stream, introduced in Nuxeo 9.3, requires Kafka to run in a distributed way. Kafka acts as a message broker and enables reliable distributed processing by handling failover between nodes.

    Without Kafka, Nuxeo Stream relies on local storage using Chronicle Queue:

    • the processing is not distributed among nodes
    • there are no cluster-wide metrics to follow processing progress
    • the Chronicle Queue files need to be backed up on each node
  • The Nuxeo Bulk Service introduced in Nuxeo 10.10 relies on Nuxeo Stream and therefore requires Kafka to work in a distributed way.
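To enable Kafka on a Nuxeo node, here is a minimal nuxeo.conf sketch, assuming the kafka.enabled and kafka.bootstrap.servers properties of the Nuxeo 10.10 Kafka template (check the template shipped with your distribution for the exact names):

# nuxeo.conf: sketch, property names assumed from the 10.10 kafka template
kafka.enabled=true
kafka.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092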

Other reasons to use Kafka:

  • The WorkManager can be configured to use Nuxeo Stream, which removes the Redis limitation of keeping all pending Works in memory (see the sketch after this list).

  • To remove the need for a Redis deployment.

  • To gain interoperability using Kafka topics and Avro messaging.
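A minimal sketch of switching the WorkManager to Nuxeo Stream, assuming the nuxeo.stream.work.enabled property documented for Nuxeo 10.10 (verify the name against your target version):

# nuxeo.conf: hedged sketch, property name assumed from Nuxeo 10.10
nuxeo.stream.work.enabled=true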

Kafka Setup

Nuxeo only needs to talk to the Kafka brokers; it does not need access to ZooKeeper.

Here are the compatible versions:

Nuxeo Platform Version   Kafka Library                          Kafka Cluster
LTS 2019                 2.1.1 scala 2.12 (since HF15)          1.x, 2.x
                         2.1.0 scala 2.12 (from HF01 to HF14)
LTS 2017                 1.0.0 scala 2.11                       1.x, 2.0.0

The Kafka broker needs a bit of tuning:

offsets.retention.minutes (default: 1440, recommended: 20160)
    The default offset retention is only 1 day: without activity for this amount of time, the current consumer offset position is lost and all messages will be reprocessed. To prevent this, we recommend a value twice as large as log.retention.hours, so 20160 minutes (14 days) by default. See KAFKA-3806 for more information.

log.retention.hours (default: 168)
    The default log retention is 7 days. If you change this, make sure you also update offsets.retention.minutes.

Make sure that you set offsets.retention.minutes properly.
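A minimal server.properties sketch applying these recommendations (both are standard Kafka broker options):

# Kafka broker server.properties: retention tuning
# keep the offsets retention at least twice the log retention (KAFKA-3806)
log.retention.hours=168
offsets.retention.minutes=20160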

Kafka Configuration

Access, consumer and producer properties are registered using the Nuxeo KafkaConfigService extension point:

<?xml version="1.0"?>
<component name="my.project.kafka.contrib">
  <extension target="org.nuxeo.runtime.stream.kafka.service" point="kafkaConfig">
    <kafkaConfig name="default" topicPrefix="nuxeo-">
      <producer>
        <property name="bootstrap.servers">localhost:9092</property>
      </producer>
      <consumer>
        <property name="bootstrap.servers">localhost:9092</property>
        <property name="request.timeout.ms">65000</property>
        <property name="max.poll.interval.ms">60000</property>
        <property name="session.timeout.ms">20000</property>
        <property name="heartbeat.interval.ms">1000</property>
        <property name="max.poll.records">50</property>
      </consumer>
    </kafkaConfig>
  </extension>
</component>

Here are some important properties:

Consumer options:

bootstrap.servers (default: localhost:9092)
    A list of host/port pairs used to establish the initial connection to the Kafka cluster.

enable.auto.commit (default: false)
    The module manages the offset commit, so this is always set to false.

auto.offset.reset (default: earliest)
    This option is always set to earliest.

request.timeout.ms (default: 3605000)
    Request timeout between the client and the Kafka brokers.

max.poll.interval.ms (default: 3600000)
    Consumers that don't call poll during this delay are removed from the group.

session.timeout.ms (default: 50000)
    Consumers that don't send a heartbeat during this delay are removed from the group.

heartbeat.interval.ms (default: 2000)
    Interval between heartbeats.

max.poll.records (default: 2)
    Can be adjusted to make sure the poll interval is respected.

group.initial.rebalance.delay.ms (default: 3000)
    Delay for the initial consumer rebalance.

subscribe.disable (default: false)
    Not a Kafka option; used by the module to disable dynamic assignment. When this option is true, LogManager only supports static partition assignment.

Producer options:

acks (default: 1)
    The number of acknowledgments the producer requires the leader to have received before considering a request complete.

compression.type (default: none)
    Valid values are none, gzip, snappy, or lz4. Compression applies to full batches of data, so the efficacy of batching also impacts the compression ratio (more batching means better compression).

default.replication.factor (default: 1)
    Not a Kafka option; used by Nuxeo to set the topic replication factor when creating new topics.

Most of the above properties can be tuned directly from the nuxeo.conf file.
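A hedged sketch of such tuning, assuming the kafka.* property names exposed by the Nuxeo Kafka configuration template (check the template shipped with your distribution for the exact names):

# nuxeo.conf: sketch, kafka.* names assumed from the 10.10 kafka template
kafka.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# replicate topics on 3 brokers so a single broker failure loses no records
kafka.default.replication.factor=3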

Make sure that you set default.replication.factor properly: the default value is 1, which means NO replication. With a replication factor of N, Kafka tolerates up to N-1 server failures without losing records. For instance, if you have 3 brokers in your cluster, a replication factor of 2 tolerates one server failure.

It is important to adapt the max.poll.interval.ms for slow consumers; otherwise, you will encounter errors like:

ERROR [ComputationRunner] compliance: Exception in processLoop: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

For instance, this will happen when using the StreamWorkManager if a Work takes more than 1h.
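A hedged sketch of such tuning through the kafkaConfig contribution shown above; the values are illustrative: raise max.poll.interval.ms above your longest expected processing time, keep request.timeout.ms above it, and lower max.poll.records:

<consumer>
  <property name="bootstrap.servers">localhost:9092</property>
  <!-- allow up to 2 hours between poll() calls for slow Works -->
  <property name="max.poll.interval.ms">7200000</property>
  <!-- must stay higher than max.poll.interval.ms -->
  <property name="request.timeout.ms">7205000</property>
  <!-- fewer records per poll so each batch completes within the interval -->
  <property name="max.poll.records">10</property>
</consumer>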

Please refer to the Kafka documentation about the consumer and producer options and replication for more information.

When Kafka is used by the PubSub provider, the topic retention can be reduced to a few hours because PubSub only sends instant messages. This can be done at the Kafka level using the following command:

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper <zk_host> --alter --entity-type topics --entity-name nuxeo-pubsub --add-config retention.ms=7200000
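To check that the override was applied, the same tool can describe the topic configuration:

$KAFKA_HOME/bin/kafka-configs.sh --zookeeper <zk_host> --describe --entity-type topics --entity-name nuxeo-pubsub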
