Together in electric dream bunnies. (Adapted from a public domain photo by rente42)

Connecting Competing Microservices using RabbitMQ

Dave Sag
ITNEXT


A common reason cited for breaking up an existing monolithic application into microservices is ‘scalability’. A quick search through StackOverflow shows that few people really understand how to do this, and that there are no good answers to the question of how to coordinate competing microservices.

The core question many people have is this: if multiple identical services compete to consume an event, and one of them consumes it, how do you ensure that the other, non-competing services still get invoked?

In this article I explain how to do this with a simple hypothetical example, written in NodeJS, using RabbitMQ as the message bus¹.

Scenario

Let’s imagine you are building a system that retrieves images from the ‘net, OCRs them to extract any textual content, and records any meta-data associated with each image. The OCR process is slow, so you want to parallelise this aspect of the system by running a number of competing OCR services.

Just for good measure, you also want to run multiple meta-data analysis services as well.

So you write three microservices that will be linked together via messages sent through an AMQP message bus.

  1. Image Detector: This fetches an image and emits an 'image.detected' event via AMQP when done.
  2. Image OCR Service: This listens for the 'image.detected' event and, when it hears that event, it OCRs the image. You’ll run two competing instances of this service.
  3. Image Meta-Data Service: This listens for the 'image.detected' event as well and, when it hears it, extracts the image’s meta-data. You’ll also run two competing instances of this.

Both the ImageOCR and ImageMetaData services need to hear the 'image.detected' event, but only one of each service ought to actually handle the message.
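Under the hood this is the classic AMQP pattern: a topic exchange fans each message out to one queue per service type, and the competing instances of a service share that queue, so the broker delivers each message to exactly one of them. As a minimal sketch (no real broker, hypothetical names, for illustration only), the dispatch logic can be modelled in plain JavaScript:

```javascript
// In-memory model of the AMQP topology used here (hypothetical names,
// for illustration only - a real broker does all of this for you).
class FakeExchange {
  constructor () {
    this.queues = {} // queueName -> { consumers: [], next: 0 }
  }

  // Competing instances subscribe to the same named queue.
  subscribe (queueName, consumer) {
    if (!this.queues[queueName]) this.queues[queueName] = { consumers: [], next: 0 }
    this.queues[queueName].consumers.push(consumer)
  }

  // Every queue gets a copy of the message, but within each queue the
  // broker round-robins among consumers, so only one instance handles it.
  publish (message) {
    Object.values(this.queues).forEach(queue => {
      const consumer = queue.consumers[queue.next % queue.consumers.length]
      queue.next += 1
      consumer(message)
    })
  }
}
```

In the real system RabbitMQ plays the role of FakeExchange: each subscriber binds its service’s named queue to the topic exchange, and RabbitMQ round-robins deliveries among the consumers of that queue.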

The overall system will look like this:

Image Analysis Microservice Architecture using RabbitMQ

Of course in the real-world these services would also need a way to send the images to the other services, and store, or transmit, the data they extract. But the focus here is on the message exchange so I am ignoring those aspects.

Code

Full source at github.com/davesag/competing-services-example.

The Image Detector

Setting aside where it gets its images from, and how it stores or transfers them, let’s just focus on the event-publishing aspects of this service.

First define some constants that all of the services can share:

src/constants.js

const exchange = 'IMAGE-ANALYSIS'
const routingKey = 'image.detected'
const OCR_QUEUE = 'OCR'
const META_DATA_QUEUE = 'META-DATA'

module.exports = {
  exchange,
  routingKey,
  OCR_QUEUE,
  META_DATA_QUEUE
}

Then here’s the ImageDetector (just the publisher part of it):

Note: I’m using amqp-simple-pub-sub for this as 1) I wrote it, and 2) it makes building AMQP Pub/Sub systems very simple.

src/ImageDetector/index.js

const { makePublisher } = require('amqp-simple-pub-sub')
const { exchange } = require('../constants')

const makeService = () => makePublisher({ exchange })

const start = async () => {
  const service = makeService()
  await service.start()
  return service
}

module.exports = { start }

For the OCR and MetaData services, define a genericSubscriber shell:

src/genericSubscriber.js

const { makeSubscriber } = require('amqp-simple-pub-sub')
const { exchange, routingKey } = require('./constants')

const genericSubscriber = (queueName, makeHandler) => {
  const routingKeys = [routingKey]
  const makeService = () => makeSubscriber({ exchange, queueName, routingKeys })

  const start = async name => {
    const service = makeService()
    const handler = makeHandler(service, name)
    await service.start(handler)
    return service
  }

  return { start }
}

module.exports = genericSubscriber

Then declare each service as follows:

src/ImageMetaData/index.js

const genericSubscriber = require('../genericSubscriber')
const makeHandler = require('./makeHandler')
const { META_DATA_QUEUE: queueName } = require('../constants')
module.exports = genericSubscriber(queueName, makeHandler)

and:

src/ImageOCR/index.js

const genericSubscriber = require('../genericSubscriber')
const makeHandler = require('./makeHandler')
const { OCR_QUEUE: queueName } = require('../constants')
module.exports = genericSubscriber(queueName, makeHandler)

The handlers, which are where the real service implementations would live, are simply:

src/ImageMetaData/makeHandler.js

const makeHandler = (subscriber, name) => async message => {
  const data = JSON.parse(message.content.toString())
  console.log('Meta Data', name, data.meta)
  // do service implementation.
  subscriber.ack(message)
}

module.exports = makeHandler

and:

src/ImageOCR/makeHandler.js

const makeHandler = (subscriber, name) => async message => {
  const data = JSON.parse(message.content.toString())
  console.log('OCR', name, data.text)
  // do service implementation.
  subscriber.ack(message)
}

module.exports = makeHandler

Wiring It Together

Here is a simple index.js file that instantiates the various services and runs the event publisher:

src/index.js

const ImageDetector = require('./ImageDetector')
const ImageMetaData = require('./ImageMetaData')
const ImageOCR = require('./ImageOCR')
const { routingKey } = require('./constants')

const FIVE_SECONDS = 5000

const start = async () => {
  const imageDetector = await ImageDetector.start()
  await Promise.all([
    ImageMetaData.start('MetaData A'),
    ImageMetaData.start('MetaData B'),
    ImageOCR.start('OCR A'),
    ImageOCR.start('OCR B')
  ])
  return imageDetector
}

// Go and get some image and generate a message.
// Obviously I'm just faking it here for brevity.
const run = async () => {
  const imageDetector = await start()
  let count = 0
  setInterval(async () => {
    const message = JSON.stringify({
      text: `This is message ${count}`,
      meta: `This is meta-data for ${count}`
    })
    await imageDetector.publish(routingKey, message)
    count += 1
  }, FIVE_SECONDS)
}

run()
  .then(() => {
    console.log('Services running.')
  })
  .catch(err => {
    console.error(err)
  })

Note I’ve left out details like handling ctrl-c nicely to close down the services. See the complete source code for the full thing.
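For reference, one way to do that (a sketch, assuming each started service exposes a close() method that shuts down its AMQP channel and connection) is to build a shutdown function over all of the running services and hook it up to the process signals:

```javascript
// A sketch of graceful shutdown. Assumes each started service exposes
// an async close() method - label: this is an assumption, check the
// API of whatever AMQP wrapper you use.
const makeShutdown = services => async () => {
  await Promise.all(services.map(service => service.close()))
}

// Wire it up to ctrl-c (SIGINT) and `kill` (SIGTERM), e.g.:
// const shutdown = makeShutdown([imageDetector /* plus the started subscribers */])
// process.once('SIGINT', () => shutdown().then(() => process.exit(0)))
// process.once('SIGTERM', () => shutdown().then(() => process.exit(0)))
```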

Set Up RabbitMQ

You’ll need RabbitMQ running. Rather than pollute base reality, I like to run services like message queues, databases, and so forth inside Docker containers.

This docker-compose.yml file will do the job:

version: "3"
volumes:
rabbit-data:
driver: local
services:
amqp:
image: rabbitmq
ports:
- 15672:15672
- 5672:5672
volumes:
- rabbit-data:/var/lib/rabbitmq
healthcheck:
test: "exit 0"

So now, in your terminal, run:

docker-compose up -d
npm start

And voila:

> Services running. 
> OCR OCR B This is message 0
> Meta Data MetaData A This is meta-data for 0
> Meta Data MetaData B This is meta-data for 1
> OCR OCR A This is message 1
> OCR OCR B This is message 2
> Meta Data MetaData A This is meta-data for 2
> Meta Data MetaData B This is meta-data for 3
> OCR OCR A This is message 3
... etc

The ImageDetector emits a new message every 5 seconds and each of the OCR and MetaData services race to do their jobs.

Conclusion

It’s trivial to wire together microservices. The real design decisions you need to make are not how to use topic exchanges and so forth, but simply how your routing keys should be structured.

Routing keys are the grammar of a microservices network, and will be the topic of another post.
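To give a flavour of that grammar: AMQP topic routing keys are dot-separated words, and queue bindings may use the wildcards * (exactly one word) and # (zero or more words). A small, hypothetical matcher (assuming plain word segments, no regex metacharacters in keys) illustrates the semantics:

```javascript
// Illustrates AMQP topic-matching semantics: '*' matches exactly one
// dot-separated word, '#' matches zero or more words. Hypothetical
// helper, for illustration - RabbitMQ does this matching internally.
const matches = (pattern, routingKey) => {
  const parts = pattern.split('.').map(word => (word === '*' ? '[^.]+' : word))
  let source = parts.join('\\.')
  // '#' may match zero words, so it must absorb an adjacent dot.
  source = source
    .replace(/#\\\./g, '(?:[^.]+\\.)*')
    .replace(/\\\.#/g, '(?:\\.[^.]+)*')
    .replace(/#/g, '(?:[^.]+(?:\\.[^.]+)*)?')
  return new RegExp('^' + source + '$').test(routingKey)
}
```

So a binding of 'image.*' hears 'image.detected' but not 'image.detected.small', while 'image.#' hears both, as well as plain 'image'.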

Links

¹ You could also use Kafka, and that’s what a lot of the answers on StackOverflow recommend, but Kafka is really designed to solve a different problem, namely that of building distributed streaming systems. That’s fine if you are building the next Netflix, but overkill if you are building an otherwise fairly simple microservice network.

Like this but not a subscriber? You can support the author by joining via davesag.medium.com.
