# kafka
The Kafka connectors `kafka_consumer` and `kafka_producer` provide integration with Apache Kafka and compatible products such as Confluent Kafka and Redpanda. Consuming from and producing to Kafka are handled by two separate connectors.
Both Kafka connectors in Tremor are built on top of [librdkafka] version 1.8.2 and expose its full complement of configuration settings. Care SHOULD be taken when configuring Kafka with Tremor to ensure that the configuration settings make sense given the logic required of the resulting system.
## kafka_consumer
To consume from Kafka, one needs to define a connector from `kafka_consumer`.
### Configuration
It supports the following configuration options:
| Option | Description | Type | Required | Default Value |
|--------|-------------|------|----------|---------------|
| `group_id` | The consumer group id to register with the Kafka cluster. Corresponds to the librdkafka `group.id` setting. | string | yes | |
| `topics` | The topics to consume from. | list of strings | yes | |
| `brokers` | URLs of the cluster bootstrap servers to connect to. Corresponds to the librdkafka `bootstrap.servers` setting. | list of strings | yes | |
| `mode` | Determines the working mode of the connector. The following modes are supported: `performance` (default), `transactional` and `custom`. See below for more details. | See below. | no | `"performance"` |
### Mode

#### Performance
The mode describes how the connector will consume from Kafka. In the mode `"performance"`, the connector will automatically store the offsets of all messages it receives and commit them to the broker every 5 seconds. This mode has the lowest overhead and is best suited for performance-sensitive applications where a single missed or failed message is not a big deal. This is the default mode.

This mode essentially sets the following librdkafka configuration options:
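```js
{
  "enable.auto.commit": true,
  "enable.auto.offset.store": true,
  "auto.commit.interval.ms": 5000
}
```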
Example:
```tremor
define connector perf_consumer from kafka_consumer
with
  codec = "json",
  config = {
    "brokers": [
      "localhost:9092"
    ],
    "group_id": "my_consumer_group",
    "topics": [
      "my_topic"
    ],
    "mode": "performance"
  }
end;
```
#### Transactional
The mode "transactional"
is for workloads where each and every kafka message needs to be handled successfully in order to make progress. In this mode, the offset for every message
is only stored once it has been handled successfully by a downstream connector. The setting commit_interval
of the mode determines how often the offsets are committed to the kafka cluster.
The default is 5 seconds. If this setting is set to 0
, the connector will immediately commit every single event offset directly to the kafka cluster. This will lead to lots of traffic towards the group-coordinator, and should be avoided for high volume loads. This is the safest setting though.
Failed events will be replayed. If an event fails the partition offset of the event is reset, so the consumer will start consuming again from that point. This can lead to consuming kafka messages more than once, but it guarantees at-least-once delivery in the face of failing events.
Example:
```tremor
use std::time::nanos;

define connector transactional_consumer from kafka_consumer
with
  codec = "json",
  config = {
    "brokers": [
      "localhost:9092"
    ],
    "group_id": "my_consumer_group",
    "topics": [
      "my_topic"
    ],
    "mode": {
      "transactional": {
        # this setting can be omitted and defaults to 5 seconds
        # if set to `0`, the connector commits offsets immediately
        "commit_interval": nanos::from_seconds(1)
      }
    }
  }
end;
```
#### Custom
The mode `custom` allows custom configuration of the connector and the underlying librdkafka client. It contains two settings:

* `rdkafka_options`: librdkafka configuration options. For possible options consult the librdkafka configuration options. The options `group.id`, `client.id` and `bootstrap.servers` are set from the connector config and cannot be overwritten.
* `retry_failed_events`: If set to `true`, this connector will behave as in `transactional` mode and reset the offset for every failed Kafka message, effectively retrying it. Default: `false`
To customize your settings, it might be useful to know how the other modes translate to the `custom` mode, so you can adapt them to your needs.

Transactional mode translates to the following `rdkafka_options`:
```js
{
  "mode": {
    "custom": {
      "rdkafka_options": {
        "enable.auto.commit": true,
        "enable.auto.offset.store": false,
        "auto.commit.interval.ms": 5000
      },
      "retry_failed_events": true
    }
  }
}
```
Performance mode translates to the following `rdkafka_options`:
```js
{
  "mode": {
    "custom": {
      "rdkafka_options": {
        "enable.auto.commit": true,
        "enable.auto.offset.store": true,
        "auto.commit.interval.ms": 5000
      },
      "retry_failed_events": false
    }
  }
}
```
For detailed semantics on how the consumer behaves with which settings, please consult the librdkafka configuration options.
Example configuration for `kafka_consumer`:
```tremor
use std::time::nanos;

define connector consumer from kafka_consumer
with
  metrics_interval_s = 1,
  reconnect = {
    "retry": {
      "interval_ms": 3000,
      "max_retries": 10
    }
  },
  codec = "json",
  # Kafka specific consumer configuration
  config = {
    # List of broker bootstrap servers
    "brokers": [
      "127.0.0.1:9092"
    ],
    "group_id": "test1", # Consumer group id
    # required - list of subscription topics to register with
    "topics": [
      "tremor_test"
    ],
    "mode": {
      "custom": {
        # Whether or not to retry failed attempts
        # When true - resets the offset to a failed message for retry
        #           - Warning: where persistent failure is expected, this will lead to persistent errors
        # When false - only commits the offset on a successful acknowledgement
        "retry_failed_events": false,
        # librdkafka configuration settings (indicative illustrative example)
        "rdkafka_options": {
          "enable.auto.commit": "false", # only commit a message offset if the event has been handled successfully
          "auto.commit.interval.ms": 5000, # auto-commit the stored offsets every 5s
          "enable.auto.offset.store": true,
          "debug": "consumer", # enable librdkafka debug logging of the consumer
          "enable.partition.eof": false, # do not send an EOF event if a partition becomes empty
          "auto.offset.reset": "beginning" # always start consuming from the beginning of all partitions
        }
      }
    }
  }
end;
```
### Event Metadata
Events consumed from a `kafka_consumer` connector will have the following event metadata:
```js
{
  "kafka_consumer": {
    "key": ..., # binary message key
    "headers": {
      "kafka_message_header": ..., # binary header value
      ...
    },
    "topic": "my_topic", # topic name
    "partition": 1, # numeric partition id of the message
    "offset": 12, # numeric message offset in the given partition
    "timestamp": 1230000000 # optional message timestamp in nanoseconds
  }
}
```
It can be accessed in scripts or `select` statements in the following way:
```tremor
match $ of
  case %{ present kafka_consumer } => $kafka_consumer.partition
  default => -1 # invalid partition
end;
```
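For example, a pipeline could use this metadata in a `select` statement to enrich each event with its Kafka origin. A minimal sketch (the pipeline name and output record shape are illustrative assumptions):

```tremor
define pipeline enrich_with_origin
pipeline
  # wrap the payload together with where it came from
  select {
    "payload": event,
    "topic": $kafka_consumer.topic,
    "partition": $kafka_consumer.partition,
    "offset": $kafka_consumer.offset
  } from in into out;
end;
```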
## kafka_producer
To produce events as Kafka messages, a connector needs to be defined from the `kafka_producer` connector type.
### Configuration
It supports the following configuration options:
| Option | Description | Type | Required | Default Value |
|--------|-------------|------|----------|---------------|
| `brokers` | URLs of the cluster bootstrap servers to connect to. Corresponds to the librdkafka `bootstrap.servers` setting. | list of strings | yes | |
| `topic` | The topic to produce events to. | string | yes | |
| `key` | The message key to add to the produced Kafka messages. Can be overwritten by the event metadata value `$kafka_producer.key`. | string | no | |
| `rdkafka_options` | librdkafka configuration. For possible options consult the librdkafka configuration docs. | json record with string values | no | By default only `client.id` and `bootstrap.servers` are set. |
Example configuration for `kafka_producer`:
```tremor
define connector producer from kafka_producer
with
  # Enables metrics at a 1 second interval
  metrics_interval_s = 1,
  # event payload is serialized to JSON
  codec = "json",
  # Kafka specific producer configuration
  config = {
    # List of broker bootstrap servers
    "brokers": [
      "127.0.0.1:9092"
    ],
    # the topic to send to
    "topic": "tremor_test"
  }
end;
```
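The optional `key` and `rdkafka_options` settings can be added to the same config. A minimal sketch (the connector name and the chosen librdkafka option values are illustrative assumptions):

```tremor
define connector keyed_producer from kafka_producer
with
  codec = "json",
  config = {
    "brokers": [
      "127.0.0.1:9092"
    ],
    "topic": "tremor_test",
    # static message key, can be overridden per event via `$kafka_producer.key`
    "key": "default_key",
    # librdkafka tuning options (illustrative values)
    "rdkafka_options": {
      "message.timeout.ms": "5000",
      "queue.buffering.max.ms": "100"
    }
  }
end;
```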
### Event Metadata
To control how the `kafka_producer` produces events as Kafka messages, the following metadata options are available:
```tremor
let $kafka_producer = {
  "key": "message_key", # kafka message key as string or bytes
  "headers": { # message headers
    "my_bytes_header": <<"badger"/binary>>,
    "my_string_header": "string"
  },
  "timestamp": 12345, # message timestamp
  "partition": 3 # numeric partition id to publish the message on
};
```
It is important to provide the metadata options underneath the key `kafka_producer`, otherwise they will be ignored.
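As a sketch, a script inside a pipeline could set these options per event before it reaches the producer (the pipeline name and the `user_id` field are illustrative assumptions):

```tremor
define pipeline keyed
pipeline
  define script set_key
  script
    # use a field of the event as the kafka message key (assumed field)
    let $kafka_producer = { "key": event.user_id };
    event
  end;
  create script set_key;

  select event from in into set_key;
  select event from set_key into out;
end;
```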