How to connect to Azure Event Hubs for Kafka using Go

Abhishek Gupta
ITNEXT
Published in
8 min readOct 10, 2019

--

This blog will demonstrate how to interact with Event Hubs Kafka cluster using the Sarama Kafka client library. The sarama package provides a pure Go client that supports Kafka v 0.8 and above.

Azure Event Hubs is a streaming platform and event ingestion service, capable of receiving and processing millions of events per second. It also provides a Kafka endpoint that can be used by your existing Kafka based applications as an alternative to running your own Kafka cluster. Since Azure Event Hubs exposes a protocol that is binary compatible with Kafka versions 1.0, you can start using the Kafka endpoint from your existing applications with no code change but a minimal configuration change. This also supports frameworks like Kafka Connect(currently in preview), MirrorMaker etc.

You will learn about:

  • Code: how to configure and use the Sarama Go client to talk to Event Hubs Kafka endpoint and build producer, consumer apps
  • Setup: use Azure CLI to quickly bootstrap an Event Hubs for Kafka instance
  • Test: run the producer and consumer app to try the end to end scenario

as always, the code is available on GitHub

Pre-requisites

If you don’t have an Azure subscription, just create a free account and get going! You will also need the Azure CLI. Of course, you will need to have Go installed as well.

Code walkthrough

The app is pretty simple and consists of a producer and a consumer built using the Sarama Go client. Let’s skim through the code real quick

Configuration for connecting to Event Hubs for Kafka

You need to pass a sarama.Config object in order to create a producer or consumer instance.

func getConfig() *sarama.Config {
config := sarama.NewConfig()
config.Net.DialTimeout = 10 * time.Second
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = getEnv(eventHubsConnStringEnvVar)
config.Net.SASL.Mechanism = "PLAIN"
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: true,
ClientAuth: 0,
}
config.Version = sarama.V1_0_0_0
return config
}

Authentication is the key here — Azure Event Hubs (including the Kafka features) requires SSL or TLS for all communication and uses Shared Access Signatures (SAS) for authentication.

Please refer to the documentation for details regarding the security model

  • SASL is enabled with config.Net.SASL.Enable = true and uses the PLAIN mechanism (config.Net.SASL.Mechanism)
  • SASL User is defined using config.Net.SASL.User and is set to $ConnectionString (yes this is static)
  • SASL password (config.Net.SASL.Password) is set to the Event Hubs connection string (details to follow in the upcoming section)
  • Finally, TLS is enabled (config.Net.TLS.Enable) and config.Net.TLS.Config is set to an instance of TLS Config

The configuration for the producer and consumer (for this example) is the same except for config.Producer.Return.Successes = true which is required for the synchronous version of the producer

Things to watch out for:

  • If you do not configure TLS properly, you might see this error — Failed to start Sarama producer: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
  • At the time of writing this article, Event Hubs does not support message compression, so please ensure that you use the default value of config.Producer.Compression (which is none) or set it explicitly using config.Producer.Compression = sarama.CompressionNone. If not, you might see this error Failed to send msg: kafka server: The requested operation is not supported by the message format version.
  • Make sure you set the Kafka broker version to config.Version = sarama.V1_0_0_0 or higher

Event Hubs Kafka Producer

Create a producer instance passing in the Event Hubs broker and the required configuration (sarama.Config)

producer, err := sarama.NewSyncProducer(brokerList, getConfig())
if err != nil {
fmt.Println("Failed to start Sarama producer:", err)
os.Exit(1)
}

Start a goroutine for producing messages. This happens in an infinite for loop which can be stopped using ctrl+c

go func() {
for {
if producerOpen {
ts := time.Now().String()
msg := &sarama.ProducerMessage{Topic: eventHubsTopic, Key: sarama.StringEncoder("key-" + ts), Value: sarama.StringEncoder("value-" + ts)}
p, o, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Failed to send msg:", err)
continue
}
}
...
}
}()

For a clean exit, listen to the interrupt signal (ctrl+c) and close the producer

close := make(chan os.Signal)
signal.Notify(close, syscall.SIGTERM, syscall.SIGINT)
<-close
err = producer.Close()

Event Hubs Kafka Consumer

Create a consumer group instance passing in the Event Hubs broker

consumer, err := sarama.NewConsumerGroup(brokerList, consumerGroupID, getConfig())

Start consuming in a separate goroutine

