Scalable Real-Time Apps with Python and Redis: Exploring AsyncIO, FastAPI, and Pub/Sub

Mohammad Hoseini Rad · Published in ITNEXT · Jul 25, 2023 · 10 min read


I recently started working on a side project, a real-time mobile board game with Unity, and I decided to use Python as the server’s programming language. In recent years, Python has introduced mature async I/O frameworks such as AsyncIO and AnyIO, and web frameworks such as FastAPI use Python’s asynchronous features to build high-performance backend servers. In this article, we discuss concurrency and AsyncIO, and how to combine them with WebSocket and Redis to build a real-time application.

In this tutorial, we will be using Upstash’s free Redis cluster. Their free plan supports up to 10k messages per day, plus a free Kafka cluster. More about them later in the article.

Let’s dive into the AsyncIO library.
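As a quick warm-up before the real example, here is a minimal sketch (standard library only, my addition) showing how two coroutines run concurrently on a single thread:

import asyncio


async def greet(name: str) -> None:
    await asyncio.sleep(1)  # non-blocking: yields control to the event loop
    print(f"hello, {name}")


async def main() -> None:
    # both greetings run concurrently, so this takes ~1s instead of ~2s
    await asyncio.gather(greet("Amir"), greet("Alex"))


asyncio.run(main())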

I find the best way to understand a topic is through a good example. So, let’s say we are building an application that connects to an external resource and then shows the result to the client. That is what most typical backend projects boil down to: reading from a database, searching in ElasticSearch, or even reading from a cache.

import asyncio
from dataclasses import dataclass
from typing import Dict

from fastapi import FastAPI

app = FastAPI()


@dataclass
class UserInfo:
    id: int
    name: str
    score: int


users: Dict[int, UserInfo] = {
    1: UserInfo(1, "Amir", 12),
    2: UserInfo(2, "Alex", 15),
    3: UserInfo(3, "Sara", 9),
}


@app.get("/user/{user_id}")
async def get_user(user_id: int):
    await asyncio.sleep(0.2)  # exaggerated I/O latency
    if user_id in users:
        return {"ok": True, "user": users[user_id]}
    return {"ok": False, "error": "user not found"}

I made this pretty simple example of a server that stores user information, which can be retrieved by calling the /user/{user_id} API.
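To try it out, run the server with uvicorn and call the endpoint, for example with curl (the response shown is what the handler above should produce; 127.0.0.1:8000 is uvicorn’s default address):

uvicorn main:app --reload
curl http://127.0.0.1:8000/user/1
# {"ok":true,"user":{"id":1,"name":"Amir","score":12}}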

Imagine we have a dumb piece of business logic: we want to retrieve three users and combine their scores.

The old-school way to achieve this is by retrieving the users one by one and combining their scores. Let’s write code that executes in this manner:

import requests


def get_user_info(user_id: int) -> dict | None:
    response = requests.get(f"http://127.0.0.1:8000/user/{user_id}").json()
    if 'ok' not in response or not response['ok']:
        return None
    return response['user']

This function connects to the server and either returns the user information or None.

from typing import List


def combine_scores(ids: List[int]) -> None:
    users = [get_user_info(user_id) for user_id in ids]
    scores = [user['score'] for user in users if user is not None]
    print(sum(scores))

This function then removes the None items and calculates the sum of the remaining scores.

import time
from typing import Callable


def run_and_analyze(method: Callable) -> None:
    start = time.time_ns()
    method()
    duration = time.time_ns() - start
    duration_ms = duration / 1_000_000
    print("took {}ms".format(duration_ms))

I made this simple helper that takes a callable, executes it, and measures its execution time.

ids = [1, 2, 3]


def test_sync():
    combine_scores(ids)


if __name__ == "__main__":
    print("Sync:")
    run_and_analyze(test_sync)

And in the end, I run and analyze the test_sync function:

Sync:
36
took 612.8564ms

Well, it makes sense: 200ms × 3 = 600ms. However, we are wasting tons of resources. The CPU is mostly idle while the program waits for each response. Imagine combine_scores was part of the business logic of a popular API, and we ran the app on a 4-core CPU:

As you can see, each core can handle around 1.6 requests per second (1 request / 0.6s), so all four cores combined can handle around 6.5 requests per second. But that is not efficient at all: the CPU is mostly idle and being wasted.

The red areas represent the time the CPU sits idle, waiting for a request to complete. So far we have only described the problem. Let’s discuss the solution that Node.js is best known for.

What does Concurrency mean?

Look at this alternative, which is what happens in most enterprise setups. By using concurrency, you squeeze the performance out of “ONE” CPU core. Red areas represent CPU usage, and white areas are just I/O waiting.

As you can see, if you draw an imaginary vertical line from top to bottom, it never crosses more than one red area. The reason is that concurrency does not mean parallelism: we still have only one CPU core. However, by using it more intelligently, we reach a higher requests-per-second capability, improving from 1.6rps to 7rps per core, which means 28rps for four cores combined.

Let’s use AsyncIO to achieve concurrency.

While working with async applications in Python, you should use async-compatible libraries. For instance, in this article, we will be using aiohttp instead of the requests package for HTTP requests.

import aiohttp


async def get_user_info_async(user_id: int) -> dict | None:
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://127.0.0.1:8000/user/{user_id}") as response:
            response = await response.json()
            if 'ok' not in response or not response['ok']:
                return None
            return response['user']


async def combine_scores_async(ids: List[int]) -> None:
    futures = [get_user_info_async(user_id) for user_id in ids]
    users = await asyncio.gather(*futures)
    scores = [user['score'] for user in users if user is not None]
    print(sum(scores))

As you can see, calling get_user_info_async creates a coroutine, and with asyncio.gather we await all the requests at once and let them execute concurrently.

def test_async():
    asyncio.run(combine_scores_async(ids))


if __name__ == "__main__":
    print("Sync:")
    run_and_analyze(test_sync)
    print("Async:")
    run_and_analyze(test_async)

And finally, let’s run and analyze the function:

Sync:
36
took 638.3428ms
Async:
36
took 212.9474ms

We got the same result three times faster. However, is it just three times faster? Imagine we now need 10 I/O calls.

users: Dict[int, UserInfo] = {
    1: UserInfo(1, "Amir", 12),
    2: UserInfo(2, "Alex", 15),
    3: UserInfo(3, "Sara", 9),
    4: UserInfo(4, "Sara", 9),  # here
    5: UserInfo(5, "Sara", 9),
    6: UserInfo(6, "Sara", 9),
    7: UserInfo(7, "Sara", 9),
    8: UserInfo(8, "Sara", 9),
    9: UserInfo(9, "Sara", 9),
    10: UserInfo(10, "Sara", 9),
}


@app.get("/user/{user_id}")
async def get_user(user_id: int):
    await asyncio.sleep(0.2)
    if user_id in users:
        return {"ok": True, "user": users[user_id]}
    return {"ok": False, "error": "user not found"}

First, I added new users to the users dictionary.

ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # here


def test_sync():
    combine_scores(ids)


def test_async():
    asyncio.run(combine_scores_async(ids))


if __name__ == "__main__":
    print("Sync:")
    run_and_analyze(test_sync)
    print("Async:")
    run_and_analyze(test_async)

And then, I added those ids to the ids list. Let’s run and see the difference:

Sync:
99
took 2086.9449ms
Async:
99
took 231.0776ms

This time it is almost ten times faster. As you can see, the async code scales: ten concurrent calls still take roughly the time of one.

Let’s build a real-time application.

First I should discuss the architecture of the application. Why are we using Redis in the first place?

Scaling Applications

In languages like Python, PHP, and Node.js, the runtime can serve your web server on only one CPU core. To utilize all CPU cores, server gateway interfaces or process managers are used to load-balance your application across all cores. Those worker processes do not share memory, although in some cases you can use the fork feature of Unix-like operating systems to get read-only shared memory. Therefore, we will use Redis as the shared memory and communication layer between all application instances.
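For example (a sketch, not from the original setup), uvicorn can spawn multiple worker processes, one per core; each worker is a separate process with its own memory, which is exactly why we need Redis for shared state:

uvicorn main:app --workers 4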

Let’s build a messaging application

I wanted to dive deep into async programming first so that everyone who is not familiar with Python’s async functionality understands why I am using FastAPI.

Let’s first create a simple FastAPI server (main.py):

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"hello": "world"}

You can run it using uvicorn:

uvicorn main:app --reload

Ok, it works fine. Next, we need to install the websockets package:

pip install websockets

Then we can build our first WebSocket route:

from fastapi import FastAPI
from starlette.websockets import WebSocket

app = FastAPI()


@app.get("/")
async def root():
    return {"hello": "world"}


@app.websocket("/ws")
async def ws_root(websocket: WebSocket):
    await websocket.accept()
    await websocket.send_text("Hi")
    await websocket.close()

As you can see, we didn’t do anything complicated. It accepts a connection, replies with “Hi”, and closes it. You can use Postman to test WebSocket APIs:
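If you prefer a script to Postman, here is a minimal client sketch using the websockets package (my addition; it assumes the server runs on uvicorn’s default 127.0.0.1:8000):

import asyncio
import websockets


async def main() -> None:
    async with websockets.connect("ws://127.0.0.1:8000/ws") as ws:
        print(await ws.recv())  # should print "Hi" before the server closes the connection


asyncio.run(main())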

Everything works fine. The next step is connecting to Redis. Let’s build a Redis connection builder and use FastAPI’s dependency injection tool.

pip install redis

Python’s redis package supports AsyncIO via the redis.asyncio module.

import redis.asyncio as redis

Then we need to create a connection pool:

redis_connection_pool = redis.ConnectionPool()

If you are using Upstash’s free Redis cluster, you can access your cluster’s credentials by going to the details tab:

redis_connection_pool = redis.ConnectionPool(
    host='eu1-live-bison-38247.upstash.io',
    port=38247,
    password='********'
)

Now we need to create a builder for dependency injection:

redis_connection_pool = redis.ConnectionPool(
    host='eu1-live-bison-38247.upstash.io',
    port=38247,
    password='********'
)


def redis_connection() -> redis.Redis:
    return redis.Redis(connection_pool=redis_connection_pool)

Now let’s use fastapi.Depends to create a new redis connection from the connection pool:

@app.websocket("/ws")
async def ws_root(websocket: WebSocket, rdb: redis.Redis = Depends(redis_connection)):
await websocket.accept()
await websocket.send_text("Hi")
if await rdb.ping():
await websocket.send_text("connected to redis")
await websocket.close()

Now if you run it, you should see that it has connected to Redis:

Now, let’s subscribe to a channel whenever we connect to the WebSocket API.

@app.websocket("/ws")
async def ws_root(websocket: WebSocket, rdb: redis.Redis = Depends(redis_connection)):
await websocket.accept()
# creating pubsub instance and listening on the "test_channel"
ps = rdb.pubsub()
await ps.psubscribe("test_channel")
# waiting for new messages from the channel
while True:
message = await ps.get_message(ignore_subscribe_messages=True, timeout=None)
if message is None:
continue
text_message = message['data'].decode('utf-8')
if text_message == "stop":
await websocket.send_text("closing the connection")
break
await websocket.send_text(text_message)
await websocket.close()

As you can see, it first creates a pub/sub instance and then subscribes to the “test_channel” channel. Whenever it receives a new message, it sends that message to the client by calling websocket.send_text. I should mention that Redis messages are binary, and we should either send binary messages to the client by calling send_bytes or decode them to text. Let’s test it.

You can use either the redis-cli tool or Upstash’s web CLI to connect to your Redis cluster. Let’s run the application, connect to the WebSocket, and then run commands like the ones below in the Redis console.
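For example (the payloads are arbitrary; “stop” triggers the close branch in our handler):

PUBLISH test_channel "hello there"
PUBLISH test_channel "stop"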

As you can see, our Redis subscriber is working fine. Now there is a problem: we also want to handle incoming WebSocket messages and publish them to the channel. We need two concurrent coroutines to handle this:

@app.websocket("/ws")
async def ws_root(websocket: WebSocket, rdb: redis.Redis = Depends(redis_connection)):
await websocket.accept()

async def listen_redis():
ps = rdb.pubsub()
await ps.psubscribe("test_channel")
while True:
message = await ps.get_message(ignore_subscribe_messages=True, timeout=None)
if message is None:
continue
text_message = message['data'].decode('utf-8')
if text_message == "stop":
await websocket.send_text("closing the connection")
break
await websocket.send_text(text_message)

async def listen_ws():
while True:
message = await websocket.receive_text()
await websocket.send_text("Yay, we have received you message " + message)

await websocket.close()

First, I defined these two functions. We need to run them concurrently and let them run until one of them exits. We can achieve that with the asyncio.wait function:

@app.websocket("/ws")
async def ws_root(websocket: WebSocket, rdb: redis.Redis = Depends(redis_connection)):
await websocket.accept()

async def listen_redis():
ps = rdb.pubsub()
await ps.psubscribe("test_channel")
while True:
message = await ps.get_message(ignore_subscribe_messages=True, timeout=None)
if message is None:
continue
text_message = message['data'].decode('utf-8')
if text_message == "stop":
await websocket.send_text("closing the connection")
break
await websocket.send_text(text_message)

async def listen_ws():
while True:
message = await websocket.receive_text()
await websocket.send_text("Yay, we have received you message " + message)
# here:
await asyncio.wait([listen_ws(), listen_redis()], return_when=asyncio.FIRST_COMPLETED)
await websocket.close()

Now, let’s test it:

The last step is to publish incoming messages from the WebSocket to all other connections:

@app.websocket("/ws")
async def ws_root(websocket: WebSocket, rdb: redis.Redis = Depends(redis_connection)):
await websocket.accept()

async def listen_redis():
ps = rdb.pubsub()
await ps.psubscribe("test_channel")
while True:
message = await ps.get_message(ignore_subscribe_messages=True, timeout=None)
if message is None:
continue
text_message = message['data'].decode('utf-8')
if text_message == "stop":
await websocket.send_text("closing the connection")
break
await websocket.send_text(text_message)

async def listen_ws():
while True:
message = await websocket.receive_text()
await rdb.publish("test_channel", message) # publishing the message to the redis pubsub channel

await asyncio.wait([listen_ws(), listen_redis()], return_when=asyncio.FIRST_COMPLETED)
await websocket.close()

Let’s test it:
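Since the test screenshots are not reproduced here, a minimal test sketch with the websockets package (my addition; it assumes the server runs on 127.0.0.1:8000). One client sends a message, and both connected clients should receive it through the Redis channel:

import asyncio
import websockets


async def main() -> None:
    # open two independent WebSocket connections to the same server
    async with websockets.connect("ws://127.0.0.1:8000/ws") as a, \
               websockets.connect("ws://127.0.0.1:8000/ws") as b:
        await a.send("hello from client A")
        # the message is fanned out via Redis pub/sub to every subscriber
        print(await a.recv())
        print(await b.recv())


asyncio.run(main())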


Conclusion

In this article, we skipped parts, such as error handling, that can make a massive impact on a real application. Overall, Python has always fought its inner demon, the GIL (Global Interpreter Lock). Python’s async libraries were first used by Google to improve the performance of its crawlers, and since then, many proposals have been introduced to remove the GIL from Python. But to be honest, the GIL is one of the reasons Python is efficient and fast for single-threaded applications, and, in my opinion, it will remain a part of Python because removing it would have a significant impact on all Python packages. However, we can already use async/await and AsyncIO to overcome most of its limitations.


I've been working as a software engineer for 5 years. I love Go, Python, and building robust and scalable apps with Kafka, Redis, and Kubernetes.