Redis Streams in Action — Part 2 (Tweets consumer app)

Build a Rust app to consume from the Twitter Streaming API

Abhishek Gupta
ITNEXT

--

Welcome to this series of blog posts that covers Redis Streams with the help of a practical example. We will use a sample application to make Twitter data available for search and query in real-time. RediSearch and Redis Streams serve as the backbone of this solution that consists of several co-operating components, each of which will be covered in a dedicated blog post.

The code is available in this GitHub repo — https://github.com/abhirockzz/redis-streams-in-action

In this part, we look at the service which interacts with the Twitter Streaming API to consume tweets and move them on to the next part in the processing pipeline.

High level architecture

Our end goal is to be able to process tweets and make them available for search and queries via RediSearch. One could write a "do it all" service to consume tweets and directly store them in RediSearch. But, in order to scale to handle the volume of tweets, we need a service to act as a buffer and decouple our producer (the application we will focus in this blog) and consumer (covered in the next blog).

This is exactly what our first component facilitates — it consumes streaming Twitter data and forwards it to Redis Streams. We will deploy it to Azure Container Instances, validate its functionality and also walk-through how it works along with the code.

As you will see in the later parts of this series, this also provides a foundation for scale-out architecture.

All in all, this blog post is short and simple! It lays down the ground work for other parts of the solution which will be covered in subsequent posts. Please don’t worry about the fact that the service is written in Rust (in case you don’t know it already). The logic can be easily ported over to your favorite programming language.

Pre-requisites

Start by getting a free Azure account if you don’t have one already and install the Azure CLI as well.

We will be deploying the tweets consumer application to Azure Container Instances using regular Docker CLI commands. This capability is enabled by integration between Docker and Azure. Just ensure that you have Docker Desktop version 2.3.0.5 or later, for Windows, macOS, or install the Docker ACI Integration CLI for Linux.

To use the Twitter Streaming API, you will also need a Twitter developer account. If you don’t have one already, please follow these instructions on how to set it up.

Deploy the app to Azure Container Instances

To start off, setup the Enterprise tier of Azure Cache for Redis, using this quickstart. Once you finish this step, ensure that you save the following information: the Redis host name and Access key

The tweets consumer application is available as a Docker container — the easiest way is to simply re-use it. If you wish to build you own image, please use the Dockerfile available in the GitHub repo.

You will now see how convenient it is to deploy it to Azure Container Instances, that allows you to run Docker containers on-demand in a managed, serverless Azure environment.

First, create an Azure context to associate Docker with an Azure subscription and resource group so you can create and manage container instances.

docker login azure
docker context create aci aci-context
docker context use aci-context

Set the environment variables — make sure to update Redis host and credentials as per your account:

export REDIS_HOSTNAME=<redis host port e.g. my-redis-host:10000>
export IS_TLS=true
export REDIS_PASSWORD=<redis access key (password)>
# don't forget your twitter api credentials
export TWITTER_API_KEY=<api key>
export TWITTER_API_KEY_SECRET=<api key secret>
export TWITTER_ACCESS_TOKEN=<access token>
export TWITTER_ACCESS_TOKEN_SECRET=<access token secret>

Just execute the good old docker run:

docker run -d --name redis-streams-producer \
-e REDIS_HOSTNAME=$REDIS_HOSTNAME \
-e IS_TLS=$IS_TLS \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e TWITTER_API_KEY=$TWITTER_API_KEY \
-e TWITTER_API_KEY_SECRET=$TWITTER_API_KEY_SECRET \
-e TWITTER_ACCESS_TOKEN=$TWITTER_ACCESS_TOKEN \
-e TWITTER_ACCESS_TOKEN_SECRET=$TWITTER_ACCESS_TOKEN_SECRET \
abhirockzz/tweets-redis-streams-producer-rust

A container should now be created in Azure and you should see an output similar to this:

[+] Running 2/2
⠿ Group redis-streams-producer Created 4.2s
⠿ redis-streams-producer Created 15.8s

Validate this using the Azure portal:

Tweets consumer app in Azure Container Instances

To check the container logs:

docker logs redis-streams-producer

So, does it work?

