In most Apache Kafka use cases, message compression is used to send large amounts of data quickly. Although sending messages this way is recommended, you might still want to read the content of a compressed message manually, for debugging or other purposes. This article will teach you how.
(If you do not have the necessary binaries installed, don't worry: we've got you covered!)
You found out Kafka is the key component to make your project a success and you started using it, so welcome onboard!
Then you created a topic with its own partitions and after that, you pushed messages to it using compression (hopefully). So far, so good, but now you want to know what your topic is actually holding, right?
The reasons you might want to manually read the content of a topic can vary, but the most common reason is for debugging.
Whatever the reason, the easiest way to read the compressed messages is a tool called 'kafka-console-consumer.sh', which decompresses them for you.
How to get the tool
Each Kafka installation comes with a set of utilities to make our lives easier. If you installed Kafka via the package manager, then 'kafka-console-consumer.sh' (or sometimes the alias 'kafka-console-consumer') is made available to users automagically on the machine it was installed on.
That's really it: you can type 'kafka-console-consumer' directly in the machine's terminal. If you installed Kafka manually instead, you will find 'kafka-console-consumer.sh' in the 'bin' folder of your Kafka installation.
In all other cases, for instance when you do not have Kafka on your machine because you delegate its installation and maintenance to a SaaS provider (like CloudKarafka), you can download the Kafka binaries.
To install the binaries manually, go to https://kafka.apache.org/downloads and pick the latest binary release. Once downloaded, decompress the binaries to a location that is convenient for you, e.g. '/home/john/Kafka'. All Kafka binaries will then be available in the 'bin' subfolder. So, as an example, a user named 'John' can use 'kafka-console-consumer.sh' this way:
/home/john/Kafka/bin/kafka-console-consumer.sh
The above is essentially the universal way to install Kafka. It requires Java and, for the sake of simplicity, we are not covering how to add the executable(s) to your PATH.
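If you are not sure which of the cases above applies to a given machine, a small helper can look for the tool. The following is a minimal sketch and not part of Kafka: the function name 'find_console_consumer', its optional argument (the Kafka installation folder), and the 'KAFKA_HOME' fallback variable are all assumptions made for illustration.

```shell
# Locate the console consumer: look on PATH first (package install),
# then in <kafka_home>/bin (manual install).
# Usage: find_console_consumer [KAFKA_INSTALL_DIR]
find_console_consumer() {
  # Hypothetical convention: fall back to $KAFKA_HOME if no argument is given.
  fcc_home="${1:-${KAFKA_HOME:-}}"
  for fcc_name in kafka-console-consumer.sh kafka-console-consumer; do
    if command -v "$fcc_name" >/dev/null 2>&1; then
      command -v "$fcc_name"
      return 0
    fi
  done
  if [ -n "$fcc_home" ] && [ -x "$fcc_home/bin/kafka-console-consumer.sh" ]; then
    printf '%s\n' "$fcc_home/bin/kafka-console-consumer.sh"
    return 0
  fi
  echo "kafka-console-consumer not found on PATH or in ${fcc_home:-<unset>}/bin" >&2
  return 1
}
```

For John's manual installation, 'find_console_consumer /home/john/Kafka' would print the full path of the script, or fail with a message if nothing is found.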
Now that you can use 'kafka-console-consumer.sh', let's have a look at how it works. For the purposes of this article, we assume that you are inside the 'bin' folder and can run the command as:
./kafka-console-consumer.sh
instead of giving the full path, as in:
/home/john/Kafka/bin/kafka-console-consumer.sh
Generate test data
Synthetic data can be generated for test purposes by using:
./kafka-verifiable-producer.sh --broker-list [broker0:9092,broker1:9092..] --topic my_topic --max-messages 10
We now have some test data, and you can generate your own by following the same steps.
To list the available options, simply run the command itself, with or without the '--help' flag:
./kafka-console-consumer.sh --help
The first thing the console consumer needs to know is where your Kafka cluster is. Instruct it with '--bootstrap-server [broker(s) list]'.
And finally, the topic name should be passed with '--topic my_topic'.
The complete command is:
./kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic
This command consumes all new messages produced from the moment it connects to the cluster.
If you also want to see older messages, use the '--from-beginning' flag:
./kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic --from-beginning
You will then see the old messages generated:
4
5
6
7
8
9
0
1
2
3
A few considerations...
- When consuming from a topic with more than one partition, there is no guarantee that messages are delivered in the order they were produced, because Kafka only guarantees ordering within a single partition (see: https://www.cloudkarafka.com/blog/understanding-kafka-topics-and-partitions.html ).
- If you start reading from a topic that does not exist yet, the topic is created automatically when the broker is configured with 'auto.create.topics.enable=true'.
- It is always considered good practice to run the consumer on a separate machine, and not where the broker is running.
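The ordering guarantee in the first point can be checked on real output. The sketch below assumes the consumer was started with both '--property print.partition=true' and '--property print.offset=true', so that every line carries a 'Partition:N' and an 'Offset:N' field. The function name 'check_partition_order' is made up for this example. It reads consumer output on standard input and reports any line whose offset does not increase within its partition.

```shell
# Verify that offsets are strictly increasing within each partition.
# Reads console consumer output on stdin; exits non-zero on a violation.
check_partition_order() {
  awk '
    BEGIN { bad = 0 }
    {
      part = ""; off = ""
      # Find the Partition:N and Offset:N fields wherever they appear.
      for (i = 1; i <= NF; i++) {
        if ($i ~ /^Partition:[0-9]+$/)   part = substr($i, 11)
        else if ($i ~ /^Offset:[0-9]+$/) off  = substr($i, 8)
      }
      if (part == "" || off == "") next   # skip lines without both fields
      off += 0                            # force numeric comparison
      if ((part in last) && off <= last[part]) {
        printf "partition %s: offset %d seen after %d\n", part, off, last[part]
        bad = 1
      }
      last[part] = off
    }
    END { exit bad }
  '
}
```

Note that a message value that itself looks like 'Partition:5' or 'Offset:7' would confuse this simple field scan, so treat it as a debugging aid only.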
You can change what the consumer prints for each message by passing '--property name=value'.
For instance, it is possible to see in which partition the messages are stored:
./kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic --from-beginning --property print.partition=true
You will then see an output similar to:
Partition:19 0
Partition:19 1
Partition:19 2
Partition:19 3
Partition:8 4
Partition:8 5
Partition:8 6
Partition:8 7
Partition:8 8
Partition:8 9
This is very useful when you are testing with partitions. Are your messages where you think they are?
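To answer that question quickly, you can count the messages per partition. This is a minimal sketch that assumes each output line starts with a 'Partition:N' field, as in the output above; the function name 'count_by_partition' is hypothetical.

```shell
# Count messages per partition from console consumer output on stdin.
# Prints one "<partition> <count>" line per partition, sorted by partition.
count_by_partition() {
  awk '
    $1 ~ /^Partition:/ {
      sub(/^Partition:/, "", $1)   # keep only the partition number
      count[$1]++
    }
    END { for (p in count) print p, count[p] }
  ' | sort -n
}
```

You would pipe the consumer into it, for example by appending '| count_by_partition' to the command above. Since the consumer keeps waiting for new messages, stop it with Ctrl+C (or bound it, for instance with a timeout) to get the totals.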
It is also possible to see the associated timestamp:
./kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic --from-beginning --property print.timestamp=true
CreateTime:1445412480818 0
CreateTime:1445412480836 1
CreateTime:1445412480836 2
CreateTime:1445412480836 3
CreateTime:1445412480836 4
CreateTime:1445412480841 5
CreateTime:1445412480842 6
CreateTime:1445412480842 7
CreateTime:1445412480842 8
CreateTime:1445412480842 9
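The 'CreateTime' values are milliseconds since the Unix epoch, which is hard to read at a glance. The sketch below converts one of them to a UTC timestamp. It assumes a 'date' command that accepts '-d @SECONDS' (GNU coreutils does; the BSD/macOS 'date' uses different flags), and the function name 'create_time_to_utc' is made up for this example.

```shell
# Convert a Kafka CreateTime value (epoch milliseconds) to ISO 8601 UTC.
# Accepts either "CreateTime:1445412480818" or the bare number.
create_time_to_utc() {
  ct_ms="${1#CreateTime:}"          # strip the optional prefix
  ct_secs=$(( $ct_ms / 1000 ))      # whole seconds since the epoch
  ct_frac=$(( $ct_ms % 1000 ))      # remaining milliseconds
  # Assumes GNU date for the -d @SECONDS syntax.
  printf '%s.%03dZ\n' "$(date -u -d "@$ct_secs" '+%Y-%m-%dT%H:%M:%S')" "$ct_frac"
}
```

For the first message above, 'create_time_to_utc CreateTime:1445412480818' prints '2015-10-21T07:28:00.818Z'.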
Or the offset:
./kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic --from-beginning --property print.offset=true
Offset:0 0
Offset:1 1
Offset:2 2
Offset:3 3
Offset:0 4
Offset:1 5
Offset:2 6
Offset:3 7
Offset:4 8
Offset:5 9
Knowing the offsets, we can consume only the messages of a partition starting at offset '3' (the partition is required, and '--from-beginning' must be left out because it cannot be combined with '--offset'):
./kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic --offset '3' --partition 8
7
8
9
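The flags seen so far can be assembled by a small helper that only prints the command line, so you can review it before running it. This is a sketch under stated assumptions: the function name 'build_consume_cmd' and its positional arguments are hypothetical, and it deliberately omits '--from-beginning' when an explicit offset is given, since the two select different starting points and, to the best of our knowledge, the tool rejects that combination.

```shell
# Print (do not run) a kafka-console-consumer.sh command line.
# Usage: build_consume_cmd BROKERS TOPIC [PARTITION [OFFSET]]
build_consume_cmd() {
  bcc_brokers="${1:-}"; bcc_topic="${2:-}"
  bcc_partition="${3:-}"; bcc_offset="${4:-}"
  if [ -z "$bcc_brokers" ] || [ -z "$bcc_topic" ]; then
    echo "usage: build_consume_cmd BROKERS TOPIC [PARTITION [OFFSET]]" >&2
    return 2
  fi
  # An explicit offset only makes sense for one specific partition.
  if [ -n "$bcc_offset" ] && [ -z "$bcc_partition" ]; then
    echo "the partition is required when an offset is specified" >&2
    return 2
  fi
  bcc_cmd="./kafka-console-consumer.sh --bootstrap-server $bcc_brokers --topic $bcc_topic"
  if [ -n "$bcc_partition" ]; then
    bcc_cmd="$bcc_cmd --partition $bcc_partition"
  fi
  if [ -n "$bcc_offset" ]; then
    bcc_cmd="$bcc_cmd --offset $bcc_offset"   # start at this offset
  else
    bcc_cmd="$bcc_cmd --from-beginning"       # otherwise read everything
  fi
  printf '%s\n' "$bcc_cmd"
}
```

For example, 'build_consume_cmd broker0:9092 my_topic 8 3' prints a command that reads partition 8 starting at offset 3.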
The properties accepted by the default message formatter, as of the latest Kafka release at the time of writing, are:
print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
headers.separator=<line.separator>
null.literal=<null.literal>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
header.deserializer=<header.deserializer>
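Several of these properties can be combined by repeating the '--property' flag. As a convenience, the sketch below turns a list of 'name=value' pairs into the corresponding flags; the function name 'property_flags' is an assumption for illustration and not part of Kafka.

```shell
# Turn "name=value" arguments into repeated --property flags.
# Usage: property_flags print.partition=true print.offset=true
property_flags() {
  pf_out=""
  for pf_prop in "$@"; do
    case "$pf_prop" in
      *=*) pf_out="$pf_out --property $pf_prop" ;;
      *)   echo "expected name=value, got: $pf_prop" >&2; return 2 ;;
    esac
  done
  printf '%s\n' "${pf_out# }"   # drop the leading space
}
```

It could be used as './kafka-console-consumer.sh --bootstrap-server [broker0:9092,broker1:9092..] --topic my_topic $(property_flags print.partition=true print.offset=true)'. Because the result is expanded unquoted, this simple version does not support values that contain spaces.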
To learn how to pass custom properties and which other flags the consumer accepts, refer to the manual.
Happy Consuming with Kafka!
We hope that you found this information useful. If you have any questions or concerns regarding this blog post, send an email to email@example.com