
Change Data Capture with Azure, PostgreSQL, and Kafka

Abhishek Gupta
Published in ITNEXT · 14 min read · Nov 2, 2020

Introduction

Overview

Source and Destination

Kafka and Kafka Connect

Docker Compose services

zookeeper:
  image: debezium/zookeeper:1.2
  ports:
    - 2181:2181
kafka:
  image: debezium/kafka:1.2
  ports:
    - 9092:9092
  links:
    - zookeeper
  depends_on:
    - zookeeper
  environment:
    - ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
dataexplorer-connector:
  build:
    context: ./connector
    args:
      KUSTO_KAFKA_SINK_VERSION: 1.0.1
  ports:
    - 8080:8083
  links:
    - kafka
  depends_on:
    - kafka
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=adx
    - CONFIG_STORAGE_TOPIC=adx_connect_configs
    - OFFSET_STORAGE_TOPIC=adx_connect_offsets
    - STATUS_STORAGE_TOPIC=adx_connect_statuses
postgres-connector:
  image: debezium/connect:1.2
  ports:
    - 9090:8083
  links:
    - kafka
  depends_on:
    - kafka
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=pg
    - CONFIG_STORAGE_TOPIC=pg_connect_configs
    - OFFSET_STORAGE_TOPIC=pg_connect_offsets
    - STATUS_STORAGE_TOPIC=pg_connect_statuses
The dataexplorer-connector service is built from a custom Dockerfile (in ./connector) that simply layers the Azure Data Explorer (Kusto) sink connector JAR on top of the base Debezium Connect image:

FROM debezium/connect:1.2
WORKDIR $KAFKA_HOME/connect
ARG KUSTO_KAFKA_SINK_VERSION
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar
Finally, the orders-gen service (part of the same Docker Compose file) seeds random order data into the PostgreSQL database:

orders-gen:
  build:
    context: ./orders-generator
  environment:
    - PG_HOST=<postgres host>
    - PG_USER=<postgres username>
    - PG_PASSWORD=<postgres password>
    - PG_DB=<postgres db name>
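
For reference, here is a minimal sketch of what such an order generator might look like in Go — this is an illustrative assumption, not the exact code in the repo. It reads the same PG_* environment variables as above and inserts a random order every few seconds:

// Minimal order-generator sketch (illustrative; see the repo for the real code)
package main

import (
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"os"
	"time"

	_ "github.com/lib/pq" // PostgreSQL driver
)

func main() {
	// Connection details come from the same PG_* variables set in docker-compose
	connStr := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=require",
		os.Getenv("PG_HOST"), os.Getenv("PG_USER"), os.Getenv("PG_PASSWORD"), os.Getenv("PG_DB"))
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	cities := []string{"New Delhi", "Seattle", "New York", "Austin", "Chicago", "Cleveland"}
	for {
		// orderid is SERIAL, so only the remaining columns are supplied
		_, err := db.Exec(
			"INSERT INTO retail.orders_info (custid, amount, city, purchase_time) VALUES ($1, $2, $3, $4)",
			rand.Intn(1000), 100+rand.Intn(100), cities[rand.Intn(len(cities))],
			time.Now().Format("2006-01-02 15:04:05"))
		if err != nil {
			log.Println("insert failed:", err)
		}
		time.Sleep(3 * time.Second)
	}
}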

Pre-requisites

git clone https://github.com/abhirockzz/kafka-adx-postgres-cdc-demo
cd kafka-adx-postgres-cdc-demo

Setup and configure Azure Data Explorer

.create table Orders (orderid: string, custid: string, city: string, amount: int, purchase_time: datetime)

.create table Orders ingestion json mapping 'OrdersEventMapping' '[{"column":"orderid","Properties":{"path":"$.orderid"}},{"column":"custid","Properties":{"path":"$.custid"}},{"column":"city","Properties":{"path":"$.city"}},{"column":"amount","Properties":{"path":"$.amount"}},{"column":"purchase_time","Properties":{"path":"$.purchase_time"}}]'
.alter table Orders policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024}'

.show table <enter database name>.Orders policy ingestionbatching
az ad sp create-for-rbac -n "adx-sp"
{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "adx-sp",
  "name": "http://adx-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}
.add database <enter database name> admins ('aadapp=<enter service principal appId>;<enter service principal tenant>') 'AAD App'

Setup and configure Azure PostgreSQL DB

az postgres server configuration set --resource-group <name of resource group> --server-name <name of server> --name azure.replication_support --value logical
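
For the logical replication setting to take effect, the server needs a restart; assuming the az postgres server restart command available in your CLI version, something like this should do it:

az postgres server restart --resource-group <name of resource group> --name <name of server>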
psql -h <POSTGRESQL_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

//example
psql -h my-pgsql.postgres.database.azure.com -p 5432 -U foo@my-pgsql -W -d postgres --set=sslmode=require
CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
    orderid SERIAL NOT NULL PRIMARY KEY,
    custid INTEGER NOT NULL,
    amount INTEGER NOT NULL,
    city VARCHAR(255) NOT NULL,
    purchase_time VARCHAR(20) NOT NULL
);
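
To sanity-check the table before wiring up the pipeline, you can insert a row manually (the values here are just illustrative):

INSERT INTO retail.orders_info (custid, amount, city, purchase_time)
    VALUES (123, 150, 'Seattle', '2020-10-09 07:00:00');

SELECT * FROM retail.orders_info;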

Start Docker containers

docker-compose --project-name adx-kafka-cdc up --build
docker-compose -p adx-kafka-cdc ps
//output
Name                                     Command                      State   Ports
-----------------------------------------------------------------------------------------------------------------------
adx-kafka-cdc_dataexplorer-connector_1   /docker-entrypoint.sh start  Up      0.0.0.0:8080->8083/tcp, 8778/tcp, 9092/tcp, 9779/tcp
adx-kafka-cdc_kafka_1                    /docker-entrypoint.sh start  Up      8778/tcp, 0.0.0.0:9092->9092/tcp, 9779/tcp
adx-kafka-cdc_orders-gen_1               /orders-gen                  Up
adx-kafka-cdc_postgres-connector_1       /docker-entrypoint.sh start  Up      0.0.0.0:9090->8083/tcp, 8778/tcp, 9092/tcp, 9779/tcp
adx-kafka-cdc_zookeeper_1                /docker-entrypoint.sh start  Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8778/tcp, 9779/tcp
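
To confirm that the Kusto sink connector was indeed baked into the dataexplorer-connector image, you can list the installed plugins via the Kafka Connect REST API (exposed on port 8080 per the Docker Compose file):

curl http://localhost:8080/connector-plugins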
psql -h <POSTGRESQL_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

select * from retail.orders_info order by orderid desc limit 5;
 orderid | custid | amount |   city    |    purchase_time
---------+--------+--------+-----------+---------------------
      10 |     77 |    140 | Seattle   | 2020-10-09 07:10:49
       9 |    541 |    186 | Cleveland | 2020-10-09 07:10:46
       8 |    533 |    116 | Cleveland | 2020-10-09 07:10:42
       7 |    225 |    147 | Chicago   | 2020-10-09 07:10:39
       6 |    819 |    184 | Austin    | 2020-10-09 07:10:36
(5 rows)

Debezium PostgreSQL source connector setup

{
  "name": "pg-orders-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<enter server name>.postgres.database.azure.com",
    "database.port": "5432",
    "database.user": "<enter admin username>@<enter server name>",
    "database.password": "<enter admin password>",
    "database.dbname": "postgres",
    "database.server.name": "myserver",
    "plugin.name": "wal2json",
    "table.whitelist": "retail.orders_info",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:9090/connectors

# to confirm
curl http://localhost:9090/connectors/pg-orders-source
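
You can also check that the connector and its task are actually RUNNING via the standard Kafka Connect status endpoint:

curl http://localhost:9090/connectors/pg-orders-source/status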
docker exec -it adx-kafka-cdc_kafka_1 bash
cd bin && ./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server kafka:9092 --from-beginning
{
  "schema": {....},
  "payload": {
    "before": null,
    "after": {
      "orderid": 51,
      "custid": 306,
      "amount": 183,
      "city": "Austin",
      "purchase_time": "2020-10-09 07:23:10"
    },
    "source": {
      "version": "1.2.1.Final",
      "connector": "postgresql",
      "name": "myserver",
      "ts_ms": 1602057392691,
      "snapshot": "false",
      "db": "postgres",
      "schema": "retail",
      "table": "orders_info",
      "txId": 653,
      "lsn": 34220200,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1602057392818,
    "transaction": null
  }
}

Azure Data Explorer sink connector setup

{
  "name": "adx-orders-sink",
  "config": {
    "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
    "flush.size.bytes": 10000,
    "flush.interval.ms": 30000,
    "tasks.max": 2,
    "topics": "myserver.retail.orders_info",
    "kusto.tables.topics.mapping": "[{'topic': 'myserver.retail.orders_info','db': '<enter database name>', 'table': 'Orders','format': 'json', 'mapping':'OrdersEventMapping'}]",
    "aad.auth.authority": "<enter tenant ID from service principal info>",
    "kusto.url": "https://ingest-<enter cluster name>.<enter region>.kusto.windows.net",
    "aad.auth.appid": "<enter app ID from service principal info>",
    "aad.auth.appkey": "<enter password from service principal info>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
{
"orderid": 51,
"custid": 306,
"amount": 183,
"city": "Austin",
"purchase_time":"2020-10-09 07:23:10"
}
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8080/connectors

# check status
curl http://localhost:8080/connectors/adx-orders-sink/status
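
Since a sink connector is just a Kafka consumer under the hood, you can also track how far it has read. Kafka Connect names the consumer group connect-<connector name> by default, so from inside the Kafka container something like this should work:

cd bin && ./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-adx-orders-sink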

Query Azure Data Explorer

Orders
| where city == 'New York'

Orders
| where city == 'New York'
| project amount, purchase_time
| sort by amount

Orders
| summarize avg_sales = avg(amount) by city
| render columnchart

Orders
| summarize total = sum(amount) by city
| sort by total
| render piechart

Orders
| summarize orders = count() by city
| sort by orders
| render linechart

Orders
| extend hour = floor(purchase_time % 1d, 10m)
| summarize event_count = count() by hour
| sort by hour asc
| render timechart

Orders
| extend hour = floor(purchase_time % 1d, 10m)
| where city in ("New Delhi", "Seattle", "New York", "Austin", "Chicago", "Cleveland")
| summarize event_count = count() by hour, city
| render columnchart
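
As the order generator keeps streaming changes through the pipeline, re-running a simple count is an easy way to watch the table grow:

Orders
| count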

Azure Data Explorer Dashboards

Clean up

docker-compose -p adx-kafka-cdc down -v
az postgres server delete -g <resource group name> -n <server name>
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Conclusion

Additional resources
