Setting up data pipelines with CrateDB and Kestra.io

Kestra.io is an open-source workflow automation and orchestration tool for building and managing complex workflows in a streamlined, efficient way. It provides a wide range of features and integrations, including Postgres, Git, Docker, Kubernetes, and more, making it easy to automate processes across different platforms and environments. Kestra also comes with a user-friendly web-based interface that lets users create, modify, and manage workflows without any coding skills.

In this tutorial, we will show you how CrateDB integrates with Kestra using the PostgreSQL plugin to create an efficient and scalable data pipeline.

Running Kestra on Docker

Getting started with Kestra using Docker is straightforward. First, install Docker on your machine if you haven't already. Next, pull the official Kestra Docker image from the Docker registry and run it using the following command (the image expects a server command such as server local, which starts Kestra with an embedded local repository):

docker run -d -p 8080:8080 kestra/kestra:latest server local

This will start the Kestra server on your local machine, which you can access by navigating to http://localhost:8080 in your web browser. From there, you can start creating workflows and automating your processes using Kestra’s web interface.
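If the page does not load right away, you can confirm from the command line that the container is running and the port is reachable (standard Docker and curl commands, nothing Kestra-specific):

# list running containers started from the Kestra image
docker ps --filter "ancestor=kestra/kestra:latest"

# the web interface should answer on port 8080
curl -I http://localhost:8080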

Deploy a CrateDB Cloud Cluster

To deploy a new cluster on CrateDB Cloud, you first need to sign up for a CrateDB Cloud account. When you create a new organization, you are entitled to $200 of free credit to spend on cluster deployment, scaling, and other operations as you see fit. Once you've signed up, create a new cluster by selecting the Create Cluster button and choosing your preferred cloud provider and region. You can then configure the cluster by selecting the number of nodes and the amount of storage you need. In this example, we use a 1-node cluster with 4 GiB of storage, which is enough for development environments and low-traffic applications.

Once your cluster is up and running, you can start using CrateDB’s powerful distributed SQL database features via a web-based Admin UI.
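Besides the Admin UI, you can also talk to the cluster over its HTTP endpoint. As a quick sketch (substitute your own cluster hostname and password; the hostname below is the example cluster used later in this post):

curl -sS -u admin:my_super_secret_password \
  -H 'Content-Type: application/json' \
  -X POST 'https://cratedb-kestra.aks1.westeurope.azure.cratedb.net:4200/_sql' \
  -d '{"stmt": "SELECT name FROM sys.cluster"}'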

Move data between clusters with Kestra

There are several ways to move data between CrateDB clusters; in the following example, we illustrate a simple approach using Kestra. The prerequisite for this use case is to have two running CrateDB clusters, and the most straightforward way to get them is to deploy both on CrateDB Cloud.

Now, let’s import some data into the first cluster. To do so, go back to the cluster overview page and click on the Learn how to import data link. This opens a list of statements you need to execute to load the NYC taxi data:

CREATE TABLE "nyc_taxi" (
  "congestion_surcharge" REAL, 
  "dolocationid" INTEGER, 
  "extra" REAL, 
  "fare_amount" REAL, 
  "improvement_surcharge" REAL, 
  "mta_tax" REAL, 
  "passenger_count" INTEGER, 
  "payment_type" INTEGER, 
  "pickup_datetime" TIMESTAMP WITH TIME ZONE, 
  "pulocationid" INTEGER, 
  "ratecodeid" INTEGER, 
  "store_and_fwd_flag" TEXT, 
  "tip_amount" REAL, 
  "tolls_amount" REAL, 
  "total_amount" REAL, 
  "trip_distance" REAL, 
  "vendorid" INTEGER) WITH ("column_policy" = 'dynamic', "number_of_replicas" = '0', "refresh_interval" = 10000);
COPY "nyc_taxi" FROM 'https://s3.amazonaws.com/crate.sampledata/nyc.yellowcab/yc.2019.07.gz' WITH (compression = 'gzip');

On the second cluster, create an empty nyc_taxi table by executing the same CREATE TABLE statement as above (without the COPY statement). As a next step, we will create a new Kestra Flow to move data between the clusters.

Flows in Kestra are used to implement workflows. Each flow is defined as a declarative model in a YAML file that contains all the tasks and the order in which they run. A flow must have an identifier (id), a namespace, and a list of tasks.
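As a minimal sketch of this structure, here is a hello-world flow using Kestra's core Log task (the id and namespace values are illustrative):

id: hello-cratedb
namespace: io.kestra.demo
tasks:
- id: hello
  type: io.kestra.core.tasks.log.Log
  message: Hello from Kestra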

Query CrateDB table

To query a PostgreSQL-compatible database, such as CrateDB, Kestra offers the io.kestra.plugin.jdbc.postgresql.Query plugin. The Query task allows users to execute custom SQL queries against a database and use the results in their workflow. It offers various parameters, such as auto-committing SQL statements, specifying access-control rules, and storing fetch results.

The following snippet shows the declaration of our workflow and the specification of the first task that selects data from the nyc_taxi table and runs the query on the first CrateDB cluster:

id: cratedb-kestra
namespace: io.kestra.crate
tasks:
- id: query
  type: io.kestra.plugin.jdbc.postgresql.Query
  url: jdbc:postgresql://cratedb-kestra.aks1.westeurope.azure.cratedb.net:5432/
  username: admin
  password: my_super_secret_password
  sql: SELECT * FROM doc.nyc_taxi LIMIT 1000
  store: true

In this task, the store parameter is set to true so that the results are stored and can be used as input in the following task.

Insert data into the second table

In Kestra, the Batch task allows you to take rows fetched by a previous task and bulk-insert them into another table. We will use an instance of the Batch task to read the stored results of the first task and insert them into the table on the second cluster.

The following snippet shows the Batch task declaration used for inserting data into the table on the second CrateDB cluster:

- id: update
  type: io.kestra.plugin.jdbc.postgresql.Batch
  from: "{{ outputs.query.uri }}"
  url: jdbc:postgresql://cratedb-kestra2.aks1.eastus2.azure.cratedb.net:5432/
  username: admin
  password: my_super_secret_password
  sql: |
    INSERT INTO doc.nyc_taxi VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )

When a Kestra task is executed, it may create or modify a resource, such as a file, database record, or API endpoint. Kestra allows the output produced by the task to be stored in the execution flow context and used by subsequent tasks. The output object is used to capture information about the results of the task, including any resources that were created or modified.

In our example, outputs.query.uri refers to the URI of the resource created by the previous task. Specifically, it refers to the URI of the stored database records returned by the SELECT statement.

Finally, once the data are imported to the second table, let’s create a new task that selects data from that table:

- id: select
  type: io.kestra.plugin.jdbc.postgresql.Query
  url: jdbc:postgresql://cratedb-kestra2.aks1.eastus2.azure.cratedb.net:5432/
  username: admin
  password: my_super_secret_password
  sql: SELECT MAX_BY(passenger_count, fare_amount) FROM doc.nyc_taxi
  store: false

In the last task, we select the passenger_count value for which fare_amount is the highest; to achieve that, we use the MAX_BY aggregation function. MAX_BY is one of the latest aggregation functions supported by CrateDB; to learn more about it, check out our recent blog post.
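To get a feel for how MAX_BY behaves, you can try it on inline data (a quick sketch using CrateDB's UNNEST table function; col1 and col2 are the column names UNNEST generates):

SELECT MAX_BY(col1, col2) FROM UNNEST(['low', 'high', 'mid'], [1, 5, 3]);
-- returns 'high', the col1 value in the row where col2 is largest

Putting everything together, the complete flow assembled from the three snippets above looks as follows (hostnames and credentials are the example values used throughout this post):

id: cratedb-kestra
namespace: io.kestra.crate
tasks:
- id: query
  type: io.kestra.plugin.jdbc.postgresql.Query
  url: jdbc:postgresql://cratedb-kestra.aks1.westeurope.azure.cratedb.net:5432/
  username: admin
  password: my_super_secret_password
  sql: SELECT * FROM doc.nyc_taxi LIMIT 1000
  store: true
- id: update
  type: io.kestra.plugin.jdbc.postgresql.Batch
  from: "{{ outputs.query.uri }}"
  url: jdbc:postgresql://cratedb-kestra2.aks1.eastus2.azure.cratedb.net:5432/
  username: admin
  password: my_super_secret_password
  sql: |
    INSERT INTO doc.nyc_taxi VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )
- id: select
  type: io.kestra.plugin.jdbc.postgresql.Query
  url: jdbc:postgresql://cratedb-kestra2.aks1.eastus2.azure.cratedb.net:5432/
  username: admin
  password: my_super_secret_password
  sql: SELECT MAX_BY(passenger_count, fare_amount) FROM doc.nyc_taxi
  store: false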

Execute the flow

To execute the flow, click the New Execution button below the flow specification. To monitor the execution of your flow, check the Logs view.

The Logs view shows the execution status of each task, as well as its running time. There are other ways to monitor the execution of flows in Kestra; refer to the official Kestra documentation for an overview of its full capabilities.

Finally, let’s check the data in the second cluster and confirm that exactly 1000 records got inserted.
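A simple count on the second cluster does the trick (the expected result matches the LIMIT in the first task):

SELECT COUNT(*) FROM doc.nyc_taxi;
-- returns 1000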

Wrap up

If you need to automatically manage CrateDB data pipelines, Kestra can be a good choice. It allows you to specify workflows without requiring coding skills, and it supports integrations with various systems, including Postgres (and CrateDB), Kubernetes, Docker, Git, and many others.

In this tutorial, we have also shown how to deploy your CrateDB cluster in a few clicks. If you want to try it out and enjoy all of the CrateDB features, sign up for the CrateDB Cloud trial.

To learn more about updates and features, or to get answers to questions you might have, join our CrateDB community.
