
How to Deploy Kafka Connect on Kubernetes using Helm Charts

Here’s our step-by-step how-to guide to deploying Kafka Connect on Kubernetes for connecting Kafka to external systems.

Kubernetes (K8s) is one of the most popular open-source projects and its adoption keeps growing. Kafka is an open-source stream-processing platform used by a large number of companies. For example, LinkedIn has customized Apache Kafka to handle 7 trillion messages per day.

So, what is Kafka Connect now? Kafka Connect is an open-source component of Kafka, a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka.

As Kafka Connect is a component of Kafka, the setup will need Kafka broker(s) and ZooKeeper nodes (at least until the ZooKeeper dependency is removed). Our setup will look something like this:

[Diagram: deploying Kafka Connect on Kubernetes using Helm charts]

We will start by setting up a Kafka cluster first. In this blog, we are going to use Confluent’s open-source Helm charts to do that. I am one of the contributors to these Helm charts, and they are really good if you want to learn “Kafka on Kubernetes”.

For getting started, make sure a Kubernetes cluster is running (e.g. GKE by Google, EKS by AWS, AKS by Azure, Minikube, etc.) and the following tools are installed on your local system (version-check commands follow the list):

  1. helm (version used for this blog: v2.16.1)
  2. kubectl (version used for this blog: v1.15.3)
  3. docker (version used for this blog: 19.03.1)
  4. MySQL (version used for this blog: 5.7)
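
To confirm what you have installed, the standard version commands for each tool are:

helm version --short
kubectl version --client --short
docker --version
mysql --version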

Let’s start by cloning the repository and updating the dependencies.

git clone git@github.com:confluentinc/cp-helm-charts.git
cd cp-helm-charts
helm dependency update charts/cp-kafka/

The last command updates the dependencies of the cp-kafka chart, which depends on the cp-zookeeper chart. Installation of cp-kafka fails without running the update command.

Now let’s move ahead and deploy Kafka brokers along with ZooKeeper under a release name (e.g. confluent) using the command below:

helm install --name confluent ./charts/cp-kafka

It will take a few minutes before all the pods start running. Let’s verify the resources created with our release are working fine using kubectl.

$ kubectl get pods

NAME                       READY   STATUS    RESTARTS   AGE
confluent-cp-kafka-0       2/2     Running   0          5m16s
confluent-cp-kafka-1       2/2     Running   0          4m47s
confluent-cp-kafka-2       2/2     Running   0          4m29s
confluent-cp-zookeeper-0   2/2     Running   0          5m16s
confluent-cp-zookeeper-1   2/2     Running   0          4m47s
confluent-cp-zookeeper-2   2/2     Running   0          4m21s

$ kubectl get services

NAME                              TYPE        CLUSTER-IP    PORT(S)             AGE
confluent-cp-kafka                ClusterIP   xx.xx.xxx.x   9092/TCP            5m16s
confluent-cp-kafka-headless       ClusterIP   None          9092/TCP            5m16s
confluent-cp-zookeeper            ClusterIP   xx.xx.xxx.x   2181/TCP            5m16s
confluent-cp-zookeeper-headless   ClusterIP   None          2888/TCP,3888/TCP   5m16s

If you notice, all brokers and ZooKeeper nodes have 2 containers per pod; one of these is the Prometheus JMX exporter container. You can disable Prometheus by editing the values files, or simply by setting the value from the Helm command line while installing, as shown below.
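
For example, a sketch of the earlier install command with the exporter disabled (using the prometheus.jmx.enabled value mentioned above):

helm install --name confluent \
  --set prometheus.jmx.enabled=false \
  ./charts/cp-kafka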

Since we have the Kafka Connect dependencies in place, we can go ahead and deploy the Kafka Connect chart too. However, to read from a MySQL database we will need the MySQL JDBC driver available in our container (the JDBC Source Connector itself ships with the image). To do so, let’s use the confluentinc/cp-kafka-connect image provided by Confluent and add a line that downloads the driver. Put the content below in a file named Dockerfile:

FROM confluentinc/cp-kafka-connect:5.4.0

RUN echo "===> Installing MySQL connector" \
&& curl https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar  --output /usr/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.19.jar

NOTE: I have my Kubernetes cluster running on Google Cloud Platform, so I will use Google Container Registry to store the built Docker image. You can use Docker Hub or any other preferred registry.

The below commands build and push the docker image to Google Container Registry:

docker build -t gcr.io/project123/cp-kafka-connect:5.4.0-jdbc .
docker push gcr.io/project123/cp-kafka-connect:5.4.0-jdbc

The next step is to use the Docker image we just built and deploy Kafka Connect on Kubernetes:

helm install --name confluent-2 \
  --set image="gcr\.io/project123/cp-kafka-connect" \
  --set imageTag="5.4.0-jdbc" \
  --set kafka.bootstrapServers="PLAINTEXT://confluent-cp-kafka-headless:9092" \
  ./charts/cp-kafka-connect

Replace the name, image, and imageTag values as appropriate in the above command. Here, kafka.bootstrapServers is the service and port at which the Kafka brokers are reachable.

After running the kubectl get all command again, we should see the pod, service, deployment, etc. for Kafka Connect running as well. Make sure the Connect worker is healthy by checking its logs:

$ kubectl logs confluent-2-cp-kafka-connect-mvt5d \
--container cp-kafka-connect-server

[datetime] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect)
[datetime] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Here, confluent-2-cp-kafka-connect-mvt5d is the name of the pod created for me; it will be something similar for you, based on the release name you chose (for me, the release name is confluent-2).

Now we have our Kafka Connect server running, but to read from a database (e.g. MySQL) we will need to create connectors. Let’s do that now.

Presuming you have a MySQL server running somewhere and the MySQL client installed on your local system, connect to the MySQL server with the appropriate credentials and execute the following SQL statements:

# Replace xx.xxx.xxx.xx and root with appropriate values
$ mysql -u root -h xx.xxx.xxx.xx -p

CREATE DATABASE IF NOT EXISTS test_db;
USE test_db;
DROP TABLE IF EXISTS test_table;

CREATE TABLE IF NOT EXISTS test_table (
id serial NOT NULL PRIMARY KEY,
name varchar(100),
emailId varchar(200),
branch varchar(200),
updated timestamp default CURRENT_TIMESTAMP NOT NULL,
INDEX `updated_index` (`updated`)
);

INSERT INTO test_table (name, emailId, branch) VALUES ('Chandler', 'muriel@venus.com', 'Transponster');

INSERT INTO test_table (name, emailId, branch) VALUES ('Joey', 'joseph@tribbiani.com', 'DOOL');

exit;

When we deployed the Kafka brokers and ZooKeeper above, the Helm release notes included a sample kafka-client pod for testing. Let’s save it in a file called sample-pod.yaml and deploy it.

apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-enterprise-kafka:5.4.1
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"

Deploy this sample pod using the below command:

kubectl apply -f sample-pod.yaml

We can verify that the Connect server is working by sending a simple GET request to the Kafka Connect REST endpoint. Read more about the REST API here.

$ kubectl exec -it kafka-client -- curl confluent-2-cp-kafka-connect:8083/connectors

# Output
[]

As there are no connectors yet, we get a successful response with an empty list []. Let’s exec into the Connect container and create a connector:

$ kubectl exec -ti confluent-2-cp-kafka-connect-mvt5d \
  --container cp-kafka-connect-server -- /bin/bash

$ curl -X POST \
  -H "Content-Type: application/json" \
  --data '{ "name": "k8s-connect-source",
  "config": {
  "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "tasks.max": 1,
"connection.url":"jdbc:mysql://xx.xxx.xxx.xx/test_dbuser=root&password=ayadav",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "timestamp.column.name": "updated",
  "topic.prefix": "k8s-connect-",
  "poll.interval.ms": 1000 } }'\
  http://localhost:8083/connectors

Note:

  • Make sure to replace the value of connection.url with an appropriate value and verify the other configurations too (the general URL format is shown after this list). We are using JsonConverter in this connector to avoid using Schema Registry (which is otherwise recommended), for the simplicity of this article.
  • These SQL statements and the connector are inspired by this tutorial.
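
For reference, the connection.url generally takes the shape below; the host, port, user, and password are placeholders to replace with your own values:

jdbc:mysql://<mysql-host>:3306/test_db?user=<mysql-user>&password=<mysql-password>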

We can verify the status of the connector by running the following command (still from inside the Kafka Connect container):

$ curl -s -X \
  GET http://localhost:8083/connectors/k8s-connect-source/status

{"name":"k8s-connect-source","connector":{"state":"RUNNING","worker_id":"10.8.4.2:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.8.4.2:8083"}],"type":"source"}

Kafka Connect should now have created its internal topics (where it stores connector configurations, offsets, and status) along with a topic to which it pushes a message every time a new row is added to the table. Exit the container and run the command below on your machine:

$ kubectl -n default exec kafka-client -- /usr/bin/kafka-topics --zookeeper confluent-cp-zookeeper:2181 --list

# Output: List of topics
__confluent.support.metrics
__consumer_offsets
_confluent-metrics
confluent-2-cp-kafka-connect-config
confluent-2-cp-kafka-connect-offset
confluent-2-cp-kafka-connect-status
k8s-connect-test
k8s-connect-test_table


# Listen for the messages on the Kafka topic
$ kubectl -n default exec -ti \
    kafka-client -- /usr/bin/kafka-console-consumer \
    --bootstrap-server confluent-cp-kafka:9092 \
    --topic k8s-connect-test_table --from-beginning

# Output
{"id":1,"name":"Joey","emailId":"joey@tribianni.com","branch":"DOOL","updated":1585514796000}
{"id":2,"name":"Chandler","emailId":"muriel@venus.com","branch":"Transponster","updated":1585514796000}

Furthermore, you can keep the consumer shell alive, connect to MySQL again, and add a new row. You should see a new message for it appear on the topic in the kubectl output.
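
For example, reconnecting with the mysql client and inserting a row (the row values here are just placeholders) should produce a new JSON message on the topic within the poll interval:

# Replace host and credentials as before
mysql -u root -h xx.xxx.xxx.xx -p -e \
  "INSERT INTO test_db.test_table (name, emailId, branch) VALUES ('Monica', 'monica@example.com', 'Catering');"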

BONUS TIP: To automate the process of creating connectors on the fly while deploying Kafka Connect, have a look at this Pull Request I submitted, which has since been merged into the master branch, and at the values.yaml file.

Please let me know in the comments if you get stuck somewhere, or if you have any suggestions for improvement. You may also be interested in my article on Setting up TCP load balancers in a multi-regional cluster using GKE. Thank you for reading.


How to set up TCP load balancer in multi-regional clusters using GKE

“Neither should a ship rely on one small anchor nor should life rest on a single hope.”

This great quote by Epictetus teaches us a lot about reliability. And it hits the nail on the head when it comes to DevOps. Case in point: setting up multi-regional clusters to safeguard reliability of your microservices.

You can’t rely on a single Google Kubernetes Engine (GKE) cluster running in one zone of a region. You can, however, create a regional (multi-zonal) cluster, where the control plane and nodes are distributed across different zones of a region (read more about zones and regions here). That way, even if one zone is down, your services and nodes continue to run in the other zones, and you don’t have to lose any sleep over it.
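
For instance, this is roughly how a regional cluster is created with gcloud (the cluster name and region here are just examples):

# One node per zone, replicated across the region's zones
gcloud container clusters create my-regional-cluster \
--region asia-south1 \
--num-nodes 1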

But what if the entire region goes down?

Even a few seconds of downtime in a regional cluster can render your service entirely unavailable.

I’ve scoured the internet for answers, and here’s what I found… There is documentation on how to route HTTP and HTTPS traffic to multi-regional clusters: with Google Cloud Platform’s (GCP) recently introduced command-line interface (CLI) tool, kubemci, you can automatically configure Ingress using Google Cloud Load Balancer (GCLB) for multi-cluster Kubernetes Engine environments. What you’ll have is something like this:

 

GCP’s kubemci tool is useful for HTTP and HTTPS traffic, but not for TCP traffic. Image Source

The beta release of kubemci is useful for creating the same services in multiple clusters (say, cluster-south and cluster-southeast) in multiple regions, and to access those services using a common IP of the Google Cloud Load Balancer (GCLB). This Setting Up A Multi-Cluster Ingress document by GCP helps achieve high availability in case of regional failure. According to the Kubernetes Ingress Documentation: “Ingress exposes HTTP and HTTPS routes from outside the cluster to services within the cluster.” But this doesn’t apply to TCP traffic. 

There’s also documentation on how to route TCP traffic to individual VMs.

There’s surprisingly little documentation on how to route TCP traffic to two GKE clusters (i.e. groups of VMs) sitting in different regions. If you’ve tried to look this up, you know the struggle of finding any comprehensive tutorial. It took a bit of work but I finally figured it out, and I just had to share it with you.

 

How do you route TCP traffic to multi-regional clusters? 

I wanted to set up highly available, multi-regional sftp servers (which use the TCP protocol under the hood) to securely transfer files, but you can do this for any TCP service.

This Setting Up TCP Proxy Load Balancing document is a great starting point. It refers only to VMs, but using it as a step-by-step reference, I’ll show you how to apply the same steps to GKE clusters (as I’m sure you’re aware, a cluster is a group of GCP Compute Engine virtual machines).

Note that I’ve modified most outputs: some have been redacted for security reasons, and others abridged (with an ellipsis) for conciseness.

 

  • Step 1: Configure instances spread across two regions

Instead of creating individual instances, you want to create clusters here. Presuming you’re familiar with the gcloud command-line tool, create two clusters in two different regions, each with two nodes (VMs), using the following commands:

 

# Cluster name: cluster-south
gcloud container clusters create cluster-south \
--num-nodes 2 \
--zone asia-south1-a

# Cluster name: cluster-southeast
gcloud container clusters create cluster-southeast \
--num-nodes 2 \
--zone asia-southeast1-a

Use kubectl to deploy services and applications on the cluster. Make your kubectl point to your clusters one by one using the following command:

 

gcloud container clusters get-credentials cluster-name \
--zone zone-name
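
For the two clusters created above, that is:

gcloud container clusters get-credentials cluster-south \
--zone asia-south1-a

gcloud container clusters get-credentials cluster-southeast \
--zone asia-southeast1-a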

 

  • Step 2: Create the same services on both the clusters

Use kubectl to deploy services on GKE. The kubectl tool comes preinstalled with the gcloud SDK, but if you don’t already have it installed, click here. I always run all kubectl commands twice, switching between the clusters, to make sure the services and deployments are present on both clusters and are identical.

 

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
 name: sftp-claim
spec:
 accessModes:
 - ReadWriteOnce
 resources:
   requests:
     storage: 100Mi
 
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
 labels:
   app: sftp
 name: sftp
spec:
 replicas: 1
 strategy:
   type: Recreate
 template:
   metadata:
     labels:
       app: sftp
   spec:
     containers:
     - env:
       - name: USER
         value: user
       - name: PASS
         value: pass
       image: writl/sftp:latest
       name: sftp
       ports:
       - containerPort: 22
       resources: {}
       volumeMounts:
       - mountPath: /data/incoming
         name: sftp-claim
     restartPolicy: Always
     volumes:
     - name: sftp-claim
       persistentVolumeClaim:
         claimName: sftp-claim

Put it in a file named “sftp.yaml” and run the kubectl apply -f sftp.yaml command on both clusters (switching the context using the gcloud container clusters get-credentials command explained in Step 1). Notice that a PersistentVolumeClaim is also being created through “sftp.yaml”. We are using the writl/sftp Docker image to set up a simple sftp server.

Similarly, create a NodePort service that exposes port 30061 by putting the following manifest in an sftp-service.yaml file and run the kubectl apply -f sftp-service.yaml command:

 

apiVersion: v1
kind: Service
metadata:
 name: sftp-service
spec:
 type: NodePort
 ports:
   -
     protocol: TCP
     port: 80
     targetPort: 22
     nodePort: 30061
 selector:
   app: sftp

Next, run kubectl get services,pods to make sure the pods and services are running on both clusters. For both clusters, the output will look something like this:

 

NAME             READY   STATUS    RESTARTS   AGE
pod/sftp-hfs8v   1/1     Running   0          78s
pod/sftp-nqhb2   1/1     Running   0          78s

NAME                   TYPE        CLUSTER-IP    PORT(S)
service/kubernetes     ClusterIP   14.52.232.1   443/TCP
service/sftp-service   NodePort    13.57.246.1   80:30061/TCP

 

  • Step 3: Create an instance group for each zone and add instances

You don’t need to create instance groups or add instances here, because GKE has already created the instance groups for you. However, you will need a named port on both groups. The gcloud compute instance-groups list command should show the following output:
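
For completeness, the listing command itself (run from your local machine) is:

gcloud compute instance-groups list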

 

NAME                       LOCATION            INSTANCES
cluster-south..grp         asia-south1-a        2
cluster-southeast..grp     asia-southeast1-a    2

Set the named-ports for both groups by running the following command:

 

gcloud compute instance-groups set-named-ports \
instance-group-name \
--named-ports np30061:30061 \
--zone zone-name

 

  • Step 4: Configure the load balancer

This step requires a series of sub-steps:

 

  • 4.a: Create a health check

The health check is used by the load balancer’s backend service to determine which cluster/region/service is healthy and can receive traffic. To create a health check for port 30061, run the following command:

 

gcloud compute health-checks create tcp \
my-tcp-health-check --port 30061
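
Optionally, you can confirm it was created correctly:

gcloud compute health-checks describe my-tcp-health-check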

 

  • 4.b: Create a backend service and add instance groups to it

Create a backend service that uses the health check and the named port np30061 you just created:

 

gcloud compute backend-services create \
my-tcp-backend-service \
--global \
--protocol TCP \
--health-checks my-tcp-health-check \
--timeout 5m \
--port-name np30061

Add the instance groups created by GKE to this backend service. Run the following command once for each of the two instance groups:

 

gcloud compute backend-services add-backend \
my-tcp-backend-service \
--global \
--instance-group instance-group-name \
--instance-group-zone zone-name \
--balancing-mode UTILIZATION \
--max-utilization 0.8

 

  • 4.c: Configure a target TCP proxy

Run this command to configure the TCP proxy:

 

gcloud compute target-tcp-proxies create \
my-tcp-lb-target-proxy \
--backend-service my-tcp-backend-service \
--proxy-header NONE

 

  • 4.d: Reserve global static IPv4 addresses

The GCP official documentation for TCP load balancer says you can use either IPv4 or IPv6 addresses. We’ll use IPv4. To reserve a static IPv4 address, run the following command:

 

gcloud compute addresses create tcp-lb-static-ipv4 \
--ip-version=IPV4 \
--global

Get the reserved IPv4 address using the gcloud compute addresses list command.
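
If you want just the address, a describe call with a format filter also works (tcp-lb-static-ipv4 is the name we reserved above):

gcloud compute addresses describe tcp-lb-static-ipv4 \
--global \
--format="value(address)"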

 

  • 4.e: Configure global forwarding rules for the address

Use the IPv4 address (say xx.xxx.xxx.xx) to create the forwarding-rule using the following command:

 

gcloud beta compute forwarding-rules create \
tcp-lb-forwarding-rule \
--global \
--target-tcp-proxy my-tcp-lb-target-proxy \
--address xx.xxx.xxx.xx \
--ports 195

Select any of the ports supported by GCP’s TCP load balancer: 25, 43, 110, 143, 195, 443, 465, 587, 700, 993, 995, 1883, 5222. (I randomly selected port 195.)

 

  • Step 5. Create a firewall rule for the TCP load balancer

The firewall rule will allow traffic from the load balancer and the health checks; their source ranges are 130.211.0.0/22 and 35.191.0.0/16. Use tags to make sure the firewall rule applies only to tagged instances. GKE adds tags to all instances created while spinning up the cluster, and all the instances in one instance group share the same tag. To see the tags associated with the groups, run the following commands:

 

gcloud compute instances list
NAME                                     ZONE             
gke-cluster-south-default-pool-67r      asia-south1-a    
gke-cluster-south-default-pool-53q      asia-south1-a    
gke-cluster-southeast-default-pool-86a  asia-southeast1-a
gke-cluster-southeast-default-pool-22g  asia-southeast1-a

# Now run the below command to see the tags
gcloud compute instances describe instance-name \
--zone zone

...output...
tags:
 fingerprint: H7326hhI3qU=
 items:
 - "gke-cluster-south-12345b67-node"

Note: This command produces a large output; you will find the tags in the last few lines.
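
If you prefer not to scan the whole output, a gcloud format filter can print just the tags (same describe command as above):

gcloud compute instances describe instance-name \
--zone zone \
--format="value(tags.items)"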

There will be two unique tags here – one per group. Use those tags to create the required firewall rule to open ports 195 and 30061:

 

gcloud compute firewall-rules create \
allow-tcplb-and-health \
--source-ranges 130.211.0.0/22,35.191.0.0/16 \
--target-tags tag-instance-group-1,tag-instance-group-2 \
--allow tcp:195,tcp:30061

That’s it! You’re all set. Just wait a few minutes to let the load balancer set up completely, then connect to the sftp server by running the following command:

 

sftp -P 195 user@xx.xxx.xxx.xx

You should now be able to log in to the sftp server. You can try deleting the server from one cluster using kubectl delete -f sftp.yaml. You’ll notice that if you log in again, the load balancer routes traffic to the other cluster’s server.
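
A rough failover test with the clusters from Step 1 (expect the second login to land on the surviving cluster once the health check notices):

# Remove the sftp server from one cluster
gcloud container clusters get-credentials cluster-south --zone asia-south1-a
kubectl delete -f sftp.yaml

# Wait a minute, then log in again via the load balancer IP
sftp -P 195 user@xx.xxx.xxx.xx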

What are the benefits of this setup?

You can add and delete instances (nodes) in the GKE cluster without affecting the sftp service, because the setup relies on instance tags and a NodePort service rather than individual VMs.

In some cases, if GKE updates or recreates the VMs with new IPs, it won’t affect the setup since it doesn’t use the IPs of the nodes directly.

It’s really easy to scale up and down. If you add a node pool (one more group of VMs in the GKE cluster), all you’ll need to do is the following (a command sketch follows the list):

 

  1. Add the named port to the new group
  2. Add it to the backend service
  3. Update the firewall rule to use one more tag
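
Sketch of those three steps for a hypothetical new node pool (its instance group name, zone, and tag below are placeholders):

# 1. Add the named port to the new instance group
gcloud compute instance-groups set-named-ports new-instance-group-name \
--named-ports np30061:30061 \
--zone zone-name

# 2. Add the new group to the backend service
gcloud compute backend-services add-backend my-tcp-backend-service \
--global \
--instance-group new-instance-group-name \
--instance-group-zone zone-name \
--balancing-mode UTILIZATION \
--max-utilization 0.8

# 3. Update the firewall rule with the new group's tag
gcloud compute firewall-rules update allow-tcplb-and-health \
--target-tags tag-instance-group-1,tag-instance-group-2,tag-new-instance-group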

Bonus Tip: To test the setup, modify mountPath: /data/incoming in “sftp.yaml”. When deploying to cluster-south, name it mountPath: /data/incoming-1, and for cluster-southeast name it mountPath: /data/incoming-2. That way, when you log in to the sftp server and run ls, the upload folder’s name tells you which cluster and region served the request.

Note: The sftp server is just an example here. If you actually want to run sftp services this way, the storage will not be consistent, because each sftp server uses a PersistentVolumeClaim inside its own cluster. To keep the files and storage in sync, you could mount one common Google Cloud Storage bucket in both servers.

It’s as simple as that. Please let me know in the comments if you get stuck somewhere, or if you can suggest a better approach!