Well, it should! To confirm, connect to the Redis instance using redis-cli:

redis-cli -h <redis cache host> -p <redis port> -a <access key> --tls

… and run the XRANGE command to introspect Redis Streams:

XRANGE tweets_stream - + COUNT 5

This will return the first five tweets. You can change the COUNT as per your requirements.

The - and + special IDs mean respectively the minimum ID possible and the maximum ID possible inside the stream

That’s all you need to confirm that our application is able to consume tweets and add them to Redis Streams. As mentioned before, the rest of the components in our solution will build on top of this foundation.

You can either pause the app for now or delete it:

#to pause
docker stop redis-streams-producer
#to delete
docker rm redis-streams-producer

Now that you’ve seen the application in action, let’s quickly walk through “how” things work. If you’re interested in exploring some Rust code, you will find it useful.

Code walk through

You can refer to the code here

The app uses the following libraries:

It starts by connecting to Redis and Twitter:

fn connect_redis() -> redis::Connection {
println!("Connecting to Redis");
let redis_host_name =
env::var("REDIS_HOSTNAME").expect("missing environment variable REDIS_HOSTNAME");
let redis_password = env::var("REDIS_PASSWORD").unwrap_or_default();
//if Redis server needs secure connection
let uri_scheme = match env::var("IS_TLS") {
Ok(_) => "rediss",
Err(_) => "redis",
};
let redis_conn_url = format!("{}://:{}@{}", uri_scheme, redis_password, redis_host_name);
println!("redis_conn_url {}", redis_conn_url);
let client = redis::Client::open(redis_conn_url).expect("check Redis connection URL");
client.get_connection().expect("failed to connect to Redis")
}

Rather than follow a specific set of keywords or a user, we simply connect to the Twitter sample stream, which provides access to about 1% of all Tweets in real-time:

let token = twitter_token();    TwitterStream::sample(&token)
.try_flatten_stream()
.try_for_each(|json| {
let msg: model::StreamMessage =
serde_json::from_str(&json).expect("failed to convert tweet JSON to struct");
process(msg, c.clone());
future::ok(())
})
.await
.expect("error connecting to Twitter stream!");

Bulk of the logic is encapsulated in the process function. Let's go through that bit by bit.
twitter-stream crate returns each tweet in raw JSON form. It is converted into a model::StreamMessage which is a struct that's modeled as per the data we intend to extract from the raw tweet.

We use serde_json to get this done:

serde_json::from_str(&json).expect("json to struct conversion failed");

It is then passed to the process function along with a redis::Connection.

let conn = connect_redis();
let c = Arc::new(Mutex::new(conn));
...
fn process(msg: model::StreamMessage, conn: Arc<Mutex<redis::Connection>>) {
//omitted
}

But why wrap it within an Arc of Mutex?

That’s because we need to pass the redis::Connection to a FnMut closure. It moves the connection, thus we need to use a shared reference, which Arc provides. But Arc is not enough since we are not allowed to mutate the data. Thus, we use to use a Mutex to lock the connection object - the Rust compiler can be confident that only one thread can access it at a time (preserve immutability)

The processing part is relatively simple. It’s all about using the xadd_map function to add the tweet to a Redis Stream. It accepts a BTreeMap, which we create from info in model::StreamMessage - the tweet text, twitter user (screen) name, ID, location and hashtags (if any). Ultimately, the goal is to be able to index these in RediSearch and query them flexibly.

let mut stream_entry: BTreeMap<String, String> = BTreeMap::new();
stream_entry.insert("id".to_string(), tweet.id.to_string());
stream_entry.insert("user".to_string(), tweet.user.screen_name);
stream_entry.insert("text".to_string(), tweet.text);
stream_entry.insert("location".to_string(), tweet.user.location);

That’s all for this part.

Moving on to the next one…

We’re just getting started! This was the first component in our service that lays the foundation for processing the tweets and making them queryable via RediSearch. In the upcoming blog, we will dive into how to consume and process tweets from Redis Streams using a Java based application. Stay tuned!

--

--

Principal Developer Advocate at AWS | I ❤️ Databases, Go, Kubernetes