Handling Consumer Side Failures in Apache Kafka

Written by Fabio Pardi

One of the most powerful aspects of Kafka is its ability to guarantee data persistence. To leverage this feature, it is important to understand how it works and to avoid mistakes that might put our data at risk.

The importance of data persistence

Data persistence has been a priority for as long as data has been stored on media. Anything can break, including the storage media itself.

Over the years, many solutions have been devised to tackle persistence. Creating a RAID array and sending a data backup to another continent are two common ones.

Another popular solution is described in RFC1149. While internet transmissions over avian carriers suffer from poor latency, the payload they can transmit is very high, which makes them a viable solution when the backup size is considerable.

Kafka is a distributed system and can take care of persistence for us if properly configured and used. A real DevOps effort is required to guarantee success.

Three Kafka brokers

The first step is to have at least three Kafka brokers, which allows for guaranteed persistence as well as high availability with a tolerance of one missing Kafka broker.

With three brokers in place, topics can be created with a replication factor of three. The replication factor can be specified when the topic is created. Alternatively, it can be set through the cluster setting default.replication.factor, combined with auto.create.topics.enable=true.

In the latter case, clients do not need to create the topic explicitly: it is enough to push data to a new topic, and Kafka will create it based on the settings specified. Both methods give the same result, essentially telling Kafka to keep three copies of the data spread over three different brokers.

If there are more than three brokers, Kafka will choose on which brokers the copies will be hosted. Partitions also play a role here, but that topic is more complicated and best saved for another article.

For now, there are three Kafka brokers, each holding one copy of the topic. The topic is served by one broker, known as the leader, and the copies hosted on the other brokers are called replicas. All data sent to the topic is accepted by the leader, while the replicas fetch it from the leader to keep their own copies up to date.

Note that there will be a delay between the instant data is sent to the leader and when it is received by the replicas. If the delay is not too long, then the replicas will be in sync.

To get a real-time overview of the current situation, just run:

kafka-topics.sh  --bootstrap-server IP:port --describe
Topic: my_new_topic    PartitionCount: 1    ReplicationFactor: 3    Configs:
Topic: my_new_topic    Partition: 0    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2

Running this simple command will retrieve a load of information:

  • The topic name is: my_new_topic, with 1 partition, replicated 3 times over brokers 1,0,2
  • The data will be sent to broker 1 and then replicated to brokers 0 and 2.
  • If broker 1 becomes unavailable, the first broker designated to take over is broker 0.
  • Both replicas are in sync with the leader (Isr: In-Sync Replicas; the list includes the leader itself).
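The same fields can also be read programmatically. The sketch below assumes the partition line has exactly the "Key: value" layout shown above (other Kafka versions may print additional fields). It extracts the leader, the replicas and the ISR, and derives which replicas are currently out of sync. The function name parse_partition_line is hypothetical.

```python
import re


def parse_partition_line(line):
    """Parse one partition line of `kafka-topics.sh --describe` output.

    Assumes whitespace-separated "Key: value" pairs, as in the sample above.
    """
    fields = dict(re.findall(r"(\w+):\s*(\S+)", line))

    def to_ids(value):
        # "1,0,2" -> [1, 0, 2]
        return [int(broker_id) for broker_id in value.split(",")]

    replicas = to_ids(fields["Replicas"])
    isr = to_ids(fields["Isr"])
    return {
        "topic": fields["Topic"],
        "partition": int(fields["Partition"]),
        "leader": int(fields["Leader"]),
        "replicas": replicas,
        "isr": isr,
        # Replicas assigned to the partition but missing from the ISR.
        "out_of_sync": [b for b in replicas if b not in isr],
    }


info = parse_partition_line(
    "Topic: my_new_topic    Partition: 0    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2"
)
print(info["leader"], info["out_of_sync"])  # prints: 1 []
```

An empty out_of_sync list means every assigned replica is in the ISR.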

When a broker cannot keep up with the messages sent to the leader, or when it is unavailable, it is removed from 'Isr'. To make sure that data sent by the producer is not lost even if the leader goes down, persistence has to be enforced.

Apache Kafka Producer configuration, ACKS

Now is the time for the 'acks' setting on the producer side to come into play. Acks were mentioned in an earlier article, Apache Kafka Idempotent Producer - Avoiding message duplication, and this is the deep dive into the topic. Acks tell the producer how sent messages should be acknowledged by the leader. There are three possible settings:

  • 0: Do not wait for any acknowledgment. Messages are 'just' sent. In this case, data may be lost, but it is a good fit when persistence isn’t necessary.
  • 1: Mark a message as sent when the leader confirms that it has been received and written to its local log.
  • all (AKA ‘-1’): Consider the message as sent when the leader and all other in-sync replicas have confirmed it. The values all and -1 mean the same thing.

Note that "written to its local log" does not necessarily mean "written to local disk". Disk flushes are delegated to the OS unless the data sync is forced explicitly through the Kafka settings log.flush.interval.messages and log.flush.interval.ms.

Usage of such settings (which default to very high values, AKA: not in use) is discouraged by Kafka, but it is something to consider when designing the architecture of the Kafka cluster.

The right acks value depends on the use case. For instance, when sending sensor readings to Kafka, acks=0 is usually acceptable. However, when using Kafka to record banking transactions, it is best to use acks=all.
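As a minimal sketch of that choice, the hypothetical helper below builds a dict of producer settings from a single durability flag. The keys bootstrap.servers and acks are standard Kafka producer setting names. The helper itself, its parameters, and the "IP:port" placeholder are assumptions made for illustration.

```python
def producer_settings(bootstrap_servers, data_is_critical):
    """Return Kafka producer settings matching the durability requirement."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # acks=all for data that must not be lost, acks=0 for fire-and-forget.
        "acks": "all" if data_is_critical else "0",
    }


sensor_settings = producer_settings("IP:port", data_is_critical=False)
banking_settings = producer_settings("IP:port", data_is_critical=True)
print(sensor_settings["acks"], banking_settings["acks"])  # prints: 0 all
```

A dict of this shape can then be handed to a Kafka client library that accepts settings keyed by these names.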

Using acks=all makes sure the data is copied to every replica that is currently in sync. An acknowledged message therefore cannot exist on the leader alone while an in-sync replica lags behind, because the acknowledgment is only sent once all the ISR have the message.

The first consideration in this case is that the latency between brokers plays a bigger role: the in-sync broker that is furthest away (in terms of network latency) will slow down the producer.

The second consideration is that replicas can go out of sync for multiple reasons, most commonly because they are down or unreachable, which could result in the following:

kafka-topics.sh  --bootstrap-server IP:port --describe
Topic: my_new_topic    PartitionCount: 1    ReplicationFactor: 3    Configs:
Topic: my_new_topic    Partition: 0    Leader: 1    Replicas: 1,0,2    Isr: 1

Data is sent to broker 1 using acks=all, and acknowledgments come from all ISR, which now means from broker 1 only. In this situation, acks=all behaves the same as acks=1. But what if broker 1 also breaks? In that case, there will be no in-sync copy of the data left, which defeats the whole purpose.
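This degraded case can be sketched as a small model. It is not Kafka code: the hypothetical function brokers_holding_message_at_ack only returns which brokers must hold a message before the producer receives its acknowledgment, for each of the acks values described earlier.

```python
def brokers_holding_message_at_ack(acks, leader, isr):
    """Return the set of broker ids that hold a message once it is acknowledged.

    `leader` is the leader's broker id and `isr` the current in-sync replica
    list, as reported in the `Leader:` and `Isr:` fields.
    """
    if acks in (0, "0"):
        return set()  # nothing is confirmed, the message may not have arrived
    if acks in (1, "1"):
        return {leader}  # only the leader has confirmed the write
    if acks in ("all", -1, "-1"):
        return set(isr)  # every in-sync replica, leader included
    raise ValueError(f"unsupported acks value: {acks!r}")


# Healthy cluster: acks=all covers three brokers.
print(brokers_holding_message_at_ack("all", leader=1, isr=[1, 0, 2]) == {0, 1, 2})  # prints: True
# ISR shrunk to the leader: acks=all gives no more than acks=1.
print(brokers_holding_message_at_ack("all", leader=1, isr=[1]))  # prints: {1}
```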

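The min.insync.replicas setting guards against an ISR that has shrunk to the leader alone: it defines the minimum number of in-sync replicas, leader included, that must be available for a write with acks=all to be accepted. Consider a topic configured with a replication factor of 3 and min.insync.replicas=2, and assume data is produced using acks=all.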

When all designated replicas are in sync, or when one replica is not available, everything will be fine. However, if more than one replica isn’t available, then new data will not be accepted and an error, namely ‘NOT_ENOUGH_REPLICAS’, will be returned.

Setting min.insync.replicas below 2 is not considered safe for data durability, because the leader might then hold the only copy of acknowledged data and lose it in an outage; it should therefore always be avoided. The trade-off is availability: when not enough in-sync replicas are available, it is not possible to produce to the affected partition.
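The rule for replication factor 3 and min.insync.replicas=2 can be written down as a small model. It is a sketch of the decision described above, not Kafka's implementation, and both function names are hypothetical. It also assumes that min.insync.replicas is only enforced for producers using acks=all.

```python
def check_write(acks, isr_size, min_insync_replicas):
    """Return None if the write is accepted, else the error name."""
    if acks in ("all", -1, "-1") and isr_size < min_insync_replicas:
        return "NOT_ENOUGH_REPLICAS"
    return None


def tolerated_failures(replication_factor, min_insync_replicas):
    """Replicas that may be unavailable while acks=all writes still succeed."""
    return replication_factor - min_insync_replicas


# Replication factor 3, min.insync.replicas=2, producing with acks=all.
for isr_size in (3, 2, 1):
    print(isr_size, check_write("all", isr_size, min_insync_replicas=2))
# prints:
# 3 None
# 2 None
# 1 NOT_ENOUGH_REPLICAS

print(tolerated_failures(3, 2))  # prints: 1
```

With these values one broker can be lost without affecting producers, which matches the tolerance of one missing broker that a three-broker cluster is meant to provide.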

CloudKarafka uses the settings that best support the general-purpose usage of Kafka, depending on the number of nodes that you select for your cluster. However, you are allowed to configure almost any setting to optimize the cluster according to your needs.