PubSub to BigTable — Piping your Data Stream in via GCP Cloud Functions

Kerri Rapes
ITNEXT
Published in
7 min readSep 11, 2019

--

Reading, writing, publishing the code bits you need to implement Serverless ETLs today.

Your boss has just announced that you’re going to attach IoT sensors to everything… and I mean EVERYTHING. Now you have collect and write all that data to a DB. All the while, the Frontend team tells you they need sub-second response times. Annnnd the executives keep throwing out buzz words like “Serverless” and “NoSQL” … if you don’t hurry they are going to start demanding “Machine Learning” and “BLOCKCHAIN”…

The following is a simple and scalable, serverless solution for moving data. It doesn’t do all the tricks and flips like Dataflow, but it’s faster to implement, easy to work with in teams and doesn’t require you to learn Apache Beam.

The TL;DR GitHub repo is available HERE.

The Setup

Iot — PubSub — CloudFunction — BigTable

Definitions

— Services

  1. PubSub

PubSub is a publisher — subscriber messaging service provided by GCP. Think of it like SNS from the AWS world or Kafka from the Open Source community. The fully managed service can be used in either a pull or push setup. In the pull scenario, the system functions as a queue of sorts where applications pull messages and acknowledge that they were handled successfully. In the push case, PubSub automatically pushes the messages to the subscribing endpoints (in our case the Cloud Function) and recieves an acknowledgement when the function has terminated (regardless if it terminated with ‘ok’ or ‘crash’).

2. CloudFunctions

Cloud Functions are serverless compute packages that allow the developers to run bits of stateless code when triggered by an event (think AWS Lambda).

3. BigTable

BigTable is a managed NoSQL database. It works with a single key store and permits sub 10ms latency on requests.

4. Serverless Framework

Serverless Framework is an open-source deployment framework for serverless applications. For this project, we’re going to use it to create and deploy GCP resources. In practice, this framework allows for reliable testing, as identical resources and configurations can be applied to all Dev, Test, and Production environments.

— Connections

  1. IoT (Simulator)— PubSub

The IoT in this demo is a simulated device based on Google’s public NYC Taxi dataset. The mock_sensorData.py script will read through the CSV file and publish events at the same pace as they originally occurred (as indicated by the timestamp) or they can be altered as a factor of that pace. For example, if we wish to see the series of events unfold more rapidly, we can alter the — speed_factor parameter to be 60 indicating that 1 hour worth of data will be processed over the course of 1 min. The events are published as PubSub messages on the sensorData_STAGE topic where they will later be picked up by the Cloud Function.

2. PubSub — CloudFunction

For this project, we will be using a ‘push’ setup with a Cloud Function subscribing to the PubSub topic and an automatic trigger launching the function when a message is published.

3. CloudFunction — BigTable

To perform the ETL and store the data the Cloud Function will write to file the contents of the event message. This event will be stored with a primary key specifically constructed to provide the optimum response time for the most common access patterns. For more information on constructing partition keys checkout Google’s Schema Design page.

Implementation

Serverless Framework Service

— Installing packages

To build the service from scratch start by installing Serverless Framework with npm.

$npm install -g serverless

— Creating service

Next, create a basic python service with the `create` command.

$sls create --template google-python --path my-service

— Basic anatomy

This will generate a boilerplate package containing the serverless.yml and a main.py file.

  • Serverless.yml

The YML file is the ground truth providing any and all information necessary to set up and deploy the service. Inside the file things like name, region, provider are defined as well as functions, API endpoints, triggers, and infrastructure.

  • GCP plug-in

Serverless Framework operates by calling Google’s API endpoints for infrastructure construction. In order to work correctly, the GCP plug-in must be installed.

$npm install --save serverless-google-cloudfunctions
  • Access keys

Now you need to prove to Google that you’re a real developer and not a hacker trying to rule the world with some wild ETL functions. Make yourself credentials in the IAM console, save the Secret Token JSON file in a secure location, and paste the file path in the credentials sections of your severless.yml. (In the current GitHub repo the .yml is setup to look in your HOME directory for a JSON with the same name as your project)

**serverless.yml**
credentials: ~/${self:provider.project}.json

CloudFunction

— Writing the handler

The serverless-google-cloudfunctions plug-in directs all traffic through a single python file called main.py. While this might be fine for small projects I find it ill-equipped when trying to manage Dev, Test, Prod environments. The best solution I’ve found to date is to direct the flow of events through the main.py file to separate *_handler.py files.

## main.py ##import pubSubReciever_handler
import mock_sensorData_handler
def pubSubReciever_dev(event, context):
return pubSubReciever_handler.main(event, context)
def pubSubReciever_test(event, context):
return pubSubReciever_handler.main(event, context)
def pubSubReciever_prod(event, context):
return pubSubReciever_handler.main(event, context)
def mock_sensorData_dev(event):
return mock_sensorData_handler.main(event)
def mock_sensorData_test(event):
return mock_sensorData_handler.main(event)
def mock_sensorData_prod(event):
return mock_sensorData_handler.main(event)

— Reading an incoming message

Once your function is set up, the first thing it needs to do is read the Pubsub message. Start by decoding it out of base64..

data = base64.b64decode(event['data']).decode('utf-8')
print("DATA: {}".format(data))

— Writing to BigTable

After you have the data your function will probably do some really cool transformations and then it will need to write the results to BigTable.

def writeToBigTable(table, data):    timestamp = data['event']['date']
rts = reverseTimestamp(timestamp)
row_key = '{}'.format(rts).encode()
row = table.row(row_key)
for colFamily in data.keys():
for key in data[colFamily].keys():
row.set_cell(colFamily,
key,
data[colFamily][key],
timestamp=timestamp)
table.mutate_rows([row])
return data

PubSub

— Writing the IoT event to a Topic

The goal of this project was to pipe your data stream into BigTable. So to get the whole thing started you need to publish your IoT data to Pubsub. Just like the message decoding in the Cloud Function, all data needs to be encoded before publishing.

def publish(publisher, topic, events):
numobs = len(events)
if numobs > 0:
logging.info('Publishing {0} events from {1}'.format(numobs, get_timestamp(events[0])))
for event_data in events:
publisher.publish(topic,event_data)

Pulling it all together

— Creating a topic

After the bits and pieces have been built it’s time to pull the whole thing together. Start by creating a topic in the resources section of the serverless.yml

- name: mock_SensorData_${self:provider.stage}
type: gcp-types/pubsub-v1:projects.topics
properties:
topic: sensorData_${self:provider.stage}

— Subscribing a function

Next, create the cloud function in the functions section and subscribe it to the topic.

pubSubReciever:
handler: pubSubReciever_${self:provider.stage}
events:
- event:
eventType: providers/cloud.pubsub/eventTypes/topic.publish
resource: projects/${self:provider.project}/topics/sensorData_${self:provider.stage}

— -Creating a table

Finally, back in resources, declare the BigTable cluster and defines its columns.

- type: gcp-types/bigtableadmin-v2:projects.instances
name: projects/${self:provider.project}/instances/iotincoming
properties:
parent: projects/${self:provider.project}
instanceId: iotincoming
clusters:
iotcluster:
defaultStorageType: HDD
location: projects/${self:provider.project}/locations/${self:provider.region}
instance:
displayName: IotIncoming
type: DEVELOPMENT
- type: gcp-types/bigtableadmin-v2:projects.instances.tables
name: incomingraw_${self:provider.stage}
properties:
parent: projects/${self:provider.project}/instances/iotincoming
tableId: incomingraw_${self:provider.stage}
table:
granularity: MILLIS
columnFamilies:
event:
gcRule:
maxNumVersions: 2

— Deployment and Execution

It’s now ready to fulfill all your boss’ ETL dreams. To deploy run a quick sls deploy and then begin streaming your data by publishing to the Pubsub Topic.

$sls deploy --project PROJECTNAME$curl --header "Content-Type: application/json" \
--request POST \
--data '{"limit": NUMER_OF_EVENTS, "speedFactor": VELOCITY_OF_SIMULATION, "project": PROJECT_NAME}' \
https://REGION-PROJECT_NAME.cloudfunctions.net/FUNCTION_NAME_STAGE_NAME

Collecting the Fruits of your Labor

— Reading from the table

After its all over and the dust has settled you’re going to have access the data in BigTable. There are a few different options outlined in Google’s BigTable white paper but the method we found most useful was to stream the data into a dictionary.

from google.cloud import bigtable
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row_filters


client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(instance_id)
table = instance.table(table_id)

def streamToDict(partial):
def dc(byte):
return byte.decode("utf-8")
newDict = {}
for row in partial:
newDict[dc(row.row_key)] = {}
for col in row.cells.keys():
newDict[dc(row.row_key)][col] = {}
for key in row.cells[col].keys():
newDict[dc(row.row_key)][col][dc(key)] = dc(row.cells[col][key][0].value)
return newDict


partial = table.read_rows(limit=3)
response = streamToDict(partial)
for key in response.keys():
print(response[key])
print()

Conclusion

To see the complete implementation in all it’s glory (and used for the graphics below) see the GitHub page.

— Performance

The service was able to receive, process and store all the data at sub-second speeds. The only drawback we noticed is that PubSub does not guarantee that messages are delivered in order. This means that sometimes, if we were reading as fast as we were writing we would see data points A and C, then on the next read we would see A, B, and C. In our case the BigTable keys were designed to create DateTime order and we don’t have a need to read datapoints milliseconds after they are written to PubSub so it wasn’t a big deal. However, it is something to keep in mind.

— Looking Forward

Looking forward this ETL provides a nice foundation for a fan-out architecture. The ability to build more functions for specific tasks and chain together PubSub topics makes both sequential and parallel processing possible. While the isolated nature of Cloud Functions makes the service great for working with teams and robust if one of the processes should fail.

--

--