Connecting Competing Microservices using RabbitMQ
A common reason cited for breaking an existing monolithic application into microservices is ‘scalability’. Yet a quick search through StackOverflow shows that few people really understand how to do this, and there are no really good answers to the question of how to coordinate competing microservices.
The core question many people have is this: if multiple identical services compete to consume an event, how do you ensure that, once one of them has consumed it, the other, non-competing services still get invoked?
In this article I explain how to do this with a simple hypothetical example, written in NodeJS, using RabbitMQ as the message bus¹.
Scenario
Let’s imagine you are building a system that retrieves images from the ‘net, OCRs them to extract any textual content, and records any meta-data associated with each image. The OCR process is slow, so you want to parallelise that aspect of the system by running a number of competing OCR services.
Just for good measure, you also want to run multiple meta-data analysis services as well.
So you write three microservices, linked together via messages sent through an AMQP message bus.

- Image Detector: This fetches an image and emits an `image.detected` event via AMQP when done.
- Image OCR Service: This listens for the `image.detected` event and, when it hears that event, it OCRs the image. You’ll run two competing instances of this service.
- Image Meta-Data Service: This listens for the `image.detected` event as well and, when it hears it, extracts the image’s meta-data. You’ll also run two competing instances of this.
Both the `ImageOCR` and `ImageMetaData` services need to hear the `image.detected` event, but only one instance of each service ought to actually handle any given message.
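RabbitMQ achieves this with one named queue per service type, bound to a shared exchange: every bound queue receives a copy of each matching message, but within a queue the broker delivers each message to only one of its consumers. To make the delivery rules concrete, here is a toy in-memory model (just an illustration of the behaviour, not real RabbitMQ, and the class and method names are mine):

```js
// Toy model of RabbitMQ's delivery rules (illustration only, no real broker):
// every bound queue gets a copy of each published message, and within a
// queue, messages are dealt round-robin to that queue's consumers.
class ToyExchange {
  constructor () {
    this.queues = new Map() // queueName -> { consumers: [], next: 0 }
  }

  bindQueue (queueName) {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, { consumers: [], next: 0 })
    }
  }

  consume (queueName, handler) {
    this.queues.get(queueName).consumers.push(handler)
  }

  publish (message) {
    for (const queue of this.queues.values()) {
      if (queue.consumers.length === 0) continue
      // every queue gets a copy…
      const consumer = queue.consumers[queue.next % queue.consumers.length]
      queue.next += 1
      consumer(message) // …but only one consumer per queue handles it
    }
  }
}

const exchange = new ToyExchange()
exchange.bindQueue('OCR')
exchange.bindQueue('META-DATA')

const log = []
exchange.consume('OCR', msg => log.push(`OCR A got ${msg}`))
exchange.consume('OCR', msg => log.push(`OCR B got ${msg}`))
exchange.consume('META-DATA', msg => log.push(`MetaData A got ${msg}`))
exchange.consume('META-DATA', msg => log.push(`MetaData B got ${msg}`))

exchange.publish('image-1')
exchange.publish('image-2')
console.log(log)
```

Both service types see both messages, but the two `OCR` instances split the work between them, as do the two `META-DATA` instances. That is exactly the behaviour we want from the real broker.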
The overall system will look like this:
Of course in the real-world these services would also need a way to send the images to the other services, and store, or transmit, the data they extract. But the focus here is on the message exchange so I am ignoring those aspects.
Code
Full source at github.com/davesag/competing-services-example.
The Image Detector
Setting aside where it gets its images from, and how it stores or transfers them, let’s just focus on the event-publishing aspects of this service.
First define some constants that all of the services can share:
src/constants.js

```js
const exchange = 'IMAGE-ANALYSIS'
const routingKey = 'image.detected'

const OCR_QUEUE = 'OCR'
const META_DATA_QUEUE = 'META-DATA'

module.exports = {
  exchange,
  routingKey,
  OCR_QUEUE,
  META_DATA_QUEUE
}
```
Then here’s the `ImageDetector` (just the publisher part of it).

Note: I’m using `amqp-simple-pub-sub` for this as 1) I wrote it, and 2) it makes building AMQP Pub/Sub systems very simple.
src/ImageDetector/index.js

```js
const { makePublisher } = require('amqp-simple-pub-sub')
const { exchange } = require('../constants')

const makeService = () => makePublisher({ exchange })

const start = async () => {
  const service = makeService()
  await service.start()
  return service
}

module.exports = { start }
```
For the `OCR` and `MetaData` services, define a `genericSubscriber` shell:
src/genericSubscriber.js

```js
const { makeSubscriber } = require('amqp-simple-pub-sub')
const { exchange, routingKey } = require('./constants')

const genericSubscriber = (queueName, makeHandler) => {
  const routingKeys = [routingKey]
  const makeService = () => makeSubscriber({ exchange, queueName, routingKeys })

  const start = async name => {
    const service = makeService()
    const handler = makeHandler(service, name)
    await service.start(handler)
    return service
  }

  return { start }
}

module.exports = genericSubscriber
```
Then declare each service as follows:
src/ImageMetaData/index.js

```js
const genericSubscriber = require('../genericSubscriber')
const makeHandler = require('./makeHandler')
const { META_DATA_QUEUE: queueName } = require('../constants')

module.exports = genericSubscriber(queueName, makeHandler)
```
and:
src/ImageOCR/index.js

```js
const genericSubscriber = require('../genericSubscriber')
const makeHandler = require('./makeHandler')
const { OCR_QUEUE: queueName } = require('../constants')

module.exports = genericSubscriber(queueName, makeHandler)
```
The handlers, which are where the real service implementations would live, are simply:
src/ImageMetaData/makeHandler.js

```js
const makeHandler = (subscriber, name) => async message => {
  const data = JSON.parse(message.content.toString())
  console.log('Meta Data', name, data.meta)
  // do service implementation.
  subscriber.ack(message)
}

module.exports = makeHandler
```
and:
src/ImageOCR/makeHandler.js

```js
const makeHandler = (subscriber, name) => async message => {
  const data = JSON.parse(message.content.toString())
  console.log('OCR', name, data.text)
  // do service implementation.
  subscriber.ack(message)
}

module.exports = makeHandler
```
Wiring It Together
Here is a simple `index.js` file that instantiates the various services and runs the event publisher:
src/index.js

```js
const ImageDetector = require('./ImageDetector')
const ImageMetaData = require('./ImageMetaData')
const ImageOCR = require('./ImageOCR')

const { routingKey } = require('./constants')

const FIVE_SECONDS = 5000

const start = async () => {
  const imageDetector = await ImageDetector.start()
  await Promise.all([
    ImageMetaData.start('MetaData A'),
    ImageMetaData.start('MetaData B'),
    ImageOCR.start('OCR A'),
    ImageOCR.start('OCR B')
  ])
  // return the publisher so `run` can use it below
  return imageDetector
}

// Go and get some image and generate a message.
// Obviously I'm just faking it here for brevity.
const run = async () => {
  const imageDetector = await start()
  let count = 0
  setInterval(async () => {
    const message = JSON.stringify({
      text: `This is message ${count}`,
      meta: `This is meta-data for ${count}`
    })
    await imageDetector.publish(routingKey, message)
    count += 1
  }, FIVE_SECONDS)
}

run()
  .then(() => {
    console.log('Services running.')
  })
  .catch(err => {
    console.error(err)
  })
```
Note I’ve left out details like handling `ctrl-c` nicely to close down the services. See the complete source code for the full thing.
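For a flavour of what that shutdown handling might look like, here’s a minimal sketch. It assumes each started service exposes a `close()` method that returns a promise — check the `amqp-simple-pub-sub` README for its actual shutdown API before copying this:

```js
// Hypothetical graceful-shutdown sketch. Collect the started services and
// close them all when the process is asked to stop. The close() method on
// each service is an assumption here, not a documented API.
const shutdown = async services => {
  await Promise.all(services.map(service => service.close()))
  console.log('All services closed.')
}

const registerShutdown = services => {
  // handle ctrl-c once, close everything, then exit cleanly
  process.once('SIGINT', async () => {
    await shutdown(services)
    process.exit(0)
  })
}

module.exports = { shutdown, registerShutdown }
```

You’d call `registerShutdown([imageDetector, ...listeners])` after `start()` resolves.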
Set Up RabbitMQ
You’ll need RabbitMQ running. Rather than pollute base reality, I like to run services like message queues, databases, and so forth inside Docker containers.

This `docker-compose.yml` file will do the job:
```yaml
version: "3"

volumes:
  rabbit-data:
    driver: local

services:
  amqp:
    image: rabbitmq
    ports:
      - 15672:15672
      - 5672:5672
    volumes:
      - rabbit-data:/var/lib/rabbitmq
    healthcheck:
      test: "exit 0"
```

(If you want the management UI that port 15672 implies, use the `rabbitmq:management` image variant; the base `rabbitmq` image doesn’t enable it.)
So now, in your terminal, run:

```sh
docker-compose up -d
npm start
```
And voila:
```
> Services running.
> OCR OCR B This is message 0
> Meta Data MetaData A This is meta-data for 0
> Meta Data MetaData B This is meta-data for 1
> OCR OCR A This is message 1
> OCR OCR B This is message 2
> Meta Data MetaData A This is meta-data for 2
> Meta Data MetaData B This is meta-data for 3
> OCR OCR A This is message 3
... etc
```
The `ImageDetector` emits a new message every 5 seconds, and the competing `OCR` and `MetaData` instances race to do their jobs.
Conclusion
It’s trivial to wire together microservices like this. The real design decisions you need to make are not how to use `topic` exchanges and so forth, but how your routing keys should be structured.
Routing keys are the grammar of a microservices network, and will be the topic of another post.
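To give a taste of that grammar: RabbitMQ topic routing keys are dot-separated words, and queue bindings may use `*` to match exactly one word and `#` to match zero or more words. The small matcher below (my own sketch, written from the rules in RabbitMQ’s topic tutorial — the broker does this matching for you) shows how binding patterns select routing keys:

```js
// Matches a routing key against an AMQP topic binding pattern.
// '*' matches exactly one dot-separated word; '#' matches zero or more words.
const matches = (pattern, key) => {
  const p = pattern.split('.')
  const k = key.split('.')

  const go = (i, j) => {
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // '#' can swallow zero or more words
      for (let skip = j; skip <= k.length; skip += 1) {
        if (go(i + 1, skip)) return true
      }
      return false
    }
    if (j === k.length) return false
    if (p[i] === '*' || p[i] === k[j]) return go(i + 1, j + 1)
    return false
  }

  return go(0, 0)
}

console.log(matches('image.*', 'image.detected')) // true
console.log(matches('image.#', 'image.ocr.done')) // true
console.log(matches('image.*', 'image.ocr.done')) // false
```

A well-structured key hierarchy (`image.detected`, `image.ocr.done`, and so on) lets new services subscribe to exactly the slice of events they care about without touching the publishers.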
Links
- AMQP Simple Pub/Sub: I wrote this to simplify building AMQP microservice networks.
- Competing Services Example: It works, and has 100% test coverage. Yay.
- RabbitMQ: It’s awesome.
—
¹ You could also use Kafka, and that’s what a lot of the answers on StackOverflow recommend, but Kafka is really designed to solve a different problem, namely building distributed streaming systems. That’s fine if you are building the next Netflix, but overkill if you are building an otherwise fairly simple microservice network.