Consume from Kafka

Trubka is a general-purpose Kafka consumer which can be used to consume plain text, protocol buffer events, or any arbitrary stream of bytes from Kafka.

Consuming Plain Text

The consume plain command can be used to consume plain text or any byte stream from Kafka, regardless of the serialisation algorithm of the content.

$> trubka consume plain <topic name> --brokers <broker:port>
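
For example, a minimal sketch of consuming a topic named Notifications from a local broker (the topic name and the broker address are placeholders):

# starts from the latest offset by default and prints each message to stdout
$> trubka consume plain Notifications --brokers localhost:9092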

Consuming Protobuf

The consume proto command, on the other hand, is super opinionated about the bytes it consumes: it only accepts protobuf bytes that conform to a specific proto contract. In order to consume protocol buffer events from Kafka, you need to tell Trubka:

  • Where your protocol buffer files (*.proto) live (--proto-root flag).
  • What topic you want to consume from (the first argument of the command).
  • Which contract it needs to use to deserialise the bytes (the second argument of the command).
$> trubka consume proto <topic name> <contract> --proto-root <dir> --brokers <broker:port>

Keep in mind that you need to specify the fully qualified name of the contract. For example, if you have a protocol buffer message called EventHappened which lives in the contracts package and gets published to a topic named Events on your local machine, you must use the following command to start consuming:

$> trubka consume proto Events contracts.EventHappened \
--proto-root /some_dir_on_your_disk --brokers localhost:9092

If the topic name is exactly the same as the fully qualified name of your schema (contracts.EventHappened in the above example), the <contract> argument can be dropped. So you can run the command as:

$> trubka consume proto contracts.EventHappened \
--proto-root /some_dir_on_your_disk --brokers localhost:9092

Common Consuming Tips

Searching Messages

You can optionally define a regular expression using the --search-query (-q) flag to filter the messages consumed from Kafka. It's simply a string match on the string representation of the de-serialised message content (if applicable). If combined with the --reverse flag, the messages which match the search query will be filtered out instead.
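
For example, a minimal sketch that keeps only the messages containing the word error, and its inverse (the topic name and the query pattern are hypothetical):

# keep only the messages whose string representation matches "error"
$> trubka consume plain Notifications --brokers localhost:9092 -q "error"

# --reverse inverts the match, so the matching messages are dropped instead
$> trubka consume plain Notifications --brokers localhost:9092 -q "error" --reverse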

Offset Storage

Trubka stores the offsets locally on disk, so there is no consumer group for Trubka maintained by your Kafka cluster. This makes it safe to test and troubleshoot production environments without polluting the internal __consumer_offsets topic.

Speaking of production, most of us work with different environments (local, staging, production, etc.). In order to make it easier to use Trubka in such a setup, the consumers store the offsets under the --environment (default local) directory on the disk. You can see where on the disk the offsets are being stored by running the list local-topics -v sub-command.
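
As a sketch, assuming a hypothetical staging broker address, the staging offsets are kept apart from your local ones like this:

# offsets are stored under the "staging" directory instead of the default "local"
$> trubka consume plain Notifications --brokers staging-kafka:9092 --environment staging

# show where on the disk the offsets are being stored
$> trubka list local-topics -v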

Offset Management

By default, Trubka starts consuming from the latest offset of the specified topic(s). You can explicitly ask Trubka to start consuming from a specific offset using the --from flag, or to stop at --to. Both start and stop limits are inclusive, so --from 10 --to 15 will include the messages with offsets 10, 11, 12, 13, 14 and 15.
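
As a concrete sketch of that inclusive window (the topic name and broker address are placeholders):

# consumes offsets 10 through 15, inclusive, from every partition of the topic
$> trubka consume plain Events --brokers localhost:9092 --from 10 --to 15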

The --from value may be set to:

  • local: The latest offset stored locally for each partition within the topic.
  • newest (default): Starts consuming from the end of the stream.
  • oldest: Starts consuming from the beginning of the stream.
  • timestamp: Starts consuming from the most recent available offset at the given time.
  • An explicit offset value (e.g. 18000).
  • Partition specific timestamp/explicit offsets (e.g. 10#18000 or 8#2020-06-29)

A note on timestamp-based start offset requests

Kafka only stores a single timestamp per log segment file, so the start offset returned from the server for the provided time parameter is an approximation.

The --to value may be set to:

  • timestamp: Stops consuming once the timestamp of the messages passes the provided time.
  • An explicit offset value (e.g. 25000).
  • Partition specific timestamp/explicit offsets (e.g. 10#25000 or 8#2020-06-30)

Examples

trubka consume ... --from oldest --to 25000

Starts consuming from the oldest offsets up to offset 25000 of all the partitions. If any of the partitions does not have a message at offset 25000, consumption will not stop unless --idle-timeout has been set.

trubka consume ... --from 2020-06-29T10:20:00+10:00 --to 2020-06-29T12:50:30+10:00

Starts consuming from the most recent available offset at 2020-06-29T10:20:00+10:00 and stops at 2020-06-29T12:50:30+10:00. If any of the partitions does not have a message after 2020-06-29T12:50:30+10:00, consumption will not stop unless --idle-timeout has been set.

trubka consume ... --from local --to 2020-06-29T12:50:30+10:00

Starts consuming from the latest offset stored locally for each partition within the topic and stops at 2020-06-29T12:50:30+10:00.

trubka consume ... --from 0#100 --to 0#200 --from 500 --to 1000

Starts consuming from offset 100 of partition 0 and stops at 200. For other partitions, starts from 500 up to offset 1000.

trubka consume ... --from 0#100 --to 0#200

Starts consuming from offset 100 of partition 0 and stops at 200. For other partitions, starts from the newest, but it never stops until cancelled by the user.

trubka consume ... --from 0#100 --to 0#200 --exclusive

Starts consuming from offset 100 of partition 0 and stops at 200, but does not consume from any other partitions.

Notes

  • If you are consuming from multiple partitions with stop offsets/timestamps, the consumer will only stop if:
    • All the partitions reach the specified stop checkpoint OR
    • No message has been delivered to the partition(s) within the provided --idle-timeout (e.g. --idle-timeout 10s) period.
  • If you need to consume from the specified partitions only (defined using the Partition#Checkpoint syntax), you should set the --exclusive flag to exclude all the other partitions from being consumed.
  • If you ask for an offset explicitly (e.g. 1000), you need to make sure that the requested offset is valid on the server, otherwise the request will be rejected by Kafka.
  • Trubka has no idea about the timezone of your Kafka cluster. In case you don't know either, I would recommend consuming a few messages using the -S flag to find out what timezone the messages are stored in on the server. You must use the same timezone when specifying time-based --from and --to flags (see the sketch below).
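
A minimal sketch of that timezone check (the topic name is a placeholder, and -S is assumed to print each message's server-side timestamp as implied above):

# consume a few of the oldest messages along with their timestamps
# (cancel with Ctrl+C once you have seen enough)
$> trubka consume plain Events --brokers localhost:9092 -S --from oldest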

Output Redirection

Trubka writes the consumed bytes to your terminal's standard output (stdout) by default. If you need to store the bytes in a file, you can either redirect the output (> or >>) or use the consume command's --output-dir flag to specify the directory in which a single file per topic will be created to store the stream content.
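
For example, both of these sketches persist the stream (the file and directory names are hypothetical):

# shell redirection into a single file
$> trubka consume plain Notifications --brokers localhost:9092 > notifications.log

# Trubka creates one file per topic under the given directory
$> trubka consume plain Notifications --brokers localhost:9092 --output-dir ./output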

Output formats and encodings

Both the proto and plain text consumers are capable of formatting/encoding the Kafka event bytes before writing them to the output. You can change the format/encoder using the --format (-F) flag. For the proto consumer command the default is json-indent, and for the plain consumer it is plain. The other available encoders are hex and base64. For the proto consumer, json is also a valid option, which will instruct Trubka to spit out an unformatted JSON string to the output.
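
As a sketch (the topic names, contract and path are placeholders reused from the examples above):

# single-line JSON instead of the default json-indent
$> trubka consume proto Events contracts.EventHappened \
--proto-root /some_dir_on_your_disk --brokers localhost:9092 -F json

# raw plain-text events rendered as base64
$> trubka consume plain Notifications --brokers localhost:9092 -F base64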

You can also ask the plain consumer to automatically decode the input bytes from hex or base64 into plain text using the --decode-from (-D) flag. The default value is plain, which tells Trubka not to touch the input bytes of the events.
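
For instance, a sketch of reading a topic whose producers publish base64-encoded payloads (the topic name is a placeholder):

# decode each event's bytes from base64 before writing them out
$> trubka consume plain Notifications --brokers localhost:9092 -D base64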

Interactive mode

Trubka can also be executed in interactive mode using the -i flag. Interactive mode walks you through the steps of picking the topic from a list of existing topics fetched from the server and, for the consume proto command, the proto message type from a list of protocol buffer messages living in the --proto-root directory.

In case you have too many topics on the server, or the list of protocol buffer contracts is too long, you can narrow them down using the --topic-filter or --proto-filter flags respectively. Both flags accept regular expressions.

Interactive Proto Consumer Example

$> trubka consume proto --proto-root <dir> --brokers <broker:port> \
--topic-filter Event --proto-filter Happened -i

Interactive Plain Text Consumer Example

$> trubka consume plain --brokers <broker:port> --topic-filter Notifications -i

The main advantage of the interactive mode is the ability to consume from multiple topics at the same time. You can also explicitly define, for each topic, the offset you want to start consuming from. If you need to specify offsets, you need to run Trubka in --interactive-with-offset (-I) mode, instead of --interactive (-i).
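
As a sketch, reusing the placeholders from the examples above:

# same interactive flow as -i, but also asks for a start offset for each selected topic
$> trubka consume proto --proto-root <dir> --brokers <broker:port> -I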