Apache Kafka Automation, Part 2: Kafka as a Service

by Ali Jafari

Apache Kafka is a distributed event streaming and processing platform capable of handling trillions of events a day. Managing Kafka, especially in cloud environments, can be challenging. In this blog post, I will address the challenge of deploying Kafka as a Service on Amazon Web Services (AWS). Before getting into that, I will briefly introduce the tool and describe its capabilities.

What is Kafka

Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a fully-fledged event streaming platform.

Kafka has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant, durable way.
  • Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

  • Real-time data streaming pipelines that reliably move data between systems or applications.
  • Real-time streaming applications that transform or react to a continual flow of data.

What is Confluent

Founded by the original developers of Apache Kafka, Confluent provides the most complete distribution of Kafka with Confluent Platform. Confluent Platform improves Kafka with additional community and commercial features that enhance the streaming experience and support massive scales.

Challenges in the Cloud (AWS)

Provisioning a Kafka cluster on AWS can be as simple as installing the required Java and Confluent Platform packages. A more reliable approach is to use Chef and Packer to create an Amazon Machine Image (AMI) with Kafka and all dependencies installed. The challenge comes in configuring Kafka:

  • How do we determine the broker id for each broker?
  • How do we handle broker replacement? In other words, how do we tell the other brokers about the new broker's IP address?
  • In the event of new broker initialization, how do we manage data replication?

I'll address each of these questions, but first, I'll make a few assumptions:

  1. We have a cluster of 3 brokers.
  2. Each broker is deployed on an EC2 instance in its own Availability Zone (AZ).
  3. We are leveraging an Auto Scaling Group (ASG).
  4. We are using Chef and Packer to bake Confluent Kafka and all its dependencies.
  5. We are using Keystore for secret and configuration management.

When we deploy a Kafka cluster (Confluent flavor) on AWS as a service, there is no need for manual installation or configuration. If any broker goes down or is replaced, configuration (such as broker id assignment) is handled automatically, and no manual intervention is needed! How is this achieved?

Broker Assignment

The broker id (broker.id) is a mandatory configuration in Kafka. It is an integer, and it must be unique within the cluster. How do we set this value?
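For reference, this is where the value ends up in each broker's configuration file; the surrounding values are illustrative only and match the example addresses used later in this article:

```properties
# /etc/kafka/server.properties (illustrative values)
broker.id=0
zookeeper.connect=10.100.1.100:2181,10.100.2.100:2181,10.100.3.100:2181
log.dirs=/kafkalogs
```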

I list two approaches:

  1. Use the EC2 instance id: This is very simple to set up, but when a broker is replaced, the new instance gets a new broker id, so all of its partitions must be re-replicated, which may put a lot of traffic on the network. Another downside is that if all the brokers are terminated, we will lose our data. This option is viable for a large cluster.
  2. Use a number, 0 to N: This requires custom code to query Zookeeper, determine which ids are available, and pick one. The script needs to be smart enough to handle multiple simultaneous requests and not return duplicate ids. The main benefit of this option is that we can predict the broker id and provision resources like Elastic Block Store (EBS) volumes and Elastic Network Interfaces (ENI) and apply them to the broker, as described in the next section. This option is good for small to medium-sized clusters. Here is a sample Python snippet that picks the lowest free broker id:
from kazoo.client import KazooClient

# zk_hosts is the ZooKeeper connection string shown later in this article
zk = KazooClient(hosts=zk_hosts, logger=logger)
zk.start()
# ids currently registered by live brokers
zk_broker_ids = zk.get_children('/brokers/ids')
zk.stop()
zk.close()
# our cluster size is 3 in our example
max_broker_count = 3
set_broker_ids = set(map(int, zk_broker_ids))
possible_broker_ids = set(range(max_broker_count))
# pick the lowest id that is not in use
broker_id = sorted(possible_broker_ids - set_broker_ids)[0]
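The id-selection logic above can be factored into a pure function, which makes the "no duplicate ids" requirement easy to test in isolation. This is a sketch; pick_broker_id is a hypothetical helper name, not part of kazoo or Kafka:

```python
def pick_broker_id(assigned_ids, max_broker_count):
    """Return the lowest free broker id, or None if the pool is exhausted.

    assigned_ids: ids already registered in ZooKeeper (strings or ints);
    max_broker_count: cluster size, giving the id pool 0..N-1.
    """
    free = set(range(max_broker_count)) - set(map(int, assigned_ids))
    return min(free) if free else None
```

To guard against two brokers booting at the same moment and picking the same id, the chosen id can additionally be claimed by creating an ephemeral znode and retrying with the next free id if the create fails because the node already exists.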

For the rest of this article, I'll use a small pool of broker ids: 0, 1 and 2.

ENI Attachment

Since we know the broker id, attaching an ENI is easy, because we can simply map broker ids to ENI tags:

  1. id = 0 maps to ENI tag KAFKA-0.
  2. id = 1 maps to ENI tag KAFKA-1.
  3. id = 2 maps to ENI tag KAFKA-2.

Before we go further, let's see why we need an ENI. We want the brokers to have static internal IP addresses, so that if a broker is replaced, the new one gets the same IP address. An ENI gives us exactly that.

In our ASG launch configuration, we should follow this order to attach the ENI:

Step 1. Get broker id:

require 'zookeeper'

def get_broker_id(zk_host)
  zk = Zookeeper.new(zk_host)
  # ids currently registered by live brokers
  assigned_ids = zk.get_children(:path => '/brokers/ids')[:children]
  # our cluster size is 3 in our example
  max_broker_count = 3
  all_ids = (0..(max_broker_count - 1)).map(&:to_s)
  possible_ids = all_ids - assigned_ids
  possible_ids.empty? ? -1 : possible_ids[0]
end

Step 2. Get available ENI with the correct tag:

zk_host = '10.100.1.100:2181,10.100.2.100:2181,10.100.3.100:2181'

broker_id = get_broker_id(zk_host)
@ec2 = Aws::EC2::Client.new(region: region)
# get the available eni
eni = @ec2.describe_network_interfaces(
  filters: [
    {
       name: 'tag:Name', 
       values: ["KAFKA-#{broker_id}"]
    },
    {
       name: 'status',
       values: ['available']
    }
  ]).network_interfaces[0]

Step 3. Attach the ENI:

require 'net/http'

metadata_endpoint = 'http://169.254.169.254/latest/meta-data/'
instance_id = Net::HTTP.get(URI.parse(metadata_endpoint + 'instance-id'))
# describe_network_interfaces returns a plain struct, so we attach
# through the EC2 client rather than calling attach on the struct
@ec2.attach_network_interface(
  network_interface_id: eni.network_interface_id,
  instance_id: instance_id,
  device_index: 1
)

Step 4. Network configuration: At this point, we know the correct IP address is attached, but we are not able to use it for communication yet! First, we must create a network config and a route with our new network device as the default. This can be managed by a shell script, rendered as a Chef template so that <%= new_ip_address %> is replaced with the ENI's private IP:

#!/bin/bash -e
export GATEWAY=`route -n | grep "^0.0.0.0" | tr -s " " | cut -f2 -d" "`

if [ -f /etc/network/interfaces.d/eth1.cfg ]; then mv -f /etc/network/interfaces.d/eth1.cfg /etc/network/interfaces.d/backup.eth1.cfg.backup; fi
cat > /etc/network/interfaces.d/eth1.cfg <<ETHCFG
auto eth1
iface eth1 inet dhcp
    up ip route add default via $GATEWAY dev eth1 table eth1_rt
    up ip rule add from <%= new_ip_address %> lookup eth1_rt prio 1000
ETHCFG

mv /etc/iproute2/rt_tables /etc/iproute2/backup.rt_tables.backup
cat > /etc/iproute2/rt_tables <<RTTABLES
#
# reserved values
#
255     local
254     main
253     default
0       unspec
#
# local
#
#1      inr.ruhep
2 eth1_rt
RTTABLES

ifup eth1

ip route add default via $GATEWAY dev eth1 table eth1_rt;

These scripts can be run by Chef as part of the ASG launch configuration.

EBS Attachment

Our next step is to attach an EBS volume. Why do we need this? We want our Kafka cluster to be resilient and fault-tolerant. This means not losing any data in the event one or more brokers are terminated. We can ensure this with the topic's replication factor.

Kafka is designed as a distributed system, and it replicates data based on each topic's replication factor. If the number of brokers terminated at any given moment is equal to or greater than the topic's replication factor, data will be lost. For example, if the replication factor is 2 and both brokers holding the replicas are terminated, the data is gone. If we set the replication factor to a reasonably high number, we can protect our data, but the challenge is that every time a new broker is created, data needs to be replicated to it. This places stress on our network. If you're testing for resilience with Chaos Monkey, you'll quickly see what I mean!
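The survivability rule above can be written down directly. This is a hypothetical helper for illustration, not part of Kafka, and it assumes replicas are spread across distinct brokers:

```python
def topic_survives(replication_factor, simultaneous_failures):
    """A topic keeps at least one live replica of every partition as long
    as fewer brokers fail at once than the replication factor."""
    return simultaneous_failures < replication_factor
```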

In order to ease the load on our network and prevent full data replication to a new broker, we can use an EBS volume, and, if we are mindful about our choice of ids, we can re-attach the same volume based on broker id, over and over. Here's how to do it: use an extra EBS volume for each broker and tag it with KAFKA-#{broker_id}; set the Delete on termination property to false; and attach it to the replacement broker. Here is a Python code sample:

# boto (v2) and the commands module require Python 2
from boto import ec2
import commands

# broker_id comes from the ZooKeeper lookup shown earlier;
# region_name and instance_id come from the instance metadata
broker_id = get_broker_id()
conn = ec2.connect_to_region(region_name)

# find the detached volume tagged for this broker id
volume = conn.get_all_volumes(
  filters = {
    'tag:Name': "KAFKA-%s" % broker_id
  })[0]
# attach the volume
conn.attach_volume(volume.id, instance_id, '/dev/xvdg')
# mount it
commands.getstatusoutput('mount /dev/xvdg /kafkalogs')

Handling Service Discovery

Now that we have attached the ENI, we can specify the IP addresses that we want. In the ASG CloudFormation template, we can provision the ENI with whatever IP we want within the subnet. Here's how:

NetworkInterface1:
  Type: AWS::EC2::NetworkInterface
  Properties:
    SubnetId:
      Ref: Subnet1
    PrivateIpAddress: 10.100.1.200
    Description: ENI for Kafka broker 0
    GroupSet:
    - Ref: InstanceSecurityGroup
    Tags:
    - Key: Name
      Value: KAFKA-0

Now we always know the new broker's IP (or hostname) and connection URL. This is the Kafka connection URL (also known as the bootstrap server) in our example:

broker 0: 10.100.1.200:9092

broker 1: 10.100.2.200:9092

broker 2: 10.100.3.200:9092

cluster: 10.100.1.200:9092,10.100.2.200:9092,10.100.3.200:9092
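The addressing scheme above (broker i lives in subnet 10.100.(i+1).0/24 at host address .200) can be captured in a small helper; broker_private_ip and bootstrap_servers are hypothetical names used only for illustration:

```python
def broker_private_ip(broker_id):
    # broker i sits in subnet 10.100.(i+1).0/24 with host address .200
    return "10.100.%d.200" % (broker_id + 1)

def bootstrap_servers(broker_count, port=9092):
    # build the comma-separated bootstrap-server string for clients
    return ",".join("%s:%d" % (broker_private_ip(i), port)
                    for i in range(broker_count))
```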

Why it's Self-Healing

In the sections above, I described how we can attach a network interface to a new broker using a previous id and how we can attach and reuse an EBS volume within our ASG. Now let's review what happens when a broker is terminated:

  • A broker is terminated; let's assume it is broker 0.
  • Because we have an ASG, a new broker starts.
  • From the launch configuration (or user data), these steps run:
    1. Determine an available broker id from Zookeeper (0 in this case).
    2. Attach the ENI with tag name KAFKA-0 (if it is not available yet, wait a few seconds).
    3. Attach the EBS volume with tag name KAFKA-0 (if it is not available yet, wait a few seconds).
  • If steps 1, 2, and 3 successfully run, then a success signal is sent to the CloudFormation stack:
$ cfn-signal -e 0 --stack $stack_name --resource InstanceAsg --region us-east-1
  • If any of steps 1, 2, or 3 fails, the CloudFormation stack and ASG consider the node unhealthy and terminate that broker. The process repeats until it gets a healthy broker with id 0.
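The "wait a few seconds" in steps 2 and 3 can be implemented with a generic polling helper along these lines; wait_for is a hypothetical name, and the predicates would wrap the describe calls shown earlier. The clock and sleep functions are injectable so the helper is testable without real delays:

```python
import time

def wait_for(predicate, timeout=300, interval=5,
             clock=time.monotonic, sleep=time.sleep):
    """Poll predicate() until it returns a truthy value or timeout expires.
    Returns the truthy value, or None on timeout."""
    deadline = clock() + timeout
    while clock() < deadline:
        result = predicate()
        if result:
            return result
        sleep(interval)
    return None
```

For example, the ENI lookup could become wait_for(lambda: find_available_eni(broker_id)), failing the cfn-signal if it returns None.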

As we can see, there is no need for manual intervention: we have a self-healing cluster, or Kafka as a Service.

Here is the GitHub repository with the source code for the Kafka as a Service configuration, ready for AWS deployment.

If you missed Part 1 of this series, you can find it here. When you're ready, continue to Part 3.
