Apache Kafka is a message queuing system… a very large, very complex message queuing system! Here are my rough notes on getting started, rules, vocabulary, etc. This is not an in-depth tutorial; instead, it’s a brief reference on the basics. It is outlined based on Stephane Maarek’s excellent introduction to Kafka.

Quick Start

Download the latest Kafka (Zookeeper is bundled with it). Both run as services. Zookeeper runs on port 2181 by default. Kafka depends on Zookeeper and runs on port 9092.

Add Kafka’s bin folder to PATH in ~/.bashrc for convenience.

In the Kafka folder: mkdir -p data/zookeeper. Set dataDir in config/zookeeper.properties to this folder. Run zookeeper-server-start.sh config/zookeeper.properties.

Again, in the Kafka folder: mkdir -p data/kafka. Edit config/server.properties so that log.dirs points to this folder. Run kafka-server-start.sh config/server.properties.
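The two property edits above look like this (relative paths for brevity, assuming the Kafka folder is the working directory; absolute paths are safer in practice):

```properties
# config/zookeeper.properties
dataDir=data/zookeeper
```

```properties
# config/server.properties
log.dirs=data/kafka
```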

Vocab/Theory

Kafka has “topics”, which are like database tables. Each topic has a number of “partitions” specified upon creation of the topic. Each partition has messages, where each message has some incremental offset. So a message is identified by topic -> partition -> offset.

Offset only means something within its partition. Ordering is only guaranteed within a partition. Data is only kept for a limited amount of time (default one week). (IDs will keep incrementing, though.) Data is immutable. Data is assigned to partitions round-robin (load-balanced) if no key is given.
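A toy model of the addressing scheme (illustrative Python, not Kafka's actual storage):

```python
# Toy model: a topic is a set of partitions; each partition is an append-only
# list whose index is the message's offset. Illustrative only.
topic = {0: [], 1: [], 2: []}  # a topic with 3 partitions

def append(partition, message):
    topic[partition].append(message)
    return len(topic[partition]) - 1  # the new message's offset

off_a = append(0, "hello")  # offset 0 in partition 0
off_b = append(0, "world")  # offset 1 in partition 0
off_c = append(1, "hello")  # offset 0 in partition 1 -- offsets are per partition
```

So `off_a` and `off_c` are both 0: the same offset in different partitions identifies different messages, which is why a message is addressed by topic -> partition -> offset.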

A Kafka cluster has many brokers (servers). Each broker has an ID. Each broker has some, but not all, topic partitions. When you connect to one broker, you’re connected to the whole cluster. Start with three brokers (big companies have > 100).

You can set a topic replication factor, usually between 2 and 3 (prefer 3). This duplicates partitions across brokers.

At any given time, ONE broker can be a leader for a given partition. ONLY that leader can receive and send data for that partition. The other brokers just replicate the data (these are in-sync replicas, or ISR); they stay in sync by fetching from the leader. Zookeeper handles electing the leader.

Producers write data to topics. Producers automatically know which broker and partition to write to (makes things easier). If a broker fails, the producers automatically recover.

Producers can use acks=0, which won’t wait for acknowledgement, and may result in data loss. Dangerous. acks=1 is the default, which waits for leader acknowledgement and may have limited data loss. acks=all waits for leader and replica acknowledgement and has no data loss.

Producers can send a “key” (string, number, etc.) with the message. It’s used when you need message ordering for a specific field (e.g. truck_id). Key hashing determines which partition a key will go to.
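A sketch of the partitioning logic. Kafka's default partitioner actually hashes the serialized key with murmur2; Python's built-in `hash()` is a simplified stand-in here:

```python
import itertools

NUM_PARTITIONS = 3
_round_robin = itertools.cycle(range(NUM_PARTITIONS))

def choose_partition(key=None):
    if key is None:
        # No key: spread messages across partitions round-robin.
        return next(_round_robin)
    # With a key, hash it so the same key always lands on the same partition.
    # (Kafka uses murmur2 on the key bytes; hash() is a stand-in.)
    return hash(str(key)) % NUM_PARTITIONS

# Same key -> same partition, so all messages for truck_42 stay ordered:
assert choose_partition("truck_42") == choose_partition("truck_42")
```

This is why keyed messages get per-key ordering: everything for `truck_42` lands in one partition, and ordering is guaranteed within a partition.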

Consumers read data from a topic (ID’d by name). Consumers know which broker to read from automatically. Data is read in order within each partition. Consumers that are subscribed to multiple partitions read a little from one, a little from another, back to the first, etc.

A “consumer group” is an application with multiple consumers. Each consumer in a group reads from an exclusive partition, i.e. no other consumer in that group reads from the same partition. If there are more consumers than partitions, some consumers will be inactive. This is all handled automatically by a GroupCoordinator or ConsumerCoordinator.
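The assignment rule can be sketched like this (a hypothetical helper; the real assignment is negotiated through the group coordinator):

```python
def assign(partitions, consumers):
    """Give each consumer an exclusive subset of partitions, round-robin.
    Consumers beyond the partition count get nothing (they sit idle)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

print(assign([0, 1, 2], ["c1", "c2"]))     # {'c1': [0, 2], 'c2': [1]}
print(assign([0, 1], ["c1", "c2", "c3"]))  # 'c3' gets [] -- inactive
```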

Kafka stores the offsets that a consumer group has read from–checkpointing, or bookmarking. The offsets are committed to a topic named __consumer_offsets. When a consumer in a group has processed a specific offset, it should commit that offset to __consumer_offsets. This can be done automatically. This enables consumers that die to pick up where they left off when they restart (I’m back alive, where should I start from? The table says “offset 1234”).
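The checkpointing idea in miniature — committed offsets keyed by group/topic/partition, which is effectively what __consumer_offsets stores (details simplified; like Kafka, the committed value is the *next* offset to read):

```python
committed = {}  # (group, topic, partition) -> next offset to read

def commit(group, topic, partition, offset):
    # After processing `offset`, record that the next read starts just past it.
    committed[(group, topic, partition)] = offset + 1

def resume_from(group, topic, partition):
    # A restarted consumer asks: "I'm back alive, where should I start from?"
    return committed.get((group, topic, partition), 0)

commit("my-first-application", "first_topic", 0, 1233)
print(resume_from("my-first-application", "first_topic", 0))  # 1234
```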

When a consumer commits offsets determines its “delivery semantics”. Consumers choose when to commit. There are three delivery semantics:

  • At most once
    • Offset is committed as soon as the message is received.
    • Message is lost if processing goes wrong.
    • Usually you don’t want this.
  • At least once
    • Offset is committed after processing.
    • Message is read again if processing goes wrong.
    • Usually preferred.
    • System must be idempotent, i.e. processing a message multiple times won’t impact your systems.
  • Exactly once
    • Can only be achieved for Kafka-to-Kafka workflows, using the Kafka Streams API.
    • For Kafka to external system workflows, use an idempotent consumer (ensures no duplicates in final database).
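The at-least-once + idempotent-consumer combination can be sketched as: redelivered messages are detected and skipped, so reprocessing is harmless (the message IDs here are hypothetical):

```python
processed_ids = set()
database = []  # stand-in for the final database

def handle(message_id, payload):
    # Idempotent consumer: a redelivered message is recognized and skipped,
    # so at-least-once delivery never produces duplicate rows.
    if message_id in processed_ids:
        return
    processed_ids.add(message_id)
    database.append(payload)

handle("msg-1", "order placed")
handle("msg-1", "order placed")  # redelivered after a failed offset commit
print(database)                  # ['order placed'] -- no duplicate
```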

Broker discovery: every broker is a “bootstrap server”, so you only connect to one broker, and you’re connected to the whole cluster (again, automatically). Every broker has all metadata on all brokers, topics, and partitions.

Zookeeper keeps a list of brokers. It helps with leader election. It sends notifications to Kafka (new topic, broker dies, delete topic, broker comes up, etc). Kafka requires Zookeeper (for now). Zookeeper operates with an odd number of servers–3, 5, 7. Zookeeper has a leader to handle writes; the rest are followers that handle reads. Each broker is connected to a Zookeeper node; I don’t think it matters which one.

Guarantees: messages are appended to partitions in the order they are sent. Messages are read in the order they are stored. With replication factor n, the producers and consumers can tolerate n-1 brokers being down. If the number of partitions for a topic stays the same (i.e. no new partitions), the same key will always go to the same partition.

CLI

kafka-topics.sh is the most important command. Start by creating a topic:

kafka-topics.sh \
  --zookeeper 127.0.0.1:2181 \
  --topic first_topic \
  --create \
  --partitions 3 \
  --replication-factor 2

partitions and replication-factor are required! (A replication factor greater than the number of running brokers is rejected, so use 1 on a single local broker.) Check that the topic was created:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

More info: kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --describe.

Delete a topic with --delete. This marks the topic for deletion (it’s actually deleted only if delete.topic.enable=true, which is the default in recent versions).

kafka-console-producer.sh pushes messages. Run kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic. Type messages (one per line; each enter sends a message) and press ctrl-c to exit.

Add --producer-property acks=all to get acks.

Producing to a previously-unused topic name auto-creates the topic. The producer warns about there being no leader and then works fine (it retries automatically once a leader is available). The auto-created topic has a partition count and replication factor of 1, which isn’t great. So always create a topic before pushing to it. You can edit server.properties to change the defaults.
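If you do rely on auto-creation, the broker defaults can be raised in config/server.properties (num.partitions and default.replication.factor are the real broker settings; the values below are just examples):

```properties
# config/server.properties
# Partition count for auto-created topics (broker default is 1)
num.partitions=3
# Replication factor for auto-created topics (broker default is 1)
default.replication.factor=2
```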

kafka-console-consumer.sh gets messages. Run kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic. Nothing will show, since it only gets new messages (since launched) by default. Add --from-beginning to get all (note that the ordering is still per partition, not per topic).

Consumers have to be part of a group, and the group ID is the name of the application. Run kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application to create a consumer group. Run another instance of it to show multiple consumers in a group. Then produce messages with kafka-console-producer.sh to show round-robin distribution of messages to the consumers. Close one consumer, and all messages will go to the remaining ones (partitions are automatically reassigned to other consumers when one consumer goes down).

Create another group with --group my-second-application and use --from-beginning. All messages are replayed. Run it again, and no messages are replayed. This is because the offset for the group is committed to Kafka, so --from-beginning is ignored the second time.

kafka-consumer-groups.sh gets info about consumer groups and allows resetting offsets. kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-second-application --describe. Could also use --all-groups.

To reset offsets: kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --all-groups --reset-offsets --topic first_topic --to-earliest --execute. (Tons more options are available, e.g. --shift-by -2 shifts back by two messages.)