omkafka: write to Apache Kafka
Module Name: | omkafka |
Author: | Rainer Gerhards <rgerhards@adiscon.com> |
Available since: | v8.7.0 |
Purpose
The omkafka plug-in implements an Apache Kafka producer, permitting rsyslog to write data to Kafka.
Configuration Parameters
Note
Parameter names are case-insensitive.
Action Parameters
Note that omkafka supports some Array-type parameters. While the parameter name can only be set once, it is possible to set multiple values with that single parameter. See the Using Array Type Parameter section for details.
Broker
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
array | localhost:9092 | no | none |
Specifies the broker(s) to use.
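As a minimal sketch, several brokers can be listed with the array notation. The host names below are hypothetical placeholders; only the broker and topic parameters are taken from this page.

```
# Load the output module once, before the first omkafka action.
module(load="omkafka")

# Hypothetical broker host names; replace with your own cluster.
action(type="omkafka"
       broker=["kafka1.example.com:9092", "kafka2.example.com:9092"]
       topic="mytopic")
```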
Topic
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
string | none | yes | none |
Specifies the topic to produce to.
Key
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
Kafka key to be used for all messages.
If a key is provided and partitions.auto="on" is set, then all messages will be assigned to a partition based on the key.
DynaKey
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
If set, the key parameter becomes a template for the key to base the partitioning on.
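A sketch of how this might look, assuming a template that renders the sending host name. The template name kafkaKeyHost is made up for illustration; with dynaKey enabled, the key parameter holds the template name instead of a literal key.

```
# Hypothetical template name; renders the message's host name as the Kafka key.
template(name="kafkaKeyHost" type="string" string="%hostname%")

# key names the template because dynaKey is on; partitions.auto lets
# librdkafka choose the partition from the rendered key.
action(type="omkafka" topic="mytopic"
       key="kafkaKeyHost" dynaKey="on"
       partitions.auto="on")
```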
DynaTopic
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
If set, the topic parameter becomes a template that determines which topic each message is produced to. Topics resolved this way are kept in a dynamic topic cache, which is cleared on HUP.
DynaTopic.Cachesize
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
integer | 50 | no | none |
If set, defines the number of topics that will be kept in the dynatopic cache.
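The following sketch combines both settings. The template name and the "logs-" prefix are illustrative assumptions, and it is assumed that the rendered program names are valid Kafka topic names.

```
# Hypothetical template; yields one topic per program, e.g. "logs-sshd".
template(name="kafkaTopicByProgram" type="string" string="logs-%programname%")

# topic names the template because dynaTopic is on; the cache is sized
# for up to 100 distinct topics instead of the default 50.
action(type="omkafka"
       topic="kafkaTopicByProgram" dynaTopic="on"
       dynaTopic.cacheSize="100")
```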
Partitions.Auto
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
librdkafka provides an automatic partitioning function that distributes the produced messages across all partitions configured for that topic.
To use it, set partitions.auto="on". This replaces specifying the number of partitions on the producer side: it is easier to change the number of partitions per topic once in the Kafka cluster configuration than on every machine that talks to Kafka via rsyslog.
If no key is set, messages will be distributed randomly across partitions. This results in a very even load on all partitions, but does not preserve ordering between the messages.
If a key is set, a partition will be chosen automatically based on it. All messages with the same key will be sorted into the same partition, preserving their ordering. For example, by setting the key to the hostname, messages from a specific host will be written to one partition and ordered, but messages from different nodes will be distributed across different partitions. This distribution is essentially random, but stable. If the number of different keys is much larger than the number of partitions on the topic, load will be distributed fairly evenly.
If set, it will override any other partitioning scheme configured.
Partitions.number
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
integer | none | no | none |
If set, specifies how many partitions exist and activates load-balancing among them. Messages are distributed more or less evenly between the partitions. Note that the number specified must be correct; otherwise, errors may occur or some partitions may never receive data.
Partitions.useFixed
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
integer | none | no | none |
If set, specifies the partition to which data is produced. All data goes to this partition; no other partition is ever involved for this action.
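The two manual partitioning schemes might be configured as sketched below. These are alternatives, not a combined setup: using both actions together would send every message twice. The partition count of 4 and partition index 0 are example values.

```
# Variant A: the topic is assumed to have exactly 4 partitions;
# messages are load-balanced across them.
action(type="omkafka" topic="mytopic" partitions.number="4")

# Variant B: every message goes to partition 0 only.
action(type="omkafka" topic="mytopic" partitions.useFixed="0")
```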
errorFile
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
If set, messages that could not be sent and caused an error are written to the file specified. This file is in JSON format, with a single record written for each failed message. Each entry contains the full message as well as the Kafka error number and reason string.
The idea behind the error file is that the admin can periodically run a script that reads the error file and reacts to it. Note that the error file is kept open from when the first error occurred until rsyslog is terminated or receives a HUP signal.
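A minimal sketch of enabling the error file; the path is a hypothetical example and must be writable by the rsyslog process.

```
# Hypothetical path; one JSON record is appended per failed message.
action(type="omkafka" topic="mytopic"
       errorFile="/var/log/rsyslog/omkafka-errors.json")
```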
statsFile
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
If set, the contents of the JSON object containing the full librdkafka statistics will be written to the file specified. The file will be updated based on the statistics.interval.ms confparam value, which must also be set.
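For illustration, the statistics file could be enabled together with the required interval as sketched here. The path and the 60-second interval are example values.

```
# Hypothetical path; statistics.interval.ms must be set for the file
# to be updated (here: every 60000 ms).
action(type="omkafka" topic="mytopic"
       statsFile="/var/log/rsyslog/omkafka-stats.json"
       confParam=["statistics.interval.ms=60000"])
```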
ConfParam
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
array | none | no | none |
Permits specifying Kafka options. Rather than offering a myriad of config settings to match the Kafka parameters, we provide this setting as a vehicle to set any Kafka parameter. This has the big advantage that Kafka parameters introduced in new releases can be used immediately.
Note that we use librdkafka for the Kafka connection, so the parameters are actually those that librdkafka supports. To our understanding, this is a superset of the native Kafka parameters.
TopicConfParam
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
array | none | no | none |
In essence the same as confParam, but for the Kafka topic.
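As a sketch, a topic-level librdkafka property can be passed the same way as with confParam. The example assumes the librdkafka topic property request.required.acks; check the librdkafka configuration reference for the properties your version supports.

```
# Assumption: request.required.acks is the librdkafka topic-level
# acknowledgement setting; -1 requests the strictest acknowledgement level.
action(type="omkafka" topic="mytopic"
       topicConfParam=["request.required.acks=-1"])
```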
Template
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | template set via template module parameter | no | none |
Sets the template to be used for this action.
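A short sketch of a per-action template; the template name kafkaLine and the chosen line layout are illustrative assumptions, not a recommended format.

```
# Hypothetical template: RFC 3339 timestamp, host, tag and message text.
template(name="kafkaLine" type="string"
         string="%timestamp:::date-rfc3339% %hostname% %syslogtag%%msg%")

# Each Kafka message payload is rendered with the template above.
action(type="omkafka" topic="mytopic" template="kafkaLine")
```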
closeTimeout
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
integer | 2000 | no | none |
Sets the time to wait in ms (milliseconds) for draining messages submitted to kafka-handle (provided by librdkafka) before closing it.
The maximum value of closeTimeout used across all omkafka action instances is used as librdkafka unload-timeout while unloading the module (for shutdown, for instance).
resubmitOnFailure
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
New in version 8.28.0.
If enabled, failed messages will be resubmitted automatically when Kafka is able to send messages again. To prevent message loss, this option should be enabled.
Note: Messages that are rejected by Kafka because they exceed the maximum configured message size are automatically dropped. These errors are not retriable.
KeepFailedMessages
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
binary | off | no | none |
If enabled, failed messages will be saved on shutdown, loaded on startup, and resent once the Kafka server is able to receive messages again. This setting requires resubmitOnFailure to be enabled as well.
failedMsgFile
type | default | mandatory | obsolete legacy directive |
---|---|---|---|
word | none | no | none |
New in version 8.28.0.
Name of the file in which failed messages are stored. It must be set when keepFailedMessages is enabled; otherwise failed messages will not be saved.
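The three failure-handling parameters are meant to be used together. A minimal sketch, with a hypothetical file path:

```
# resubmitOnFailure retries failed messages while rsyslog is running;
# keepFailedMessages plus failedMsgFile carry them across restarts.
# The path below is a hypothetical example.
action(type="omkafka" topic="mytopic"
       resubmitOnFailure="on"
       keepFailedMessages="on"
       failedMsgFile="/var/spool/rsyslog/omkafka-failed.data")
```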
Statistic Counter
This plugin maintains global statistics for omkafka that accumulate across all action instances. The statistic origin is named “omkafka” and provides the following counters:
- submitted - number of messages submitted to omkafka for processing (this includes messages whose delivery the broker acknowledged as well as messages that failed or were resubmitted from omkafka to librdkafka).
- maxoutqsize - high water mark of output queue size.
- failures - number of messages that librdkafka failed to deliver. This number is broken down into counts of various types of failures.
- topicdynacache.skipped - count of dynamic topic cache lookups that find an existing topic and skip creating a new one.
- topicdynacache.miss - count of dynamic topic cache lookups that fail to find an existing topic and end up creating a new one.
- topicdynacache.evicted - count of dynamic topic cache entry evictions.
- acked - count of messages that were acknowledged by the Kafka broker. Note that the Kafka broker provides two levels of delivery acknowledgement depending on topicConfParam: the default (acks=1) implies delivery to the leader only, while acks=-1 implies delivery to the leader as well as replication to all brokers.
- failures_msg_too_large - count of messages dropped by librdkafka when it failed to deliver to the broker because the broker considers the message to be too large. Note that omkafka may still resubmit to librdkafka depending on the resubmitOnFailure option.
- failures_unknown_topic - count of messages dropped by librdkafka when it failed to deliver to the broker because the broker does not recognize the topic.
- failures_queue_full - count of messages dropped by librdkafka when its queue becomes full. Note that the default size of the librdkafka queue is 100,000 messages.
- failures_unknown_partition - count of messages that librdkafka failed to deliver because the broker does not recognize a partition.
- failures_other - count of all of the rest of the failures that do not fall in any of the above failure categories.
- errors_timed_out - count of messages that librdkafka could not deliver within timeout. These errors will cause action to be suspended but messages can be retried depending on retry options.
- errors_transport - count of messages that librdkafka could not deliver due to transport errors. These messages can be retried depending on retry options.
- errors_broker_down - count of messages that librdkafka could not deliver because it thinks that the broker is not accessible. These messages can be retried depending on the options.
- errors_auth - count of messages that librdkafka could not deliver due to authentication errors. These messages can be retried depending on the options.
- errors_other - count of the remaining librdkafka errors.
- rtt_avg_usec - broker round-trip time in microseconds, averaged over all brokers. It is based on the statistics callback window specified through the statistics.interval.ms parameter to librdkafka. The average excludes brokers with a round-trip time of less than 100 microseconds.
- throttle_avg_msec - broker throttling time in milliseconds, averaged over all brokers. This is also part of the window statistics delivered by librdkafka. The average excludes brokers with zero throttling time.
- int_latency_avg_usec - internal librdkafka producer queue latency in microseconds, averaged over all brokers. This is also part of the window statistics, and the average excludes brokers with zero internal latency.
Note that the three window statistics counters (rtt_avg_usec, throttle_avg_msec and int_latency_avg_usec) are not safe with multiple clients. When the statistics callback is enabled, for example by using statistics.interval.ms=60000, omkafka will generate an internal log message every minute for the corresponding omkafka action:
2018-03-31T01:51:59.368491+00:00 app1-1.example.com rsyslogd: statscb_window_stats:
handler_name=collections.rsyslog.core#producer-1 replyq=0 msg_cnt=30 msg_size=37986 msg_max=100000
msg_size_max=1073741824 rtt_avg_usec=41475 throttle_avg_msec=0 int_latency_avg_usec=2943224 [v8.32.0]
For multiple actions using the statistics callback, there will be one such record for each action after each window period. See https://github.com/edenhill/librdkafka/wiki/Statistics for more details on statistics callback values.
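One way to collect the counters listed above is rsyslog's impstats input module, which periodically emits all statistics objects. The sketch below is an assumption about a typical setup and is not specific to omkafka; the interval and file path are example values.

```
# impstats is usually loaded near the top of the configuration.
# Hypothetical output path; counters are written every 60 seconds as JSON.
module(load="impstats"
       interval="60"
       format="json"
       log.syslog="off"
       log.file="/var/log/rsyslog/stats.json")
```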
Examples
Using Array Type Parameter
Set a single value
For example, to select “snappy” compression, you can use:
action(type="omkafka" topic="mytopic" confParam="compression.codec=snappy")
which is equivalent to:
action(type="omkafka" topic="mytopic" confParam=["compression.codec=snappy"])
Set multiple values
To specify multiple values, just use the bracket notation and create a comma-delimited list of values as shown here:
action(type="omkafka" topic="mytopic"
confParam=["compression.codec=snappy",
"socket.timeout.ms=5",
"socket.keepalive.enable=true"]
)