How to Deploy Kafka Connect on Kubernetes using Helm Charts

Here’s our step-by-step how-to guide to deploying Kafka Connect on Kubernetes for connecting Kafka to external systems.

Kubernetes (K8s) is one of the best-known open-source projects, and its adoption keeps growing. Kafka is an open-source stream-processing platform used by many companies. For example, LinkedIn customizes Apache Kafka to handle 7 trillion messages per day.

So, what is Kafka Connect now? Kafka Connect is an open-source component of Kafka, a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka.

Because Kafka Connect is a component of Kafka, the setup needs Kafka brokers and Zookeepers (at least until the Zookeeper dependency is removed). Our setup will look something like this:

[Figure: Deploy Kafka Connect on Kubernetes using Helm charts]

We will start by setting up a Kafka cluster. In this post, we use Confluent’s open-source Helm charts to do that. I am one of the contributors to these charts, and they are a good place to start if you want to learn “Kafka on Kubernetes”.

To get started, make sure a Kubernetes cluster is running (e.g. GKE by Google, EKS by AWS, AKS by Azure, Minikube, etc.) and the following tools are installed on your local system:

  1. Helm (version used for this blog: v2.16.1)
  2. kubectl (version used for this blog: v1.15.3)
  3. Docker (version used for this blog: 19.03.1)
  4. MySQL (version used for this blog: 5.7)
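
Before going further, it can help to confirm that these tools are actually on your PATH. The snippet below is a minimal sketch; check_tools is a hypothetical helper name of my own, not something shipped with any of these tools, and it only checks presence, not versions.

```shell
# Report which of the given commands are missing from PATH.
# Prints one line per missing tool and returns non-zero if any are absent.
check_tools() {
  missing=0
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing: $tool"
      missing=1
    fi
  done
  return "$missing"
}

# Usage (tool names taken from the list above):
#   check_tools helm kubectl docker mysql && echo "all tools found"
```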

Let’s start by cloning the repository and updating the dependencies.

git clone git@github.com:confluentinc/cp-helm-charts.git
cd cp-helm-charts
helm dependency update charts/cp-kafka/

The last command fetches the dependencies of the cp-kafka chart, which depends on the cp-zookeeper chart. Installing cp-kafka fails if you skip this step.

Now let’s deploy the Kafka brokers and Zookeepers under a release name (e.g. confluent) using the command below:

helm install --name confluent ./charts/cp-kafka

It will take a few minutes before all the pods are running. Let’s use kubectl to verify that the resources created by our release are healthy.

$ kubectl get pods

NAME                       READY   STATUS    RESTARTS   AGE
confluent-cp-kafka-0       2/2     Running   0          5m16s
confluent-cp-kafka-1       2/2     Running   0          4m47s
confluent-cp-kafka-2       2/2     Running   0          4m29s
confluent-cp-zookeeper-0   2/2     Running   0          5m16s
confluent-cp-zookeeper-1   2/2     Running   0          4m47s
confluent-cp-zookeeper-2   2/2     Running   0          4m21s

$ kubectl get services

NAME                              TYPE        CLUSTER-IP    PORT(S)             AGE
confluent-cp-kafka                ClusterIP   xx.xx.xxx.x   9092/TCP            5m16s
confluent-cp-kafka-headless       ClusterIP   None          9092/TCP            5m16s
confluent-cp-zookeeper            ClusterIP   xx.xx.xxx.x   2181/TCP            5m16s
confluent-cp-zookeeper-headless   ClusterIP   None          2888/TCP,3888/TCP   5m16s
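
Instead of re-running kubectl get pods until everything is up, you could let kubectl block until the pods are Ready. This is only a sketch: wait_for_release is a hypothetical helper name, and it assumes the chart labels its pods with release=<release name>, which you may want to confirm with kubectl get pods --show-labels.

```shell
# Block until every pod of a Helm release reports Ready, or time out.
# $1 = release name, $2 = timeout (defaults to 300s).
# Assumption: the chart labels its pods with release=<release name>.
wait_for_release() {
  kubectl wait --for=condition=Ready pod \
    --selector="release=$1" \
    --timeout="${2:-300s}"
}

# Usage:
#   wait_for_release confluent 600s
```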

You may notice that every broker and Zookeeper pod has two containers; one of them is the Prometheus JMX exporter container. You can disable it by editing the values file or by setting values on the Helm command line while installing (e.g. helm install --set prometheus.jmx.enabled=false ...).

Since we have the Kafka Connect dependencies in place, we can go ahead and deploy the Kafka Connect chart too. However, to read from a MySQL database, the JDBC Source Connector in our container needs the MySQL JDBC driver. To provide it, let’s use the confluentinc/cp-kafka-connect image provided by Confluent and add a line that downloads the driver into the JDBC connector’s directory. Put the content below in a file named Dockerfile:

FROM confluentinc/cp-kafka-connect:5.4.0

# -f makes curl fail on HTTP errors instead of saving an error page as the jar
RUN echo "===> Installing MySQL connector" \
  && curl -fSL https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar \
     --output /usr/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.19.jar

NOTE: My Kubernetes cluster runs on Google Cloud Platform, so I will use Google Container Registry to store the Docker image I build. You can use Docker Hub or any other registry you prefer.

The commands below build the Docker image and push it to Google Container Registry:

docker build -t gcr.io/project123/cp-kafka-connect:5.4.0-jdbc .
docker push gcr.io/project123/cp-kafka-connect:5.4.0-jdbc
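
A quick sanity check before deploying is to list the jars in the connector directory of the image you just built. The helper below is a sketch; list_jdbc_jars is a hypothetical name, and the directory is the one the Dockerfile above writes to.

```shell
# List the jars in the JDBC connector directory of an image,
# without starting Kafka Connect itself.
# $1 = image reference (name:tag).
list_jdbc_jars() {
  docker run --rm --entrypoint ls "$1" /usr/share/java/kafka-connect-jdbc/
}

# Usage: the MySQL driver jar should show up in the listing.
#   list_jdbc_jars gcr.io/project123/cp-kafka-connect:5.4.0-jdbc | grep mysql-connector
```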

The next step is to use the Docker image we just built and deploy Kafka Connect on Kubernetes:

helm install --name confluent-2 \
  --set image="gcr\.io/project123/cp-kafka-connect" \
  --set imageTag="5.4.0-jdbc" \
  --set kafka.bootstrapServers="PLAINTEXT://confluent-cp-kafka-headless:9092" \
  ./charts/cp-kafka-connect

Replace name, image and imageTag with appropriate values in the above command. Here, kafka.bootstrapServers is the service and port at which Kafka brokers are running.

After running the kubectl get all command again, we should see the pod, service, deployment, etc. for Kafka Connect as well. Make sure the Connect worker is healthy:

$ kubectl logs confluent-2-cp-kafka-connect-mvt5d \
--container cp-kafka-connect-server

[datetime] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)
[datetime] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Here, confluent-2-cp-kafka-connect-mvt5d is the name of the pod created for me. Yours should look similar, based on the release name you chose (mine is confluent-2).
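
Since the pod name suffix is generated, you may prefer to look the name up rather than copy it by hand. The following is a sketch under one assumption: that the chart labels the Connect pods with app=cp-kafka-connect and release=<release name> (check with kubectl get pods --show-labels). connect_pod is a hypothetical helper name.

```shell
# Print the name of the first Kafka Connect pod of a release.
# $1 = release name.
# Assumption: pods carry the labels app=cp-kafka-connect and release=<name>.
connect_pod() {
  kubectl get pods \
    --selector="app=cp-kafka-connect,release=$1" \
    --output=jsonpath='{.items[0].metadata.name}'
}

# Usage:
#   kubectl logs "$(connect_pod confluent-2)" --container cp-kafka-connect-server
```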

Now we have our Kafka Connect server running, but to read from a database (e.g. MySQL) we will need to create connectors. Let’s do that now.

Assuming you have a MySQL server running somewhere and the MySQL client installed on your local system, connect to the MySQL server using appropriate credentials and execute the following SQL statements:

# Replace xx.xxx.xxx.xx and root with appropriate values
$ mysql -u root -h xx.xxx.xxx.xx -p

CREATE DATABASE IF NOT EXISTS test_db;
USE test_db;
DROP TABLE IF EXISTS test_table;

CREATE TABLE IF NOT EXISTS test_table (
id serial NOT NULL PRIMARY KEY,
name varchar(100),
emailId varchar(200),
branch varchar(200),
updated timestamp default CURRENT_TIMESTAMP NOT NULL,
INDEX `updated_index` (`updated`)
);

INSERT INTO test_table (name, emailId, branch) VALUES ('Chandler', 'muriel@venus.com', 'Transponster');

INSERT INTO test_table (name, emailId, branch) VALUES ('Joey', 'joseph@tribbiani.com', 'DOOL');

exit;

When we deployed the Kafka brokers and Zookeepers above, the Helm output included a sample Kafka client pod for testing. Let’s save that in a file called sample-pod.yaml and deploy it.

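apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
# spec is a top-level field of the Pod, not a child of metadata
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-enterprise-kafka:5.4.1
    # Keep the container alive so we can exec into it
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"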

Deploy this sample pod using the below command:

kubectl apply -f sample-pod.yaml

We can verify if the Connect server is working by sending a simple GET request to Kafka Connect REST endpoint. Read more about the REST API here.

$ kubectl exec -it kafka-client -- curl confluent-2-cp-kafka-connect:8083/connectors

# Output
[]

As there are no connectors yet, we get a successful response with an empty list []. Let’s exec into the Kafka Connect container and create a connector:

$ kubectl exec -ti confluent-2-cp-kafka-connect-mvt5d \
  --container cp-kafka-connect-server -- /bin/bash

$ curl -X POST \
  -H "Content-Type: application/json" \
  --data '{ "name": "k8s-connect-source",
  "config": {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "tasks.max": 1,
  "connection.url": "jdbc:mysql://xx.xxx.xxx.xx/test_db?user=root&password=ayadav",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "timestamp.column.name": "updated",
  "topic.prefix": "k8s-connect-",
  "poll.interval.ms": 1000 } }' \
  http://localhost:8083/connectors

Note:

  • Make sure to replace the value of connection.url with an appropriate value and verify the other configurations too. To keep this article simple, we use JsonConverter in this connector instead of Schema Registry (which is the recommended approach).
  • These SQL statements and the connector are inspired by this tutorial.
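
If the POST above is rejected because the connector class cannot be found, one thing worth checking is whether the worker actually loaded the JDBC plugin. The Kafka Connect REST API lists loaded plugins at /connector-plugins. The sketch below filters that response; has_plugin is a hypothetical helper name, and the match is a loose grep on the "class" field rather than real JSON parsing.

```shell
# Succeed if the JSON read from stdin lists the given connector class.
# Intended input: the response body of GET /connector-plugins.
# $1 = fully qualified connector class name.
has_plugin() {
  grep -q "\"class\" *: *\"$1\""
}

# Usage, from inside the Kafka Connect container:
#   curl -s http://localhost:8083/connector-plugins \
#     | has_plugin io.confluent.connect.jdbc.JdbcSourceConnector \
#     && echo "JDBC source plugin found"
```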

We can verify the status of the connector by running the following command (still from inside the Kafka Connect container):

$ curl -s -X \
  GET http://localhost:8083/connectors/k8s-connect-source/status

{"name":"k8s-connect-source","connector":{"state":"RUNNING","worker_id":"10.8.4.2:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.4.2:8083"}],"type":"source"}
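
A connector can take a moment to reach RUNNING, so you might want to poll the status endpoint instead of checking once. This is a rough sketch: connector_state and wait_until_running are hypothetical helper names, and the sed pattern assumes the compact JSON layout shown in the output above rather than parsing JSON properly.

```shell
# Extract the connector-level state from a /status response read on stdin.
# Assumes the compact JSON layout returned by the status endpoint.
connector_state() {
  sed -n 's/.*"connector":{"state":"\([A-Z_]*\)".*/\1/p'
}

# Poll a status URL until the connector reports RUNNING.
# $1 = status URL, $2 = maximum attempts (defaults to 30), one second apart.
# Returns 0 once RUNNING is seen, 1 if the attempts run out.
wait_until_running() {
  attempts="${2:-30}"
  while [ "$attempts" -gt 0 ]; do
    state=$(curl -s "$1" | connector_state)
    [ "$state" = "RUNNING" ] && return 0
    attempts=$((attempts - 1))
    sleep 1
  done
  return 1
}

# Usage, from inside the Kafka Connect container:
#   wait_until_running http://localhost:8083/connectors/k8s-connect-source/status
```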

This should have created the topics in which Kafka Connect stores the connector configurations, offsets, and status, along with a topic to which it pushes a message every time a new row is added to the table. Exit the container and run the command below on your machine:

$ kubectl -n default exec kafka-client -- /usr/bin/kafka-topics --zookeeper confluent-cp-zookeeper:2181 --list

# Output: List of topics
__confluent.support.metrics
__consumer_offsets
_confluent-metrics
confluent-2-cp-kafka-connect-config
confluent-2-cp-kafka-connect-offset
confluent-2-cp-kafka-connect-status
k8s-connect-test
k8s-connect-test_table


# Listen for the messages on the Kafka topic
$ kubectl -n default exec -ti \
    kafka-client -- /usr/bin/kafka-console-consumer \
    --bootstrap-server confluent-cp-kafka:9092 \
    --topic k8s-connect-test_table --from-beginning

# Output
{"id":1,"name":"Chandler","emailId":"muriel@venus.com","branch":"Transponster","updated":1585514796000}
{"id":2,"name":"Joey","emailId":"joseph@tribbiani.com","branch":"DOOL","updated":1585514796000}

Furthermore, you can keep the listener shell alive, connect to MySQL again, and add a new row. You should see a new message from this topic in the kubectl output.

BONUS TIP: To automate the process of creating connectors on the fly while deploying Kafka Connect, have a look at this Pull Request I submitted, which is now merged into the master branch, and at the values.yaml file.

Please let me know in the comments if you get stuck somewhere, or if you have any suggestions for improvement. You may also be interested in my article on Setting up TCP load balancers in a multi-regional cluster using GKE. Thank you for reading.