What, why, when to use Apache Kafka, with an example

I have seen, heard and been asked questions and comments like

What is Kafka and When should I use it?

I don’t understand why we have to use Kafka

The objective of this post is to get you up to speed with what Apache Kafka is, when to use it, and the foundational concepts of Apache Kafka, with a simple example.

What is Apache Kafka

First let’s understand what Apache Kafka is. According to the official definition, it is a distributed streaming platform. This means that you have a cluster of connected machines (Kafka Cluster) which can

  1. Receive data from multiple applications; the applications producing data (aka messages) are called producers.

  2. Reliably store the received data (aka messages).

  3. Allow applications to read the stored data; these applications are called consumers since they consume the data (aka messages). Consumers usually read one message at a time.

  4. Guarantee the order of data. That is, if a message, say m1, is received by the cluster at time t1 and another message, say m2, is received at a later time t1 + 5 sec, then a consumer reading the messages will read m1 before m2.

  5. Provide at-least-once delivery of data. This means every message sent to the Apache Kafka cluster is guaranteed to be received by a consumer at least once, which also means a consumer may see duplicates. The most common cause is the producer resending a message because the acknowledgement for it was lost due to a network failure. There are techniques to deal with this, which we will see in a future post.

  6. Support integrating with other systems via connectors (Kafka Connect) and processing messages with a framework called the Streams API.

Going forward we will use message to denote the data that the producer sends to the Apache Kafka cluster and the data that the consumer reads from the Apache Kafka cluster.

[Figure: a simple Kafka setup]

Why use Apache Kafka

In order to understand why Apache Kafka is popular with streaming apps, let’s look at what is generally used today and why Apache Kafka is a better choice for certain use cases.

Currently used: RESTful systems over HTTP(S)

Modern web applications are RESTful services. What this means is that

  1. A server or client (usually your browser) sends an HTTP(S) request (GET, PUT, POST, or DELETE; there are more, but these are the popular ones) to another server (the backend server).

  2. The server that receives this HTTP(S) request authenticates it, does custom processing according to the logic you have defined, and in most cases responds with a status code and data.

  3. The server or client making the request receives the response and proceeds with the logic you have defined.

In most cases the request is made from the client (your browser) to a server (aka backend) running in the cloud, which does the required processing and responds with the appropriate data.
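To make the request/response cycle concrete, here is a minimal sketch in Python using the requests library (the URL is just a placeholder, not a real endpoint):

import requests

# 1. the client sends an HTTP(S) GET request to a backend server (placeholder URL)
response = requests.get('https://example.com/api/items')

# 2. the backend authenticates the request, runs its logic and responds with a status code and data
print(response.status_code)

# 3. the client proceeds with its own logic based on the response
if response.ok:
    print(response.text[:100])  # first 100 characters of the response body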

[Figure: a simple HTTPS request/response flow]

For example, on this website, if you right click on this page -> Inspect -> Network tab and refresh the webpage, search for what, click on it, and select the Headers tab, you will be able to see the HTTPS request that your client (aka browser) has sent to the backend. In the Response tab you will also be able to see the response, which in this case is the HTML to be displayed.

[Figures: an HTTPS request and its response as seen in the browser’s Network tab]

Common use cases

The RESTful services model works fine for most cases. Let’s go over the use cases below and try to think of efficient ways to solve them.

  1. Let’s assume our application has 10 million users and we want to record user actions (hover, move, idle, etc.) every 5 seconds. This will create 120 million user action events per minute. In this case we don’t have to make the user aware that we have successfully processed their action information. To respond to 120 million requests per minute, we would need multiple servers running copies of our application. How would you solve this?

  2. Let’s say one of our applications needs to send a message to 3 other applications. In this case assume the application sending the message does not need to know whether the message was processed. How would you solve this?

Spend some time brainstorming the above use cases before reading on.

One key thing that sets the above use cases apart from a normal web request is that we don’t have to process the data and respond immediately. In case 1 we can simply store the data somewhere and process it later, depending on project constraints. In case 2 we could send HTTPS requests to the 3 other applications and wait for their responses, but since the sender does not need to know the state of the processing, we can instead write the data to one location and have the other 3 applications read from it.

This is basically what Apache Kafka does. In an Apache Kafka cluster you have topics, which are ordered queues of messages.

Solution for case 1

We will send the 120 million messages per minute into a topic, let’s say user-action-event, from the users’ clients (web browsers), and have our consumer applications read from it at their own pace of processing.

Solution for case 2

We will have our producer application send messages to a topic, let’s say multi-app-events, and all 3 applications can read from this topic. This reduces the burden on the producer, as it only cares about sending the messages.

You can set up the producer in a fire-and-forget model, where the producer sends a message to the Apache Kafka cluster and moves on, or in a message acknowledgement model, where the producer sends a message to the Apache Kafka cluster and waits for a confirmation from the cluster. Use the fire-and-forget model if a few message losses are tolerable and you want to increase producer speed. Use the message acknowledgement model when you need to be certain that you don’t lose a message due to network failures or the like, as sketched below.
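Here is a minimal sketch of the two styles with the kafka-python library we use later in this post (the topic name and messages are placeholders):

from kafka import KafkaProducer

# fire and forget: send() queues the message and we move on immediately;
# a few losses are tolerable in exchange for producer speed
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('multi-app-events', b'user signed up')

# message acknowledgement: block until the cluster confirms it stored the message
ack_producer = KafkaProducer(bootstrap_servers=['localhost:9092'], acks='all')
future = ack_producer.send('multi-app-events', b'payment recorded')
record_metadata = future.get(timeout=10)  # raises an exception if no acknowledgement arrives
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)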

[Figure: Kafka architecture, ref: https://kafka.apache.org/documentation/]

Simple Example

Let’s understand how Apache Kafka works with a simple example.

Prerequisites

  1. Docker (also make sure you have docker-compose)
  2. Python3

Setup

First let’s set up an Apache Kafka docker container. For this example we will use the popular wurstmeister image. We can use the Confluent docker image as well, but it requires 8GB of docker memory.

Let’s start by cloning the wurstmeister repo.

cd <your-directory>
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker

Then we modify the kafka service section in the docker-compose.yml file to have the following configuration:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
  kafka:
    build: .
    ports:
     - "9092:9092"
    expose:
     - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock

The KAFKA_* environment variables configure connectivity between Apache Kafka and Apache Zookeeper (the service that does cluster management), and allow producers and consumers outside the Docker container to connect.
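As a rough illustration of what the two listeners mean for clients (assuming the compose file above): code running on your host machine connects through the OUTSIDE listener, while another container on the same Docker network would use the INSIDE one.

from kafka import KafkaProducer

# from your host machine (outside Docker), use the OUTSIDE listener
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# from another container on the same Docker network, you would instead use
# the INSIDE listener, i.e. bootstrap_servers=['kafka:9093']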

Now start the Apache Kafka and Apache Zookeeper docker containers as shown below

docker-compose up -d

The -d runs the docker containers in detached mode. Each node within the Apache Kafka cluster is called a broker. Now you can check the list of containers running using

docker-compose ps

You will see

Name Command State Ports
kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:9092->9092/tcp, 9093/tcp
kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd … Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

You can see that your kafka-docker_kafka_1 (Apache Kafka container) and kafka-docker_zookeeper_1 have started.

For this simple example we will be using Python’s Kafka library. You can install it using

pip install kafka-python

In Apache Kafka, as we saw earlier, messages are stored in ordered queues called topics. By default, topics are created automatically when a producer pushes a message to a topic that does not exist yet. This is controlled by the auto.create.topics.enable broker configuration variable.
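Auto-creation is convenient for this example, but if you would rather create the topic yourself, a minimal sketch using kafka-python’s admin client could look like this (the partition and replication settings are just illustrative):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# create the topic explicitly with 1 partition and a replication factor of 1
admin.create_topics([NewTopic(name='events_topic', num_partitions=1, replication_factor=1)])
admin.close()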

Producer & Consumers

Let’s assume we have a simple event listener on the client which sends a message to the backend server every 3 seconds. We can write a naive producer in Python; let’s call it user_event_producer.py

from datetime import datetime
import json
from kafka import KafkaProducer
import random
import time
import uuid

EVENT_TYPE_LIST = ['buy', 'sell', 'click', 'hover', 'idle_5']

producer = KafkaProducer(
   value_serializer=lambda msg: json.dumps(msg).encode('utf-8'), # we serialize our data to json for efficient transfer
   bootstrap_servers=['localhost:9092'])

TOPIC_NAME = 'events_topic'


def _produce_event():
    """
    Function to produce events
    """
    # UUID produces a universally unique identifier
    return {
        'event_id': str(uuid.uuid4()),
        'event_datetime': datetime.now().strftime('%Y-%m-%d-%H-%M-%S'),
        'event_type': random.choice(EVENT_TYPE_LIST)
    }

def send_events():
    while True:
        data = _produce_event()
        time.sleep(3) # simulate some processing logic
        producer.send(TOPIC_NAME, value=data)

if __name__ == '__main__':
    send_events()

Now the consumer script. Let’s call it user_event_consumer.py

import json
from kafka import KafkaConsumer

TOPIC_NAME = 'events_topic'

consumer = KafkaConsumer(
    TOPIC_NAME,
    auto_offset_reset='earliest', # where to start reading when this group has no committed offset
    group_id='event-collector-group-1', # consumer group id
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) # we deserialize our data from json
)

def consume_events():
    for m in consumer:
        # any custom logic you need
        print(m.value)

if __name__ == '__main__':
    consume_events()

Some key points from the above python script

  1. offset: denotes the position of a message within the topic. This helps consumers decide from which message to start reading (see the sketch after this list).

  2. auto_offset_reset: The possible values are earliest and latest, which tell a consumer that has no previously committed offset to start from the earliest available message or only from newly arriving messages, respectively.

  3. group_id: denotes the consumer group the consumer application is part of. Usually multiple consumers run in a group, and the group id lets Kafka keep track of which messages the group has already read and which it has not.
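To see offsets in action, here is a small variation of the consumer above that prints the partition and offset each message was read from:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'events_topic',
    auto_offset_reset='earliest',
    group_id='event-collector-group-1',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for m in consumer:
    # each message carries the topic, partition and offset it was read from
    print(f'partition={m.partition} offset={m.offset} value={m.value}')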

[Figure: consumer offsets within a topic, ref: https://kafka.apache.org/documentation/]

Let’s start the python scripts

python user_event_producer.py & # the & runs the python script in the background
python user_event_consumer.py

You will start seeing output like

{"event_id": "3f847c7b-e015-4f01-9f5c-81c536b9d89b", "event_datetime": "2020-06-12-21-12-27", "event_type": "buy"}
{"event_id": "e9b51a88-0e86-47cc-b412-8159bfda3128", "event_datetime": "2020-06-12-21-12-30", "event_type": "sell"}
{"event_id": "55c115f1-0a23-4c89-97b2-fc388bee28f5", "event_datetime": "2020-06-12-21-12-33", "event_type": "click"}
{"event_id": "28c01bae-5b5b-421b-bc8b-f3bed0e1d77f", "event_datetime": "2020-06-12-21-12-36", "event_type": "sell"}
{"event_id": "8d6b1cbe-304f-4dec-8389-883d77f99084", "event_datetime": "2020-06-12-21-12-39", "event_type": "hover"}
{"event_id": "50ffcd7c-5d40-412e-9223-cc7a26948fa9", "event_datetime": "2020-06-12-21-12-42", "event_type": "hover"}
{"event_id": "6dbb5438-482f-4f77-952e-aaa54f11320b", "event_datetime": "2020-06-12-21-12-45", "event_type": "click"}

You can stop the Python consumer script using ctrl + c. Remember, your producer is still running in the background. You can stop it using

pkill -f user_event_producer.py

The above kills a process based on its name. Now let’s check our Apache Kafka cluster to see the list of available topics; we should see the events_topic topic that was created by our producer.

docker exec  -it $(docker ps -aqf "name=kafka-docker_kafka") bash # get inside the Kafka container
$KAFKA_HOME/bin/kafka-topics.sh --list  --bootstrap-server kafka:9092 # list all the topics in this Kafka cluster

# you will see the topics
# __consumer_offsets
# events_topic

# view messages stored in the `events_topic` topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic events_topic --from-beginning

exit # exit the container

Auto commit

Now let’s assume that our consumer does some processing and writes the processed data to our database. In this case, if our consumer dies and we restart it, will it reprocess data it has already processed? Take some time to think about what happens under our current configuration and which settings control this.

Two settings control this. auto_offset_reset only applies when the consumer group has no committed offset yet: earliest (which we use in our KafkaConsumer code sample) replays the topic from the beginning, while latest reads only newly arriving messages. Separately, every consumer commits the offset of the latest message it has read into a metadata topic in Kafka every 5 seconds (the default, set using auto.commit.interval.ms); this feature is enabled by default via enable.auto.commit. The topic which holds the offset information is called __consumer_offsets. Because of these automatic commits, a restarted consumer resumes from the last committed offset and will at most reprocess the messages read since the last commit, which is the at-least-once behaviour we saw earlier.
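If you need tighter control over when offsets are committed (for example, only after the database write succeeds), you can disable auto commit and commit manually. A minimal sketch with kafka-python, where save_to_database is a hypothetical stand-in for your own processing logic:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'events_topic',
    group_id='event-collector-group-1',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=False,  # turn off the automatic 5 second offset commits
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for m in consumer:
    save_to_database(m.value)  # hypothetical placeholder for your own processing
    consumer.commit()          # commit the offset only after processing succeeds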

Partitions

As with most distributed systems, Apache Kafka distributes its data across multiple nodes within the cluster. A topic in Apache Kafka is chunked up into partitions, which are replicated (3 copies is a common production setting, though the default replication factor is 1) and stored on multiple nodes within the cluster. This prevents data loss in case of node failures.

[Figure: a topic split into partitions, ref: https://kafka.apache.org/documentation/]

In the above diagram you can see how a topic is split up into partitions and how incoming messages are distributed amongst them.
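One practical consequence: ordering is guaranteed within a partition, and messages sent with the same key always land in the same partition. A small sketch of sending keyed messages with kafka-python (the topic and key are illustrative):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# all events with the key 'user-42' hash to the same partition,
# so they are read back in the order they were sent
producer.send('events_topic', key='user-42', value={'event_type': 'click'})
producer.send('events_topic', key='user-42', value={'event_type': 'buy'})
producer.flush()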

You can stop all the running docker containers using

docker-compose stop

Recap

In this post we saw

  1. What Apache Kafka is

  2. When to use Apache Kafka with a few common use cases

  3. Apache Kafka concepts - Producer, Topic, Broker, Consumer, Offset and auto commit

There are more advanced concepts like partition sizing, partitioning functions, Apache Kafka Connectors, the Streams API, etc., which we will cover in future posts.

Conclusion

Hope this article gives you a good introduction to Apache Kafka and insight into when to use it and when not to. Let me know if you have any comments or questions in the comments section below.

Reference:

  1. kafka-docker-setup

  2. Kafka-configuration