go func() {
for {
err = consumer.Consume(ctx, []string{getEnv(eventHubsTopicEnvVar)}, messageHandler{})
....
if ctx.Err() != nil {
return
}
}
}()

messageHandler implements sarama.ConsumerGroupHandler (functions - Setup, Cleanup and ConsumeClaim). ConsumeClaim function is the important one. It is where you specify what to do with each message - in this example, it is logged to standard out and marked as consumed

func (h messageHandler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
for msg := range c.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
fmt.Println("Message content", string(msg.Value))
s.MarkMessage(msg, "")
}
return nil
}

The consumer can be stopped by exiting the program (pressing ctrl+c). For a clean exit, the consumer instance is closed

close := make(chan os.Signal)
signal.Notify(close, syscall.SIGTERM, syscall.SIGINT)
<-close
cancel()
if err := consumer.Close(); err != nil {
......
}

Ok, let’s create an Event Hubs cluster and try the end to end scenario.

Create your Kafka enabled Event Hubs cluster

If you have a cluster already, skip this and go to the “Event Hubs connection details” sub-section

Set environment variables:

export AZURE_SUBSCRIPTION=[to be filled]
export AZURE_RESOURCE_GROUP=[to be filled]
export AZURE_LOCATION=[to be filled]
export EVENT_HUBS_NAMESPACE=[name of the event hub namespace - to be filled]
export EVENT_HUB_NAME=[name of the event hub (topic) - to be filled]

Create the resource group if you don’t have one already

az account set --subscription $AZURE_SUBSCRIPTION
az group create --name $AZURE_RESOURCE_GROUP --location $AZURE_LOCATION

Create an Event Hubs namespace (similar to a Kafka Cluster)

For details on Event Hubs namespace, please refer to the Event Hubs documentation

az eventhubs namespace create --name $EVENT_HUBS_NAMESPACE --resource-group $AZURE_RESOURCE_GROUP --location $AZURE_LOCATION --enable-kafka true --enable-auto-inflate false

Documentation for az eventhubs namespace create

And then create an Event Hub (same as a Kafka topic)

az eventhubs eventhub create --name $EVENT_HUB_NAME --resource-group $AZURE_RESOURCE_GROUP --namespace-name $EVENT_HUBS_NAMESPACE --partition-count 10

Documentation for az eventhub create

Event Hubs connection details

Get the connection string and credentials for your cluster

For details, read how Event Hubs uses Shared Access Signatures for authorization

Start by fetching the Event Hub rule/policy name

az eventhubs namespace authorization-rule list --resource-group $AZURE_RESOURCE_GROUP --namespace-name $EVENT_HUBS_NAMESPACE

Documentation for az eventhubs namespace authorization-rule list

You will get a JSON output similar to below:

[
{
"id": "/subscriptions/qwerty42-ae29-4924-b6a7-dda0ea91d347/resourceGroups/foobar-resource/providers/Microsoft.EventHub/namespaces/foobar-event-hub-ns/AuthorizationRules/RootManageSharedAccessKey",
"location": "Southeast Asia",
"name": "RootManageSharedAccessKey",
"resourceGroup": "foobar-resource",
"rights": [
"Listen",
"Manage",
"Send"
],
"type": "Microsoft.EventHub/Namespaces/AuthorizationRules"
}
]

The authorization rule name is the value of the nameattribute (without the quotes), which in this case is RootManageSharedAccessKey

export EVENT_HUB_AUTH_RULE_NAME=RootManageSharedAccessKey

And, then make use of the rule name to extract the connection string

az eventhubs namespace authorization-rule keys list --resource-group $AZURE_RESOURCE_GROUP --namespace-name $EVENT_HUBS_NAMESPACE --name $EVENT_HUB_AUTH_RULE_NAME

Documentation for az eventhubs namespace authorization-rule keys list

You’ll get a JSON response as such:

{
"aliasPrimaryConnectionString": null,
"aliasSecondaryConnectionString": null,
"keyName": "RootManageSharedAccessKey",
"primaryConnectionString": "Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty=",
"primaryKey": "qwertyEiQHIirSNDPzqcqvZEUs6VAW+JIK3L46tqwerty",
"secondaryConnectionString": "Endpoint=sb://abhishgu-temp-event-hub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=qwertyPF2/YRGzxKmb06Z8NBFLCjnX38O7ch6aiYkN0=",
"secondaryKey": "qwertyPF2/YRGzxKmb06Z8NBqwertyX38O7ch6aiYk42="
}

