Step-by-Step Guide: Deploying Kafka Connect via Strimzi Operator on Kubernetes

Hamed Karbasi
Published in ITNEXT
7 min read · Apr 25, 2023

Photo by Ryoji Iwata on Unsplash

Strimzi is one of the most feature-rich Kafka operators for Kubernetes. You can use it to deploy Apache Kafka and related components such as Kafka Connect and Kafka MirrorMaker. This article is a step-by-step tutorial on deploying Kafka Connect on Kubernetes. It also covers the issues I encountered during deployment and the mitigations that worked best for me.

Note: Keep in mind that this operator is based on Apache Kafka, not the Confluent Platform. You may therefore need to add Confluent artifacts, such as the Confluent Avro Converter, to get the most out of it.

This article is based on Strimzi v0.29.0, which lets you install the following versions:

  • Strimzi: 0.29.0
  • Apache Kafka & Kafka Connect: Up to 3.2
  • Equivalent Confluent Platform: 7.2.4

Note: You can convert a Confluent Platform version to its Apache Kafka version and vice versa with the table provided here.

Installation

OpenShift GUI and Kubernetes CLI

If you're using OpenShift, navigate to Operators > Installed Operators > Strimzi > Kafka Connect.

Strimzi Operator in OpenShift Panel

Now you will see a form containing the Kafka Connect configurations. You can get the equivalent YAML of the form by clicking on YAML View. Any update in the form view is applied to the YAML view on the fly. Although the form view is relatively straightforward, it's strongly recommended not to use it for creating the instance directly. Use it only for converting your desired configuration to a YAML file, and then deploy the resource with the kubectl apply command. So to summarize:

  1. Enter the configuration in the form view
  2. Click on YAML view
  3. Copy its contents to a YAML file on your local machine (e.g. kafka-connect.yaml)
  4. Run: kubectl apply -f kafka-connect.yaml

Now the KafkaConnect resource should be deployed or updated. The resources the operator creates include a Deployment and its pods, a Service, ConfigMaps, and Secrets.

Let's start with the minimum configuration and make it more advanced step by step.

Minimum Configuration

To deploy a simple minimum configuration of Kafka Connect, you can use the below YAML:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: <YOUR_PROJECT_NAME>
spec:
  config:
    config.storage.replication.factor: -1
    config.storage.topic: okd4-connect-cluster-configs
    group.id: okd4-connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: okd4-connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: okd4-connect-cluster-status
  bootstrapServers: 'kafka1:9092, kafka2:9092'
  version: 3.2.0
  replicas: 1

The Kafka Connect REST API is available on port 8083 of the pod. You can expose it on a private or internal network by defining a route on OKD.
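As a rough sketch of such a route: the manifest below assumes the operator exposes the REST API through a Service named my-connect-cluster-connect-api with a port named rest-api (Strimzi's usual <cluster-name>-connect-api naming, worth verifying with kubectl get svc). The route name is hypothetical, and no host is set, so the router assigns its default one.

```yaml
apiVersion: route.openshift.io/v1
kind: Route
metadata:
  name: my-connect-cluster-rest          # hypothetical route name
  namespace: <YOUR_PROJECT_NAME>
spec:
  to:
    kind: Service
    name: my-connect-cluster-connect-api # assumed name of the operator-created Service
  port:
    targetPort: rest-api                 # assumed name of the 8083 port on that Service
```

If requests through the route time out, it may be worth checking whether the NetworkPolicy that Strimzi generates for the Connect REST API port is blocking the router; an additional policy might be needed in that case.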

REST API Authentication

With the configuration explained here, you can add authentication to the Kafka Connect REST API. Unfortunately, that doesn't work with the Strimzi operator, as discussed here. So to secure Kafka Connect, you have two options:

  1. Use the operator's KafkaConnector API. The Strimzi operator lets you define each connector as a KafkaConnector resource in a YAML file. However, this may not be practical for use cases that need to update, pause, and stop connectors through the REST API.
  2. Put the insecure REST API behind an authenticated API gateway like Apache APISIX, any other tool, or a self-developed application.
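To make the first option more concrete, here is a minimal sketch of a KafkaConnector resource for a Debezium MySQL connector. The connector name, hostnames, and placeholder values are hypothetical, and the Debezium property names assume a 2.x connector. The sketch also assumes the KafkaConnect resource carries the strimzi.io/use-connector-resources annotation, as noted in the comments.

```yaml
# Assumes the KafkaConnect resource has this annotation in its metadata:
#   annotations:
#     strimzi.io/use-connector-resources: "true"
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-db1-connector                 # hypothetical connector name
  namespace: <YOUR_PROJECT_NAME>
  labels:
    strimzi.io/cluster: my-connect-cluster  # must match the KafkaConnect name
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  pause: false                              # set to true to pause the connector
  config:                                   # hypothetical values below
    database.hostname: <DB_HOST>
    database.port: 3306
    database.user: <DB_USER>
    database.password: <DB_PASSWORD>
    database.server.id: 184054
    topic.prefix: db1
    schema.history.internal.kafka.bootstrap.servers: 'kafka1:9092,kafka2:9092'
    schema.history.internal.kafka.topic: schema-changes.db1
```

With this mode enabled, the operator treats the KafkaConnector resources as the source of truth and reverts changes made directly through the REST API, which is the practical limitation mentioned in the first option.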

JMX Prometheus Metrics

To expose JMX Prometheus metrics, which are useful for observing connector statuses in Grafana, add the below configuration:

spec:
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: jmx-prometheus
        name: configs
  jmxOptions: {}

The referenced ConfigMap key holds the configuration for the Prometheus JMX exporter. You can use this config:

startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
rules:
  - pattern: "kafka.connect<type=connect-worker-metrics>([^:]+):"
    name: "kafka_connect_connect_worker_metrics_$1"
  - pattern: "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)"
    name: "kafka_connect_connect_metrics_$2"
    labels:
      client: "$1"
  - pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^,]+), key=([^>]+)><>RowsScanned"
    name: "debezium_metrics_RowsScanned"
    labels:
      plugin: "$1"
      name: "$3"
      context: "$2"
      table: "$4"
  - pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^>]+)>([^:]+)"
    name: "debezium_metrics_$4"
    labels:
      plugin: "$1"
      name: "$3"
      context: "$2"
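For the metricsConfig reference to resolve, the exporter config has to live in a ConfigMap named configs under the key jmx-prometheus. A minimal sketch of that ConfigMap, assuming it sits in the same namespace as the KafkaConnect resource (kafka-connect is used here as an assumption) and showing only the first rule for brevity:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: configs              # matches configMapKeyRef.name
  namespace: kafka-connect   # assumed: same namespace as the KafkaConnect resource
data:
  # The key matches configMapKeyRef.key; the value is the exporter config as a block string
  jmx-prometheus: |
    startDelaySeconds: 0
    ssl: false
    lowercaseOutputName: false
    lowercaseOutputLabelNames: false
    rules:
      - pattern: "kafka.connect<type=connect-worker-metrics>([^:]+):"
        name: "kafka_connect_connect_worker_metrics_$1"
      # ...add the remaining rules from the config above...
```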

Service for External Prometheus

If you intend to deploy Prometheus alongside Strimzi to collect the metrics, follow the instructions here. With an external Prometheus, however, things work differently:

The Strimzi operator only creates port mappings in the Service for these ports:

  • 8083: Kafka Connect REST API
  • 9999: JMX port

Unfortunately, it doesn't create a mapping for port 9404, the Prometheus exporter HTTP port, so we have to create a Service of our own:

kind: Service
apiVersion: v1
metadata:
  name: kafka-connect-jmx-prometheus
  namespace: kafka-connect
  labels:
    app.kubernetes.io/instance: kafka-connect
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: kafka-connect
    app.kubernetes.io/part-of: strimzi-kafka-connect
    strimzi.io/cluster: kafka-connect
    strimzi.io/kind: KafkaConnect
spec:
  ports:
    - name: tcp-prometheus
      protocol: TCP
      port: 9404
      targetPort: 9404
  type: ClusterIP
  selector:
    strimzi.io/cluster: kafka-connect
    strimzi.io/kind: KafkaConnect
    strimzi.io/name: kafka-connect-connect
status:
  loadBalancer: {}

Note: This method only works for single-pod deployments. You have to define a route for the Service, and even with a headless Service the route returns the IP of only one pod at a time, so Prometheus can't scrape the metrics of all pods. That's why it is recommended to use a PodMonitor and Prometheus on the cloud. This issue is discussed here.
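For reference, a PodMonitor along these lines could scrape every Connect pod directly. This is only a sketch: it assumes the Prometheus Operator is installed in the cluster, that the metrics container port is named tcp-prometheus (as in Strimzi's own example monitors), and that the KafkaConnect cluster is called kafka-connect in the kafka-connect namespace. The PodMonitor name is hypothetical.

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-connect-metrics      # hypothetical name
  namespace: kafka-connect
spec:
  selector:
    matchLabels:
      strimzi.io/kind: KafkaConnect
      strimzi.io/cluster: kafka-connect
  namespaceSelector:
    matchNames:
      - kafka-connect
  podMetricsEndpoints:
    - port: tcp-prometheus         # assumed name of the 9404 container port
      path: /metrics
```

Because the monitor selects pods rather than a Service, each replica becomes its own scrape target, which avoids the single-pod limitation described in the note.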

Plugins and Artifacts

To add plugins and artifacts, there are two ways:

Operator Build Section

To add plugins, you can use the operator's build section. It takes the plugin or artifact URLs, downloads them in the build stage (the operator creates the BuildConfig automatically), and adds them to the plugin directory of the image.

It supports the jar, tgz, zip, and maven artifact types. However, for maven artifacts the operator generates a multi-stage Dockerfile, which is problematic on OpenShift and fails in the build stage. Hence, you should only use the types that don't need a compile stage (i.e., jar, zip, tgz) and result in a single-stage Dockerfile.

For example, to add the Debezium MySQL plugin, you can use the below configuration:

spec:
  build:
    output:
      image: 'kafkaconnect:1.0'
      type: imagestream
    plugins:
      - artifacts:
          - type: tgz
            url: >-
              https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.4.Final/debezium-connector-mysql-2.1.4.Final-plugin.tar.gz
        name: debezium-connector-mysql

Note: The Strimzi operator can only download public artifacts. If you need a private, secured artifact that is not accessible from Kubernetes, you have to give up this method and follow the next one.

Changing Image

The operator can use an image of your choice instead of its default one, so you can add your selected artifacts and plugins by building an image manually or via CI/CD. Another reason to use this method is that Strimzi uses the Apache Kafka image, not the Confluent Platform one. The deployments therefore lack Confluent packages such as the Confluent Avro Converter, so you need to add them to your image and configure the operator to use it.

For example, if you want to add your customized Debezium MySQL Connector plugin from GitLab Generic Packages and the Confluent Avro Converter to the base image, first use this Dockerfile:

ARG CONFLUENT_VERSION=7.2.4

# Install confluent avro converter
FROM confluentinc/cp-kafka-connect:${CONFLUENT_VERSION} as cp
# Reassign version
ARG CONFLUENT_VERSION
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:${CONFLUENT_VERSION}

# Copy previous artifacts to the main Strimzi Kafka image
FROM quay.io/strimzi/kafka:0.29.0-kafka-3.2.0
ARG GITLAB_TOKEN
ARG CI_API_V4_URL=https://gitlab.snapp.ir/api/v4
ARG CI_PROJECT_ID=3873
ARG DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION=1.0
USER root:root

# Copy Confluent packages from previous stage
RUN mkdir -p /opt/kafka/plugins/avro/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/

# Connector plugin debezium-connector-mysql
# GITLAB_TOKEN is passed to curl verbatim as a header, so it is assumed to hold
# a complete header line (for example "PRIVATE-TOKEN: <token>")
RUN 'mkdir' '-p' '/opt/kafka/plugins/debezium-connector-mysql' \
    && curl --header "${GITLAB_TOKEN}" -f -L \
        --output /opt/kafka/plugins/debezium-connector-mysql.tgz \
        ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/debezium-customized/${DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION}/debezium-connector-mysql-customized.tar.gz \
    && 'tar' 'xvfz' '/opt/kafka/plugins/debezium-connector-mysql.tgz' '-C' '/opt/kafka/plugins/debezium-connector-mysql' \
    && 'rm' '-vf' '/opt/kafka/plugins/debezium-connector-mysql.tgz'

USER 1001

Build the image, push it to the image stream or any other Docker registry, and configure the operator by adding the below line:

spec:
  image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0

Kafka Authentication

The configuration for Kafka authentication depends on the authentication type. As an example, here is the configuration for Kafka with SASL over plaintext and the SCRAM-SHA-512 mechanism:

spec:
  authentication:
    passwordSecret:
      password: kafka-password
      secretName: mysecrets
    type: scram-sha-512
    username: myuser

Needless to say, you must provide the password under the key kafka-password in a Secret named mysecrets.
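A minimal sketch of that Secret, assuming the kafka-connect namespace used elsewhere in this article. It uses stringData so the value can be written unencoded; Kubernetes stores it base64-encoded under data.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: mysecrets               # referenced by passwordSecret.secretName
  namespace: kafka-connect      # assumed namespace
type: Opaque
stringData:
  # The key name matches passwordSecret.password in the KafkaConnect spec
  kafka-password: <YOUR_KAFKA_PASSWORD>
```

Since the same Secret name is used for the credentials key in the Handling File Credentials section, in practice both keys would sit in one manifest.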

Handling File Credentials

Since connectors need credentials to access databases, you have to define them as secrets and access them through environment variables. However, if there are too many of them, you can put all credentials in a file and reference them in the connector with the ${file:...} placeholder:

1. Put all credentials in a file and store its contents as the value of a key named credentials in a Secret.

Credentials file:

USERNAME_DB_1=user1
PASSWORD_DB_1=pass1

USERNAME_DB_2=user2
PASSWORD_DB_2=pass2

Secret file:

kind: Secret
apiVersion: v1
metadata:
  name: mysecrets
  namespace: kafka-connect
data:
  credentials: <BASE64 YOUR DATA>

2. Configure the KafkaConnect resource to mount the Secret as a volume:

spec:
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: database_credentials
        secret:
          items:
            - key: credentials
              path: credentials
          optional: false
          secretName: mysecrets

3. Now, in the connector configuration, you can reference PASSWORD_DB_1 with the below placeholder:

"${file:/opt/kafka/external-configuration/database_credentials/credentials:PASSWORD_DB_1}"

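As an illustration of step 3, the fragment below sketches how the username and password of the first database might appear in a connector's configuration. The connector is a hypothetical Debezium MySQL one and the host placeholder is an assumption; only the two ${file:...} values follow from the setup above. The same strings can be used as values in the JSON body sent to the REST API.

```yaml
# Fragment of a connector configuration (hypothetical Debezium MySQL connector)
config:
  database.hostname: <DB_1_HOST>
  # Resolved at runtime by FileConfigProvider from the mounted credentials file
  database.user: "${file:/opt/kafka/external-configuration/database_credentials/credentials:USERNAME_DB_1}"
  database.password: "${file:/opt/kafka/external-configuration/database_credentials/credentials:PASSWORD_DB_1}"
```
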
Putting It All Together

If we put all configurations together, we'll have the below configuration for Kafka Connect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  namespace: kafka-connect
spec:
  authentication:
    passwordSecret:
      password: kafka-password
      secretName: mysecrets
    type: scram-sha-512
    username: myuser
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    config.storage.replication.factor: -1
    config.storage.topic: okd4-connect-cluster-configs
    group.id: okd4-connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: okd4-connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: okd4-connect-cluster-status
  bootstrapServers: 'kafka1:9092, kafka2:9092'
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: jmx-prometheus
        name: configs
  resources:
    limits:
      memory: 1Gi
    requests:
      memory: 1Gi
  readinessProbe:
    failureThreshold: 10
    initialDelaySeconds: 60
    periodSeconds: 20
  jmxOptions: {}
  livenessProbe:
    failureThreshold: 10
    initialDelaySeconds: 60
    periodSeconds: 20
  image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0
  version: 3.2.0
  replicas: 2
  externalConfiguration:
    volumes:
      - name: database_credentials
        secret:
          items:
            - key: credentials
              path: credentials
          optional: false
          secretName: mysecrets

Note: The Service, route, and build configurations are omitted since we discussed them earlier in the article.

Conclusion

In conclusion, deploying Kafka Connect using the Strimzi Operator can be a powerful and efficient way to manage data integration in your organization. By leveraging the flexibility and scalability of Kafka, along with the ease of use and automation provided by the Strimzi Operator, you can streamline your data pipelines and improve your data-driven decision-making. In this article, I've covered the key steps involved in deploying Kafka Connect via the Strimzi Operator: creating a minimal KafkaConnect custom resource, the REST API basic authentication issue, Kafka authentication, JMX Prometheus metrics, plugins and artifacts, and handling file credentials. Following these steps, you can customize your Kafka Connect deployment to meet your specific needs.
