A Workerpool From Scratch in TypeScript and Node
The async and await keywords are loved by JavaScript developers for how they enable complex asynchronous logic to be written in an elegant way. But there is one thing that is impossible with them: ‘real’ concurrency where multiple chunks of code run at the same time. The architecture of JavaScript is set up in such a way that there is always a single process running on a single thread that executes all the code. Asynchronous behavior is achieved by using a queue that holds the next chunks of code to be executed. With tactical scheduling on this queue a function can be halted until a certain background task that runs outside the node process has completed, like a file operation or a network call. But it is impossible to let other JavaScript functions continue while another JavaScript function is running.
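The original snippet is not reproduced here, so below is a minimal reconstruction of the idea. The names are placeholders, a resolved Promise stands in for the original fetch call, and a bounded busy-wait replaces the infinite while loop so the snippet terminates:

```typescript
// Reconstruction of the blocking example; api and its body are assumptions.
const order: string[] = [];

async function api(): Promise<void> {
  await Promise.resolve(); // control returns to the event loop here
  order.push("after await"); // cannot run while the loop below holds the thread
}

api();

// Bounded busy-wait so this sketch terminates; an infinite `while` loop
// would block the continuation above forever.
const start = Date.now();
while (Date.now() - start < 50) { /* the event loop is blocked */ }
order.push("loop done");

console.log(order.join(",")); // logs only "loop done"
```

The await continuation is still sitting in the microtask queue when the loop ends; with an endless loop it would never get a turn at all.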
In the code sample above the api call will be made, but everything in the api function after that will never be executed, because the process is blocked by the while loop. This can become a serious issue in big node applications that process a lot of data. To solve this problem workers were introduced. They allow developers to start an entirely new node process that runs a script and can communicate with the main process via an event system. However, this really is a new node instance and has no shared memory like thread-based systems have (e.g. multi-threading in C# or Java). This means that when you send data to a worker it will be cloned and recreated in the other runtime. This can be confusing when you pass objects around and mutate them in one process: those mutations will not be reflected on the other side! Modules will also have one instance per node process, breaking their default singleton behavior.
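You can see this cloning behavior without even starting a worker: structuredClone (global in Node 17+) applies the same structured clone algorithm that postMessage uses when sending data to a worker. A minimal sketch, not taken from the article's code:

```typescript
// Data sent to a worker is copied with the structured clone algorithm.
const original = { count: 0 };
const copy = structuredClone(original); // what postMessage does under the hood
copy.count = 42; // mutate the "worker side" copy
console.log(original.count); // 0 — the mutation is not reflected back
```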
Resource pooling
In the previous paragraph we discussed that creating a worker means creating a new node instance. This makes creating workers a relatively heavy task. There is also the risk of creating too many workers and flooding the host system with processes. This is where resource pooling comes in. We will create a fixed number of workers when we start our application and reuse those every time we want to run a task on another process. This pool of workers will be managed by a pool manager that accepts tasks and assigns them to an idle worker. The functions themselves will be wrapped in the Task interface, which has methods for easy chaining and running of tasks. We can use Promises to make it easy to deal with this ‘real’ asynchronous code, just like with the network and file operations that the node host runs for us in the background.
Creating a pool
We will start off by defining the general shape of our architecture. There will be one workerpool that contains all the workers and the queues with tasks. I decided to give the workerpool one public method: createTask. This method creates a task that will run inside the pool it was created in. The Task interface also has a single public method: runAsync. This method runs the wrapped function on a worker. Its return value is a Promise that resolves when a worker is done executing the function. Those interfaces are defined as follows:
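The interface code itself did not survive here, so below is a sketch of the shape just described. Only createTask and runAsync come from the text; the generic parameter names are my assumptions:

```typescript
// Sketch of the two interfaces described above.
interface Task<TIn, TOut> {
  runAsync(input: TIn): Promise<TOut>;
}

interface Workerpool {
  createTask<TIn, TOut>(func: (input: TIn) => TOut): Task<TIn, TOut>;
}

// Structural check that the shape is implementable:
const exampleTask: Task<number, number> = {
  runAsync: (n) => Promise.resolve(n * 2),
};
```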
Implementing those interfaces starts with creating a constructor for the workerpool. This function starts off by defining all the internals of the pool:
- a map of ids and workers that holds all the workers that are part of our pool. Those workers will be created a single time and be reused for all the tasks the workerpool is going to execute.
- a collection with the ids of the workers that are idle and ready to accept a new task.
- the backlog with tasks that have been queued and will be executed as soon as possible. Those tasks consist of an id, a function and the additional data to send to the worker. All the data here is typed as any; type safety will be introduced in the Task interface.
- a map of task ids and resolvers. A resolver is the function that will be called when a worker has completed the task. Every task has its own resolver.
- a counter that will be used to assign incremental task identifiers.
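A sketch of those five internals, with field names of my own choosing (spawning the actual workers is omitted here):

```typescript
import type { Worker } from "node:worker_threads";

// The five internals described above; names are assumptions.
interface QueuedTask { id: number; func: string; data: any; }

function createPoolState() {
  const workers = new Map<number, Worker>(); // created once, reused for every task
  const idle: number[] = [];                 // ids of workers ready for a new task
  const backlog: QueuedTask[] = [];          // queued tasks, loosely typed as any
  const resolvers = new Map<number, (result: any) => void>(); // one per task
  let taskIdCounter = 0;                     // incremental task identifiers
  return { workers, idle, backlog, resolvers, nextTaskId: () => taskIdCounter++ };
}
```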
The workers are constructed with a script that will be the main module of that node instance. We will define its content later on. With those definitions in place we can start adding the logic for executing tasks. We start off by creating a function that runs the next task from the backlog if there is a worker ready. For this, a couple of steps have to be taken:
- check if we have both a task and an idle worker
- take the next task and idle worker
- build the message for the worker. We cannot send functions between workers so we have to turn the function into a string
- send the task to the worker
At the end we call runNext again. In this way we keep scheduling tasks on workers as long as we have idle workers and tasks left. runNext should be called every time we touch either the backlog or the idle collection, so that tasks are always scheduled as soon as possible.
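The steps above could be sketched like this; the send callback stands in for worker.postMessage so the sketch runs without real workers, and the shapes are my assumptions:

```typescript
// Sketch of runNext over the backlog and idle collections.
interface QueuedTask { id: number; func: string; data: unknown; }

function makeRunNext(
  backlog: QueuedTask[],
  idle: number[],
  send: (workerId: number, task: QueuedTask) => void,
): () => void {
  const runNext = (): void => {
    // 1. only proceed when there is both a task and an idle worker
    if (backlog.length === 0 || idle.length === 0) return;
    // 2. take the next task and the next idle worker
    const task = backlog.shift() as QueuedTask;
    const workerId = idle.shift() as number;
    // 3. the function was already serialized to a string when it was queued
    // 4. send the task to the worker
    send(workerId, task);
    // keep scheduling as long as both queues are non-empty
    runNext();
  };
  return runNext;
}

// Quick check with stubbed internals:
const sent: number[] = [];
const backlog: QueuedTask[] = [
  { id: 0, func: "(n) => n", data: 1 },
  { id: 1, func: "(n) => n", data: 2 },
  { id: 2, func: "(n) => n", data: 3 },
];
const idle = [10, 11];
makeRunNext(backlog, idle, (workerId) => sent.push(workerId))();
console.log(sent, backlog.length); // two tasks dispatched, one left queued
```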
After this it is time to create the script that will run inside the workers. This script listens for tasks to arrive. When a task arrives it runs the function and sends the result back together with the identifier. To run our function we first have to turn it into a named function. After that we can use eval to compile and run it. The script can be found below:
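The script itself is missing here; a sketch of what it could look like follows, assuming messages of the shape { id, func, data } with func holding the stringified function. The exact eval construction may differ from the original:

```typescript
// Worker-side script sketch: runs as the entry module of each worker.
import { parentPort } from "node:worker_threads";

parentPort?.on("message", ({ id, func, data }: { id: number; func: string; data: any }) => {
  // eval of a stringified function does not reliably yield the function
  // itself, so we first bind the source to a name and evaluate that name:
  const userFunction = eval(`const userFunction = ${func}; userFunction`);
  const result = userFunction(data);
  // send the result back together with the task identifier
  parentPort?.postMessage({ id, result });
});
```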
The next thing that happens is that the workerpool receives the result and resolves the task. For this we add a message listener to all the workers. Inside it we call the resolver function for the task id and put the worker back on the list of idle workers. At the end we call runNext so that the backlog will be processed and the newly idle worker gets a task.
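That listener logic could be sketched like this; the worker id and message are passed in directly so the sketch runs without real workers, and the names are my assumptions:

```typescript
// Sketch of the pool-side message handling.
type Resolver = (result: unknown) => void;

function makeOnWorkerMessage(
  resolvers: Map<number, Resolver>,
  idle: number[],
  runNext: () => void,
) {
  return (workerId: number, message: { id: number; result: unknown }): void => {
    // resolve the Promise that was handed out when the task was queued
    resolvers.get(message.id)?.(message.result);
    resolvers.delete(message.id);
    // the worker is free again
    idle.push(workerId);
    // let the newly idle worker pick up the next task from the backlog
    runNext();
  };
}

// Quick check with stubbed internals:
let resolved: unknown;
let scheduled = false;
const resolvers = new Map<number, Resolver>([[7, (r) => { resolved = r; }]]);
const idle: number[] = [];
makeOnWorkerMessage(resolvers, idle, () => { scheduled = true; })(3, { id: 7, result: "ok" });
```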
Creating the tasks
Now that the internals of the workerpool are in place we can start creating and queuing tasks. The tasks will be created by a function that is returned from createWorkerpool. This means that a task is always bound to a specific workerpool. As a result, our structure will look like this:
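The original structure snippet is not included here; a hedged sketch of that shape could look as follows. runAsync is stubbed to run inline since the worker plumbing is covered elsewhere, and all names are assumptions:

```typescript
// createWorkerpool returns the pool; createTask closes over its internals.
interface Task<TIn, TOut> {
  runAsync(input: TIn): Promise<TOut>;
}

function createWorkerpool(size: number) {
  // the internals (workers, idle list, backlog, resolvers) live in this closure
  return {
    createTask<TIn, TOut>(func: (input: TIn) => TOut): Task<TIn, TOut> {
      return {
        runAsync: (input) => Promise.resolve(func(input)), // real version queues on a worker
      };
    },
  };
}

const pool = createWorkerpool(4);
const double = pool.createTask((n: number) => n * 2);
```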
Inside createTask nothing exciting happens: it just returns an object with the function that will run the task in the pool. For running the task we have to take the following steps:
- create a new task id
- add the task to the backlog
- create a Promise and add its resolver to the resolvers map
- return the Promise
- call runNext to set the system in motion
Note that inside the Promise constructor we store the resolver in a map that lives in a higher closure. In this way we can return the Promise and resolve it later on from the outside.
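Those steps, including the resolver escaping the Promise constructor, could be sketched as follows; the backlog, resolvers map and id counter are the pool internals, runNext is passed in as a stub, and the names are my assumptions:

```typescript
// Sketch of runAsync following the five steps above.
interface QueuedTask { id: number; func: string; data: unknown; }

function makeRunAsync(
  backlog: QueuedTask[],
  resolvers: Map<number, (result: unknown) => void>,
  nextId: () => number,
  runNext: () => void,
) {
  return (func: (input: unknown) => unknown, data: unknown): Promise<unknown> => {
    const id = nextId();                               // create a new task id
    backlog.push({ id, func: func.toString(), data }); // add the task to the backlog
    const promise = new Promise<unknown>((resolve) => {
      // the resolver escapes the Promise constructor into the outer map,
      // so the promise can be resolved later from the worker's message
      resolvers.set(id, resolve);
    });
    runNext();                                         // set the system in motion
    return promise;
  };
}

// Quick check with stubbed internals:
const backlog: QueuedTask[] = [];
const resolvers = new Map<number, (result: unknown) => void>();
let counter = 0;
const runAsync = makeRunAsync(backlog, resolvers, () => counter++, () => {});
const promise = runAsync((n) => n, 21);
```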
Now it is time to give our system its first test run. In the example below I run 3 functions in the background that calculate the Fibonacci number for different inputs. If you read the console output you will see that the tasks run in parallel and finish at different times.
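The example itself is not reproduced here, so below is a sketch of it. runAsync is stubbed to compute inline so the sketch is self-contained; with the real pool these three calls are dispatched to separate workers and genuinely run in parallel:

```typescript
// Three Fibonacci computations started together.
function fib(n: number): number {
  return n < 2 ? n : fib(n - 1) + fib(n - 2);
}

const fibTask = {
  runAsync: (n: number) => Promise.resolve(fib(n)), // stand-in for pool.createTask(fib)
};

Promise.all([fibTask.runAsync(10), fibTask.runAsync(20), fibTask.runAsync(30)])
  .then(([a, b, c]) => console.log(a, b, c)); // 55 6765 832040
```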
Chaining tasks
A commonly used feature of Promises is chaining multiple of them in sequence, where every next one depends on the output of the previous. The api call in the introduction of this article is a good example of this: we call json on the result of fetch, and from that result we extract the value property. I will also add a then function to our Task interface to allow this kind of chaining of background processes. The updated interface looks like this:
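The updated interface could look like this: then takes a follow-up function that receives the previous task's output and yields a new Task. The generic parameter names are my assumptions:

```typescript
// Task with chaining support.
interface Task<TIn, TOut> {
  runAsync(input: TIn): Promise<TOut>;
  then<TNext>(next: (input: TOut) => TNext): Task<TIn, TNext>;
}

// Structural check that the shape is implementable (inline stub):
function inlineTask<TIn, TOut>(func: (input: TIn) => TOut): Task<TIn, TOut> {
  return {
    runAsync: (input) => Promise.resolve(func(input)),
    then: <TNext>(next: (input: TOut) => TNext) => inlineTask((input: TIn) => next(func(input))),
  };
}
```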
For the implementation we simply override the runAsync function to first run the current task and, when that has resolved, run the next task. With this function we can endlessly chain tasks together to create bigger processes that make optimal use of the workerpool’s queuing abilities.
This feature is used like this:
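The usage example did not survive here; a sketch of how such a chain reads follows. createTask is stubbed to run inline so the example is self-contained, but the chaining mirrors the then described above:

```typescript
// Chaining three tasks into one bigger process.
interface Task<TIn, TOut> {
  runAsync(input: TIn): Promise<TOut>;
  then<TNext>(next: (input: TOut) => TNext): Task<TIn, TNext>;
}

function createTask<TIn, TOut>(func: (input: TIn) => TOut): Task<TIn, TOut> {
  return {
    runAsync: (input) => Promise.resolve(func(input)),
    // then composes the next step onto the current task
    then: <TNext>(next: (input: TOut) => TNext) => createTask((input: TIn) => next(func(input))),
  };
}

const task = createTask((n: number) => n + 1)
  .then((n) => n * 2)
  .then((n) => `result: ${n}`);

task.runAsync(4).then(console.log); // "result: 10"
```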
Conclusion
In this article I have discussed some of the biggest weaknesses and strongest features of JavaScript. Around this we have written a managed workerpool that allows us to go further down the concurrency path than JavaScript was ever designed to go. The source code can be found on GitHub. This article is a follow-up to my latest article on ITNEXT, which you may like: