Trino on Nomad

Kidong Lee
Published in ITNEXT · May 21, 2021



Trino (formerly PrestoSQL) is a popular distributed interactive query engine for data lakes. It can be used not only as a query engine, but also as a data preparation engine, and it is one of my favorite data platform components. In this post I am going to show you how to deploy Trino on Nomad.

Prerequisites

The following components should be available to you before getting started.

  • Hive Metastore: In this post, a Hive catalog will be added to Trino. If you don’t have a Hive Metastore available, see Hive Metastore on Nomad.
  • S3 or MinIO S3 Object Storage
  • Nomad v1.0.4
  • Consul Template v0.22.1

Build Trino docker image

Because images of the Trino Server and the Trino CLI will run on Nomad, we need to build both of them first.

Build Trino Server docker image

The Trino Server Dockerfile looks as follows:

FROM openjdk:11-slim

ENV APP_HOME /opt/trino-server
ENV TRINO_USER trino
ARG TRINO_VER

RUN useradd -ms /bin/bash -d ${APP_HOME} ${TRINO_USER}

RUN apt-get update && apt-get install -y curl dnsutils netcat python --no-install-recommends \
&& rm -rf /var/lib/apt/lists/*

RUN set -ex \
&& PACK_NAME=trino-server-${TRINO_VER} \
&& FILE_NAME=${PACK_NAME}.tar.gz \
&& curl -O https://repo1.maven.org/maven2/io/trino/trino-server/${TRINO_VER}/${FILE_NAME} \
&& tar -zxf ${FILE_NAME} \
&& cp -R ${PACK_NAME}/* ${APP_HOME} \
&& mkdir -p ${APP_HOME}/etc/catalog \
&& rm -rf ${FILE_NAME} \
&& rm -rf ${PACK_NAME}

RUN ls -al ${APP_HOME}

RUN chmod a+x -R ${APP_HOME}/bin
RUN chown ${TRINO_USER}: -R ${APP_HOME}

RUN set -ex \
&& echo 'trino soft nofile 131072' >> /etc/security/limits.d/trino.conf \
&& echo 'trino hard nofile 131072' >> /etc/security/limits.d/trino.conf
RUN cat /etc/security/limits.d/trino.conf

USER ${TRINO_USER}
WORKDIR ${APP_HOME}

Trino 356 will be used to build the image. Create and push the Trino Server image:

# image.
export TRINO_VERSION=356
export TRINO_IMAGE=mykidong/trino:${TRINO_VERSION}
# build.
docker build . --build-arg TRINO_VER=${TRINO_VERSION} -t ${TRINO_IMAGE};
# push.
docker push ${TRINO_IMAGE};

Build Trino CLI docker image

The Trino CLI will be used to query data in the Trino Server on Nomad. Let’s create its Dockerfile:

FROM openjdk:8-slim

RUN apt-get update && apt-get install -y curl less --no-install-recommends \
&& rm -rf /var/lib/apt/lists/*

ARG TRINO_VER
RUN curl -o /opt/trino-cli https://repo1.maven.org/maven2/io/trino/trino-cli/${TRINO_VER}/trino-cli-${TRINO_VER}-executable.jar \
&& chmod +x /opt/trino-cli

# Remove curl.
RUN apt-get --purge remove -y curl && apt-get autoremove -y

Then build and push the Trino CLI image:

# image.
export TRINO_VERSION=356
export TRINO_CLI_IMAGE=mykidong/trino-cli:${TRINO_VERSION}
# build.
docker build . --build-arg TRINO_VER=${TRINO_VERSION} -t ${TRINO_CLI_IMAGE};
# push.
docker push ${TRINO_CLI_IMAGE};

Deploy Trino on Nomad

We are going to deploy one Trino coordinator, three Trino workers, and one Trino CLI on Nomad, so the outline of the trino job looks like this:

job "trino" {
  namespace = "trino"
  ...
  group "coordinator" {
    count = 1
    ...
  }
  group "worker" {
    count = 3
    ...
  }
  group "cli" {
    count = 1
    ...
  }
}

Let’s look at the details of the coordinator group.

group "coordinator" {
  count = 1
  restart {
    attempts = 3
    delay    = "30s"
    interval = "5m"
    mode     = "fail"
  }
  network {
    port "http" {
    }
  }
  task "trino-server" {
    driver       = "docker"
    kill_timeout = "300s"
    kill_signal  = "SIGTERM"
    template {
      data = <<EOF
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port={{ env "NOMAD_HOST_PORT_http" }}
query.max-memory=5GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
query.max-stage-count=200
task.writer-count=4
discovery-server.enabled=true
discovery.uri=http://{{ env "NOMAD_IP_http" }}:{{ env "NOMAD_HOST_PORT_http" }}
EOF
      destination = "local/config.properties"
    }
    template {
      data = <<EOF
node.id={{ env "NOMAD_NAMESPACE" }}-{{ env "NOMAD_JOB_NAME" }}-{{ env "NOMAD_GROUP_NAME" }}-{{ env "NOMAD_ALLOC_ID" }}
node.environment=production
node.data-dir=/opt/trino-server/data
spiller-spill-path=/tmp
max-spill-per-node=4TB
query-max-spill-per-node=1TB
EOF
      destination = "local/node.properties"
    }
    template {
      data = <<EOF
-server
-Xmx16G
-XX:-UseBiasedLocking
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
EOF
      destination = "local/jvm.config"
    }
    template {
      data = <<EOF
connector.name=hive-hadoop2
hive.metastore.uri=thrift://{{range $index, $element := service "hive-metastore"}}{{if eq $index 0}}{{ .Address }}:{{ .Port }}{{end}}{{end}}
hive.allow-drop-table=true
hive.max-partitions-per-scan=1000000
hive.compression-codec=NONE
hive.s3.endpoint=https://nginx-test.cloudchef-labs.com
hive.s3.path-style-access=true
hive.s3.ssl.enabled=true
hive.s3.max-connections=100
hive.s3.aws-access-key=cclminio
hive.s3.aws-secret-key=rhksflja!@#
EOF
      destination = "local/catalog/hive.properties"
    }
    config {
      image      = "mykidong/trino:356"
      force_pull = false
      volumes = [
        "./local/config.properties:/opt/trino-server/etc/config.properties",
        "./local/node.properties:/opt/trino-server/etc/node.properties",
        "./local/jvm.config:/opt/trino-server/etc/jvm.config",
        "./local/catalog/hive.properties:/opt/trino-server/etc/catalog/hive.properties"
      ]
      command = "bin/launcher"
      args = [
        "run"
      ]
      ports = [
        "http"
      ]
      ulimit {
        nofile = "131072"
        nproc  = "65536"
      }
    }
    resources {
      cpu    = 200
      memory = 4096
    }
    service {
      name = "trino-coordinator"
      port = "http"
      check {
        name     = "rest-tcp"
        type     = "tcp"
        interval = "10s"
        timeout  = "2s"
      }
    }
  }
}

The Trino coordinator task runs in this group. The group contains four templates, which generate the Trino configuration files config.properties, node.properties, and jvm.config as well as the Hive catalog configuration hive.properties; all of them are mounted into the configuration path of the container. In the service stanza, the trino-coordinator service is registered with Consul. This service is used both as the discovery URI of the cluster and as the endpoint for submitting SQL queries.

Next, look at the second group, the worker stanza.

group "worker" {
  count = 3
  restart {
    attempts = 3
    delay    = "30s"
    interval = "5m"
    mode     = "fail"
  }
  network {
    port "http" {
    }
  }
  task "await-coordinator" {
    driver = "docker"
    config {
      image        = "busybox:1.28"
      command      = "sh"
      args         = ["-c", "echo -n 'Waiting for service'; until nslookup trino-coordinator.service.consul 2>&1 >/dev/null; do echo '.'; sleep 2; done"]
      network_mode = "host"
    }
    resources {
      cpu    = 100
      memory = 128
    }
    lifecycle {
      hook    = "prestart"
      sidecar = false
    }
  }
  task "trino-server" {
    driver       = "docker"
    kill_timeout = "300s"
    kill_signal  = "SIGTERM"
    template {
      data = <<EOF
coordinator=false
http-server.http.port={{ env "NOMAD_HOST_PORT_http" }}
query.max-memory=5GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
query.max-stage-count=200
task.writer-count=4
discovery.uri=http://{{range $index, $element := service "trino-coordinator"}}{{if eq $index 0}}{{ .Address }}:{{ .Port }}{{end}}{{end}}
EOF
      destination = "local/config.properties"
    }
    template {
      data = <<EOF
node.id={{ env "NOMAD_NAMESPACE" }}-{{ env "NOMAD_JOB_NAME" }}-{{ env "NOMAD_GROUP_NAME" }}-{{ env "NOMAD_ALLOC_ID" }}
node.environment=production
node.data-dir=/opt/trino-server/data
spiller-spill-path=/tmp
max-spill-per-node=4TB
query-max-spill-per-node=1TB
EOF
      destination = "local/node.properties"
    }
    template {
      data = <<EOF
-server
-Xmx16G
-XX:-UseBiasedLocking
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:ReservedCodeCacheSize=512M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
EOF
      destination = "local/jvm.config"
    }
    template {
      data = <<EOF
connector.name=hive-hadoop2
hive.metastore.uri=thrift://{{range $index, $element := service "hive-metastore"}}{{if eq $index 0}}{{ .Address }}:{{ .Port }}{{end}}{{end}}
hive.allow-drop-table=true
hive.max-partitions-per-scan=1000000
hive.compression-codec=NONE
hive.s3.endpoint=https://nginx-test.cloudchef-labs.com
hive.s3.path-style-access=true
hive.s3.ssl.enabled=true
hive.s3.max-connections=100
hive.s3.aws-access-key=cclminio
hive.s3.aws-secret-key=rhksflja!@#
EOF
      destination = "local/catalog/hive.properties"
    }
    config {
      image      = "mykidong/trino:356"
      force_pull = false
      volumes = [
        "./local/config.properties:/opt/trino-server/etc/config.properties",
        "./local/node.properties:/opt/trino-server/etc/node.properties",
        "./local/jvm.config:/opt/trino-server/etc/jvm.config",
        "./local/catalog/hive.properties:/opt/trino-server/etc/catalog/hive.properties"
      ]
      command = "bin/launcher"
      args = [
        "run"
      ]
      ports = [
        "http"
      ]
      ulimit {
        nofile = "131072"
        nproc  = "65536"
      }
    }
    resources {
      cpu    = 200
      memory = 4096
    }
    service {
      name = "trino-worker"
      port = "http"
      check {
        name     = "rest-tcp"
        type     = "tcp"
        interval = "10s"
        timeout  = "2s"
      }
    }
  }
}

The await-coordinator task is a prestart task that runs before the trino-server task; it waits until the trino-coordinator service is ready. Three Trino workers run in this group.

Finally, let’s look at the cli group.

group "cli" {
  count = 1
  restart {
    attempts = 3
    delay    = "30s"
    interval = "5m"
    mode     = "fail"
  }
  task "await-coordinator" {
    driver = "docker"
    config {
      image        = "busybox:1.28"
      command      = "sh"
      args         = ["-c", "echo -n 'Waiting for service'; until nslookup trino-coordinator.service.consul 2>&1 >/dev/null; do echo '.'; sleep 2; done"]
      network_mode = "host"
    }
    resources {
      cpu    = 100
      memory = 128
    }
    lifecycle {
      hook    = "prestart"
      sidecar = false
    }
  }
  task "trino-cli" {
    driver       = "docker"
    kill_timeout = "300s"
    kill_signal  = "SIGTERM"
    config {
      image      = "mykidong/trino-cli:356"
      force_pull = true
      command    = "tail"
      args = [
        "-f",
        "/dev/null"
      ]
    }
    resources {
      cpu    = 100
      memory = 256
    }
  }
}

The Trino CLI task runs in this group. With the Trino CLI, you can connect to the Trino cluster and query data.

The complete trino.nomad can be seen here: https://gist.github.com/mykidong/37e22e66026a7850b6b8f65a8f6457ce

First, create a namespace for the Trino cluster:

nomad namespace apply -description "Trino Cluster" trino;

Then, run the Trino Nomad job:

nomad job run trino.nomad;

After the job has been submitted to Nomad successfully, you can check the status of the trino job:

nomad job status -namespace trino trino
...
Allocations
ID        Node ID   Task Group   Version  Desired  Status   Created    Modified
81eaef50  457a8291  worker       0        run      running  2h26s ago  1h59m ago
c014f467  457a8291  coordinator  0        run      running  2h26s ago  1h59m ago
c1dcb51e  709ee9cc  worker       0        run      running  2h26s ago  1h59m ago
cf006809  457a8291  cli          0        run      running  2h26s ago  1h59m ago
d83876e0  457a8291  worker       0        run      running  2h26s ago  1h59m ago

We have Trino cluster running on Nomad now.
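As a quick sanity check, you can hit the coordinator's REST API: Trino servers expose a /v1/info endpoint that reports, among other things, whether the server is still starting up. The address below is just the example coordinator address used in this post; substitute your own.

```shell
# Optional sanity check against the coordinator's REST API.
# Replace 10.0.0.200:21409 with your own coordinator address.
curl -s http://10.0.0.200:21409/v1/info | grep -o '"starting":[a-z]*'
```

"starting":false means the coordinator has finished booting and is ready to accept queries.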

Query data in Trino using Trino CLI

You can query data in Trino with the Trino CLI. To have some data to query, I am going to create sample tables in MinIO. You can also use any other S3-compatible object storage to store the data.

The following test.json will be used as the source data for the tables created in MinIO using Spark:

{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
{"itemId":"any-item-id0","quantity":2,"price":1000,"baseProperties":{"uid":"any-uid0","eventType":"cart-event","version":"7.0","ts":1527304486873}}
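Each line above is a standalone JSON document (JSON Lines format). Before feeding the file to Spark, a quick sanity check with python3's standard json module can catch malformed lines; this sketch assumes test.json sits in the current directory:

```shell
# Validate test.json line by line; prints the number of any bad line.
n=0
while IFS= read -r line; do
  n=$((n + 1))
  printf '%s' "$line" | python3 -m json.tool > /dev/null 2>&1 \
    || echo "invalid JSON on line $n"
done < test.json
echo "checked $n lines"
```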

To create the tables in MinIO, write a Spark job like this:

...
String metastoreUrl = System.getProperty("metastoreUrl");

SparkConf sparkConf = new SparkConf().setAppName("create sample table");
sparkConf.setMaster("local[2]");

// delta lake log store for s3.
sparkConf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore");

SparkSession spark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();


Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
hadoopConfiguration.set("fs.defaultFS", "s3a://mykidong");
hadoopConfiguration.set("fs.s3a.endpoint", "https://nginx-test.cloudchef-labs.com");
hadoopConfiguration.set("fs.s3a.access.key", "cclminio");
hadoopConfiguration.set("fs.s3a.secret.key", "rhksflja!@#");
hadoopConfiguration.set("fs.s3a.path.style.access", "true");
hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hadoopConfiguration.set("hive.metastore.uris", "thrift://" + metastoreUrl);
hadoopConfiguration.set("hive.server2.enable.doAs", "false");
hadoopConfiguration.set("hive.metastore.client.socket.timeout", "1800");
hadoopConfiguration.set("hive.execution.engine", "spark");


// read json.
String json = StringUtils.fileToString("data/test.json");
String lines[] = json.split("\\r?\\n");
Dataset<Row> df = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(Arrays.asList(lines)));

df.show(10);

// write delta.
df.write().format("delta")
.option("path", "s3a://mykidong/test-delta")
.mode(SaveMode.Overwrite)
.save();

// create delta table.
spark.sql("CREATE TABLE IF NOT EXISTS test_delta USING DELTA LOCATION 's3a://mykidong/test-delta'");

// read delta.
Dataset<Row> delta = spark.sql("select * from test_delta");

delta.show(10);

// create persistent parquet table with external path.
delta.write().format("parquet")
.option("path", "s3a://mykidong/test-parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("test_parquet");

// read parquet from table.
Dataset<Row> parquet = spark.sql("select * from test_parquet");

parquet.show(10);

After running the Spark job, a Delta Lake table and a Parquet-based table will have been created in the Hive Metastore.

To query data in Trino using the Trino CLI, we need the IP address and port of the Trino coordinator and the allocation ID of the Trino CLI task.

To get the IP address and port of the Trino coordinator:

cat <<EOF > service.tpl
{{range \$index, \$element := service "trino-coordinator"}}{{if eq \$index 0}}{{ .Address }}:{{ .Port }}{{end}}{{end}}
EOF
consul-template -template service.tpl -dry;
>
10.0.0.200:21409

The Trino coordinator address is returned as 10.0.0.200:21409.
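Instead of reading the address off the dry-run output by hand, you can capture it in a shell variable. This sketch assumes the same service.tpl as above and uses -once (a standard consul-template flag) so the process renders once and exits:

```shell
# Render the template once and pull out the host:port pair; the grep
# filters away consul-template's leading "> " marker line.
COORDINATOR=$(consul-template -template service.tpl -dry -once \
  | grep -Eo '[0-9.]+:[0-9]+')
echo "${COORDINATOR}"
```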

Next, let’s get the allocation ID of the Trino CLI task on Nomad:

nomad job status -namespace trino trino;
...
Allocations
ID        Node ID   Task Group   Version  Desired  Status   Created     Modified
81eaef50  457a8291  worker       0        run      running  30m50s ago  29m50s ago
c014f467  457a8291  coordinator  0        run      running  30m50s ago  30m8s ago
c1dcb51e  709ee9cc  worker       0        run      running  30m50s ago  29m56s ago
cf006809  457a8291  cli          0        run      running  30m50s ago  29m37s ago
d83876e0  457a8291  worker       0        run      running  30m50s ago  29m52s ago

The allocation ID of the cli task group is cf006809.
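Picking the ID out of the table by eye works, but it can also be scripted; this awk one-liner assumes the column layout shown above (allocation ID in the first column, task group in the third):

```shell
# Grab the allocation ID of the first allocation in the "cli" task group.
ALLOC_ID=$(nomad job status -namespace trino trino \
  | awk '$3 == "cli" { print $1; exit }')
echo "${ALLOC_ID}"
```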

Now, run the following command to access the Hive catalog in Trino using the Trino CLI running on Nomad:

export ALLOC_ID=cf006809;
export COORDINATOR=10.0.0.200:21409;
nomad alloc exec -namespace trino -task trino-cli ${ALLOC_ID} /opt/trino-cli --server ${COORDINATOR} --catalog hive --schema default;

When you query data in Trino, you will get results like the following.

...
trino:default> show tables;
    Table
--------------
 test_delta
 test_parquet
(2 rows)

Query 20210521_102401_00001_ufy36, FINISHED, 3 nodes
Splits: 36 total, 36 done (100.00%)
4.63 [2 rows, 56B] [0 rows/s, 12B/s]

trino:default> select * from test_parquet;
                           baseproperties                            |    itemid    | price | quantity
---------------------------------------------------------------------+--------------+-------+----------
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
 {eventtype=cart-event, ts=1527304486873, uid=any-uid0, version=7.0} | any-item-id0 |  1000 |        2
(8 rows)

Query 20210521_102415_00003_ufy36, FINISHED, 2 nodes
Splits: 18 total, 18 done (100.00%)
5.13 [8 rows, 5.85KB] [1 rows/s, 1.14KB/s]

That’s it.