The primary connection string is the value of the primaryConnectionString attribute (without the quotes), which in this case is "Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty=". Make a note of the connection string as you will be using it in the next step.

This information is sensitive — please excercise caution

Test producer and consumer

Clone the GitHub repository and navigate to the right directory:

git clone https://github.com/abhirockzz/eventhubs-kafka-go-sarama
cd eventhubs-kafka-go-sarama

Fetch the Sarama Kafka client library

go get github.com/Shopify/sarama

Producer

Set environment variables

export EVENTHUBS_CONNECTION_STRING=[value of primary connection string obtained in the previous step]
export EVENT_HUBS_NAMESPACE=[event hub namespace]
export EVENTHUBS_BROKER=$EVENT_HUBS_NAMESPACE.servicebus.windows.net:9093
export EVENTHUBS_TOPIC=[name of the event hub (topic)]

for EVENTHUBS_CONNECTION_STRING variable, please ensure that you include the double-quotes in the value received using the Azure CLI e.g. export EVENTHUBS_CONNECTION_STRING="Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty="

Start the producer

go run producer/sarama-producer.go

Once it starts, you should see the logs

Event Hubs broker [foo-bar.servicebus.windows.net:9093]
Event Hubs topic testhub
Waiting for program to exit...
sent message to partition 0 offset 1
sent message to partition 7 offset 1
sent message to partition 6 offset 1
sent message to partition 8 offset 1
sent message to partition 2 offset 1

To stop, just press ctrl+c on your terminal

Consumer

Start the consumer process in a different terminal. Set environment variables

export EVENTHUBS_CONNECTION_STRING=[value of primary connection string obtained in the previous step]
export EVENT_HUBS_NAMESPACE=[event hub namespace]
export EVENTHUBS_BROKER=$EVENT_HUBS_NAMESPACE.servicebus.windows.net:9093
export EVENTHUBS_TOPIC=[name of the event hub (topic) - to be filled]
export EVENTHUBS_CONSUMER_GROUPID=[name of consumer group e.g. testgroup]

for EVENTHUBS_CONNECTION_STRING variable, please ensure that you include the double-quotes in the value received using the Azure CLI e.g. export EVENTHUBS_CONNECTION_STRING="Endpoint=sb://foobar-eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Nbaz0D42MT7qwerty6D/W51ao42r6EJuxR/zEqwerty="

Start the consumer

go run consumer/sarama-consumer.go

In the logs, you will see that the consumer group gets created and all the partitions (10 in this example) are allocated to it

Event Hubs broker [foo-bar.servicebus.windows.net:9093]
Sarama client consumer group ID testgroup
new consumer group created
Event Hubs topic testhub
Waiting for program to exit
Partition allocation - map[testhub:[0 1 2 3 4 5 6 7 8 9]]
Message topic:"testhub" partition:9 offset:45
Message content value-2019-10-08 16:12:23.704802 +0530 IST m=+1.003667284
Message topic:"testhub" partition:3 offset:32
Message content value-2019-10-08 17:05:42.388301 +0530 IST m=+0.912420074

Scale out…

In a different terminal, start another instance of the consumer. This will trigger a rebalance of the partitions and you will see that few (5 in this case) will get allocated to this (new) consumer instance

Event Hubs broker [foo-bar.servicebus.windows.net:9093]
Sarama client consumer group ID testgroup
new consumer group created
Event Hubs topic testhub
Waiting for program to exit
Partition allocation - map[testhub:[0 1 2 3 4]]

If you go back to the terminal for the first consumer instance, you will see that few partitions have been taken away as a result of the rebalancing

Consumer group clean up initiated
Partition allocation - map[testhub:[5 6 7 8 9]]

To stop, just press ctrl+c on your terminal

Now, both the consumers will share the workload and consume messages from Event Hubs. You keep scaling out by starting more consumer instances, but this will only be useful until you reach the point where the number of consumer instances is equal to the number of partitions. In essence, the number of partitions of your Events Hub is the unit of parallelism and scale.

That’s all for this blog. I would love to have your feedback and suggestions! Don’t be shy, just tweet or drop a comment. And, if you found this article useful, please like and follow 😃😃

--

--

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes