Working with Kafka Consumers

Sandesh Deshmane
Jul 7, 2020


What is Kafka?

Apache Kafka is an open-source, distributed streaming platform used to publish and subscribe to streams of records. It's a fast, scalable, fault-tolerant, durable, pub-sub messaging system.

Kafka is reliable, offers high throughput, and provides good replication management.

Kafka works with Flume, Spark Streaming, Storm, HBase, and Flink for real-time ingestion, analysis, and processing of streaming data. Kafka is often the data stream used to feed Hadoop big-data lakes, and Kafka brokers support massive message streams for low-latency follow-up analysis in Hadoop or Spark.

Topics and Partitions

All the data written to Kafka goes into topics. A topic is a category/feed name to which records are stored and published. Producer applications write data to topics and consumer applications read from topics. Multiple topics are created in Kafka as required.

Each topic is divided into multiple partitions, so the messages of a single topic can be spread across multiple partitions. Each partition can have replicas, which are identical copies of the partition kept for fault tolerance.

Consumers

A consumer is the process that reads from Kafka. It can be a simple Java program, a Python program, Go code, or a distributed processing framework like Spark Streaming, Storm, Flink, or similar.

There are two types of Kafka consumers:

Low-level Consumer

With the low-level consumer, you specify the topics and partitions to read from, as well as the offset at which to start: a fixed position, the beginning, or the end. This can, of course, be cumbersome, because you have to keep track of which offsets have been consumed so the same records aren't read more than once.
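
In the modern Java client, this corresponds to assigning partitions manually with assign() and positioning with seek(). Here is a minimal sketch; the topic name, partition number, and broker address are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Manually assign one partition of a topic; no group.id is needed with assign()
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(partition));

// Position the consumer explicitly: the beginning, the end, or a fixed offset
consumer.seekToBeginning(Collections.singletonList(partition)); // or consumer.seek(partition, someOffset)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));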

High-Level Consumer

The high-level consumer (better known as consumer groups) consists of one or more consumers. A consumer group is created by adding the property “group.id” to a consumer; giving the same group id to another consumer means it will join the same group.

Consumer Group

A consumer group is a mechanism for grouping multiple consumers, where consumers within the same group share the same group id. The topic's partitions are divided among all the consumers of a group, so no two consumers of the same group receive the same data.

When you write a Kafka consumer, you add properties like the ones below:

props.setProperty("bootstrap.servers", "localhost:9092");

props.setProperty("group.id", "test");

Consumers with the same group id are part of the same consumer group and share the data from the Kafka topic. Each consumer can consume only from the partitions of the topic that Kafka assigns to it.
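
Putting this together, here is a minimal sketch of a high-level consumer in Java; the topic name and broker address are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // Joining group "test"; Kafka assigns this consumer a share of the partitions
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d, offset=%d, value=%s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}

Starting a second copy of this program with the same group id makes it join the same group, and Kafka rebalances the partitions between the two instances.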

Partitions and Consumer Group

What happens when we start a consumer with some consumer group? First, Kafka checks whether a consumer is already running with the same consumer group id.

If it's a new consumer group id, Kafka assigns all the partitions of the topic to this new consumer. If there is more than one consumer with the same group id, Kafka divides the partitions among the available consumers.

If we start a new consumer with a new group id, Kafka will send data to that consumer as well. The data is not shared here: two consumers with different group ids receive the same data. This is normally done when you have multiple pieces of business logic to run on the data in Kafka.

For example, suppose consumer Z is a new consumer with a different group id. Since only a single instance of that consumer is running, all four partitions of the topic will be assigned to it.

When to write the same consumer group?

When you want to increase parallel processing for your consumer application, all the individual consumers should be part of the same group.

Consumers should be part of the same group when an operation needs to be scaled up to process in parallel. Consumers in the same group are assigned different partitions, which leads to parallel processing. This is one of the ways Kafka suggests achieving parallel processing in consumers.

When you write a Storm/Spark application, the application uses one group id, and when you increase the number of workers, it adds more consumers and increases parallel processing. But you can only add as many consumers as there are partitions; you cannot have more consumers than partitions in the same consumer group, because each partition is assigned to exactly one consumer within a group.

When to write different consumer groups?

When you want to run different applications/business logic, the consumers should not be part of the same consumer group. Some consumers might update a database, while another set of consumers might perform aggregations or computations on the consumed data. In this case, we should register these consumers with different group ids. They will work independently, and Kafka will manage the data sharing for them.
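
As a sketch, the two applications would differ only in the group id they configure; the ids below are illustrative:

// In the application that updates the database
props.setProperty("group.id", "db-writer");

// In the application that computes aggregations
props.setProperty("group.id", "aggregator");

Both applications subscribe to the same topic, and each group independently receives every record.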

Offset management

Each message in a partition has a unique, sequential index specific to that partition. This unique id is called the offset; it is simply a number.

The offset, which indicates how many messages a consumer has read, is maintained per consumer group id and partition. When there are two different consumer groups, two different offsets are maintained per partition. Consumers of different consumer groups can resume or pause independently of each other, leaving no dependency between the consumers of different groups.

auto.offset.reset

This property controls the behavior of the consumer when it starts reading a partition for which it doesn’t have a committed offset or if the committed offset it has is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). The default is “latest,” which means that lacking a valid offset, the consumer will start reading from the newest records (records that were written after the consumer started running). The alternative is “earliest,” which means that lacking a valid offset, the consumer will read all the data in the partition, starting from the very beginning.
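
For example, to make a consumer read a partition from the very beginning when it has no valid committed offset:

props.setProperty("auto.offset.reset", "earliest"); // the default is "latest"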

enable.auto.commit

This parameter controls whether the consumer will commit offsets automatically, and defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true, then you might also want to control how frequently offsets will be committed using auto.commit.interval.ms.
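
For example, to keep automatic commits but shorten the commit interval (the interval below is illustrative):

props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000"); // commit every second instead of the 5-second default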

Automatic Commit

The easiest way to commit offsets is to allow your consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms.

But if your consumer crashes or restarts before the next commit, there is a chance that you will process some records again.

Automatic commits are easy to implement, but they don’t give developers enough flexibility to avoid duplicate messages.

Manual Commit

Developers want to control when they commit offsets back to Kafka. Typically an application reads some data from Kafka, does some processing, and saves the results to a database, files, etc., so it should commit back to Kafka only when its processing has succeeded.

By setting enable.auto.commit=false, offsets will only be committed when the application explicitly chooses to do so.

The simplest and most reliable of the commit APIs is commitSync(). This API will commit the latest offset returned by poll() and return once the offset is committed, throwing an exception if the commit fails for some reason.
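
A minimal sketch of this pattern, reusing the consumer from the earlier example with enable.auto.commit=false and a hypothetical process() method that does the application's work:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical: save to a database, a file, etc.
    }
    try {
        consumer.commitSync(); // blocks until the broker acknowledges the commit
    } catch (org.apache.kafka.clients.consumer.CommitFailedException e) {
        // a non-retriable failure; log it and decide how to recover
        System.err.println("commit failed: " + e);
    }
}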

One drawback of a synchronous commit is that the application is blocked until the broker responds to the commit request. This limits the throughput of the application.

To avoid this, the asynchronous commit API, commitAsync(), is used. Instead of waiting for the broker to respond to a commit, we just send the request and continue on.

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry.

Sometimes commitSync() and commitAsync() are used together to avoid retry problems when a commit fails.
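
A common sketch of that combination is to use commitAsync() in the normal processing loop and a final commitSync() on shutdown, so the last offsets are committed reliably:

try {
    while (running) { // "running" is a flag your shutdown logic would flip
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical processing step
        }
        consumer.commitAsync(); // fast and non-blocking, but not retried on failure
    }
} finally {
    try {
        consumer.commitSync(); // retries until success or a non-retriable failure
    } finally {
        consumer.close();
    }
}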

Conclusion

We have looked at what a consumer is, how consumer groups work, and how we can parallelize consumers by using the same consumer group id. We have also covered how to manage offsets when working with an application.
