Apache Kafka is a distributed event streaming and processing platform capable of handling trillions of events each day. Managing Kafka, especially in cloud environments, can be challenging. In this blog post, I will address the challenge of deploying Kafka as a Service on AWS.
Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from a messaging queue to a fully-fledged event streaming platform.
Kafka has three key capabilities: it lets you publish and subscribe to streams of records, store those streams durably and reliably, and process the streams as they occur.
Kafka is generally used for two broad classes of applications: building real-time streaming data pipelines that reliably move data between systems, and building real-time streaming applications that transform or react to streams of data.
Founded by the original developers of Apache Kafka, Confluent provides the most complete distribution of Kafka with Confluent Platform, which improves Kafka with additional community and commercial features that enhance the streaming experience and support operation at massive scale.
Provisioning a Kafka cluster on AWS can be as simple as installing the required Java and Confluent Platform packages. A more reliable approach is to use Chef and Packer to create an Amazon Machine Image (AMI) with Kafka and all of its dependencies installed. The challenge comes in configuring Kafka: how do we give each broker a unique broker id, and how do we make sure a replaced broker comes back with the same network identity (internal IP address)?
I’ll address each of these questions, but first, I’ll make a few assumptions: Zookeeper is already running as a service, the brokers are launched from a pre-baked AMI in an Auto Scaling group (ASG), and the example cluster consists of three brokers.
When we deploy a Kafka cluster (the Confluent flavor) on AWS as a service, there is no need for manual installation or configuration. If a broker goes down or is replaced, its configuration (such as broker id assignment) is handled automatically, with no manual intervention needed. How is this achieved?
The broker id is a mandatory configuration in Kafka. It is an integer, and it must be unique within the cluster. How do we set this value?
I’ll show two approaches that follow the same logic: query Zookeeper for the ids that are already registered and pick a free one from a fixed pool. The first is in Python with kazoo; the second, used later in the ASG launch configuration, is in Ruby with the zookeeper gem.
import logging

from kazoo.client import KazooClient

logger = logging.getLogger(__name__)
zk_hosts = '10.0.1.10:2181,10.0.1.11:2181,10.0.1.12:2181'  # example Zookeeper ensemble

# Ask Zookeeper which broker ids are already registered
zk = KazooClient(hosts=zk_hosts, logger=logger)
zk.start()
zk_broker_ids = zk.get_children('/brokers/ids')
zk.stop()
zk.close()

# our cluster size is 3 in our example
max_broker_count = 3

# pick the lowest id from the pool that is not already taken
set_broker_ids = set(map(int, zk_broker_ids))
possible_broker_ids = set(range(max_broker_count))
broker_id = sorted(possible_broker_ids - set_broker_ids)[0]
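Once a free id has been picked, it needs to end up in server.properties before the broker starts. Here is a minimal sketch, assuming the Confluent package default path /etc/kafka/server.properties (adjust for your installation):
props_path = '/etc/kafka/server.properties'  # Confluent package default; may differ

with open(props_path) as f:
    lines = f.readlines()

# rewrite (or append) the broker.id line with the id computed above
with open(props_path, 'w') as f:
    found = False
    for line in lines:
        if line.startswith('broker.id='):
            line = 'broker.id={}\n'.format(broker_id)
            found = True
        f.write(line)
    if not found:
        f.write('broker.id={}\n'.format(broker_id))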
For the rest of this article, I’ll use a small pool of broker ids: 0, 1 and 2.
Since we know the broker id, attaching an ENI is easy: we can simply map broker ids to ENI tags, with each pre-created ENI tagged with the broker id it belongs to.
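A rough sketch of that pre-provisioning step with boto3, where the tag key BrokerId, the region, and the subnet id are illustrative assumptions:
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')  # example region
subnet_id = 'subnet-0123456789abcdef0'              # example subnet

# one ENI per broker id in our pool of 0, 1 and 2
for bid in range(3):
    eni = ec2.create_network_interface(SubnetId=subnet_id)
    ec2.create_tags(
        Resources=[eni['NetworkInterface']['NetworkInterfaceId']],
        Tags=[{'Key': 'BrokerId', 'Value': str(bid)}],
    )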
Before we go further, let’s see why we need an ENI at all. We want an environment with static internal IP addresses for the brokers, so that when a broker is replaced, the new instance gets the same IP address. With an ENI, we can do this easily.
In our ASG launch configuration, we should follow this order to attach the ENI:
Step 1. Get broker id:
require 'zookeeper'

# Pick the first broker id from the pool that is not already registered
# in Zookeeper; return -1 if every id is taken.
def get_broker_id(zk_host)
  zk = Zookeeper.new(zk_host)
  assigned_ids = zk.get_children(:path => '/brokers/ids')[:children]
  # our cluster size is 3 in our example
  max_broker_count = 3
  all_ids = (0..(max_broker_count - 1)).map(&:to_s)
  possible_ids = all_ids - assigned_ids
  possible_ids.empty? ? -1 : possible_ids[0]
end
Step 2. Get available ENI with the correct tag:
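Here is a minimal sketch of this step in Python with boto3 (the Ruby aws-sdk offers equivalent calls); the tag key BrokerId, the region, and device index 1 are assumptions, and IMDSv1 is used for brevity:
import boto3
import urllib.request

ec2 = boto3.client('ec2', region_name='us-east-1')  # example region

# broker_id comes from Step 1; find an unattached ENI tagged with it
resp = ec2.describe_network_interfaces(
    Filters=[
        {'Name': 'tag:BrokerId', 'Values': [str(broker_id)]},
        {'Name': 'status', 'Values': ['available']},
    ]
)
eni_id = resp['NetworkInterfaces'][0]['NetworkInterfaceId']

# the id of the instance we are configuring, from instance metadata (IMDSv1)
instance_id = urllib.request.urlopen(
    'http://169.254.169.254/latest/meta-data/instance-id').read().decode()

# attach the ENI as eth1; eth0 remains the instance's primary interface
ec2.attach_network_interface(
    NetworkInterfaceId=eni_id,
    InstanceId=instance_id,
    DeviceIndex=1,
)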