Automating recurrent CrateDB queries

Certain database operations require running identical queries on a regular basis, which calls for a scheduling mechanism. This article discusses several existing scheduling solutions and how to integrate them with CrateDB.

As an example use case, we will implement continuous aggregates, a strategy that improves the performance of certain aggregations by precalculating their results.

Use Case: Continuous Aggregates

Our base table contains simple periodic sensor readings:

CREATE TABLE sensor_readings_raw (
  ts TIMESTAMP NOT NULL,
  sensor_id INTEGER NOT NULL,
  "value" DOUBLE NOT NULL
);

To analyze the sensor readings, an hourly average is calculated regularly by a data analytics tool:

SELECT DATE_TRUNC('hour', ts),
       sensor_id,
       AVG(value)
FROM sensor_readings_raw
GROUP BY 1, 2;

In certain cases, it can make sense to precalculate the result, e.g. due to strict performance requirements or a high volume of identical queries.

We create a second table that stores the result set of the above query:

CREATE TABLE sensor_readings_aggregated (
  ts_hour TIMESTAMP NOT NULL,
  sensor_id INTEGER NOT NULL,
  value_avg DOUBLE PRECISION NOT NULL,
  last_updated GENERATED ALWAYS AS NOW(),
  PRIMARY KEY(ts_hour, sensor_id)
);

The INSERT query below will populate the target table. To update already aggregated data (e.g. for the latest hour, which is still accumulating readings, or when data arrives late), we consider the raw readings of the last six hours and use the ON CONFLICT DO UPDATE clause to overwrite previously stored values:

INSERT INTO sensor_readings_aggregated (ts_hour, sensor_id, value_avg)
SELECT DATE_TRUNC('hour', ts),
       sensor_id,
       AVG(value)
FROM sensor_readings_raw
WHERE ts >= DATE_TRUNC('hour', NOW() - '6 hours'::INTERVAL)
GROUP BY 1, 2
ON CONFLICT (ts_hour, sensor_id) DO UPDATE SET value_avg = excluded.value_avg;
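The effect of the ON CONFLICT clause can be illustrated with a small, purely hypothetical in-memory sketch in Python: aggregates are keyed by (ts_hour, sensor_id), and re-running the aggregation for a window simply overwrites previously stored averages, just as the upsert does in CrateDB.

```python
from collections import defaultdict

def upsert_hourly_averages(store, raw_readings):
    """Aggregate (ts_hour, sensor_id, value) tuples and upsert the
    averages into `store`, mimicking ON CONFLICT ... DO UPDATE."""
    sums = defaultdict(lambda: [0.0, 0])
    for ts_hour, sensor_id, value in raw_readings:
        acc = sums[(ts_hour, sensor_id)]
        acc[0] += value
        acc[1] += 1
    for key, (total, count) in sums.items():
        store[key] = total / count  # insert new key or overwrite existing
    return store

store = {}
upsert_hourly_averages(store, [("10:00", 1, 20.0), ("10:00", 1, 22.0)])
# A later run that also sees a late-arriving reading recomputes the hour:
upsert_hourly_averages(store, [("10:00", 1, 20.0), ("10:00", 1, 22.0),
                               ("10:00", 1, 24.0)])
print(store[("10:00", 1)])  # 22.0
```

This is also why the query re-reads a whole window of raw data instead of only new rows: recomputing an hour from scratch makes the upsert idempotent.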

This INSERT query should be scheduled regularly to update the aggregated data.
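The six-hour lookback window can be reasoned about in isolation. The sketch below (plain Python; the function name is our own) computes the hour-truncated cutoff of such a window, i.e. the earliest raw reading each scheduled run will re-aggregate:

```python
from datetime import datetime, timedelta

def lookback_cutoff(now: datetime, hours: int = 6) -> datetime:
    """Hour-truncated start of the lookback window: raw readings with
    ts >= this cutoff are re-aggregated on each scheduled run."""
    return (now - timedelta(hours=hours)).replace(minute=0, second=0,
                                                  microsecond=0)

# A run at 14:35 re-aggregates everything from 08:00 onwards:
print(lookback_cutoff(datetime(2021, 9, 8, 14, 35)))  # 2021-09-08 08:00:00
```

Choosing the window size is a trade-off: a larger window tolerates later-arriving data but re-aggregates more raw rows on every run.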

Scheduling Methods

We will now go through several scheduling methods and how to use them for the given use case.

cron

On Unix-based operating systems, a cron job can be used for scheduling.

Prerequisites

The CrateDB CLI client crash is installed.

Setup

  1. Create a script ~/update_continuous_aggregates.sh with the following content (replace <placeholders> for CrateDB connection information accordingly):
    #!/bin/bash
    
    UPDATE_QUERY=$(cat << QUERY
      INSERT INTO sensor_readings_aggregated (ts_hour, sensor_id, value_avg)
      SELECT DATE_TRUNC('hour', ts),
             sensor_id,
             AVG(value)
      FROM sensor_readings_raw
      WHERE ts >= DATE_TRUNC('hour', NOW() - '6 hours'::INTERVAL)
      GROUP BY 1, 2
      ON CONFLICT (ts_hour, sensor_id) DO UPDATE SET value_avg = excluded.value_avg;
    QUERY
    )
    
    CRATEPW=<CrateDB user password> crash -U <CrateDB user name> -c "${UPDATE_QUERY}" --hosts https://<CrateDB host>:4200 > /dev/null
    
  2. Make the script executable: chmod +x ~/update_continuous_aggregates.sh
  3. On the Unix command line, run crontab -e to edit the cron jobs of the current user. Add the following line to update aggregated data every five minutes:
    */5 * * * * ~/update_continuous_aggregates.sh

Caveats

Ensure that your cron jobs are monitored. By default, the cron daemon attempts to deliver any output (such as error messages) to the local mailbox of the job's owner, provided mail delivery is configured on the host.

For more sophisticated monitoring, several third-party cron job monitoring services are available.
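If mailbox delivery is not an option, the crontab entry can redirect output to a log file instead. The variant below is a sketch: it additionally uses flock(1) to prevent overlapping runs should an update ever take longer than five minutes (the lock and log file paths are placeholders of our own choosing):

```
*/5 * * * * flock -n /tmp/update_aggregates.lock ~/update_continuous_aggregates.sh >> ~/update_aggregates.log 2>&1
```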

Node-RED

Node-RED is a low-code programming tool that also offers scheduling functionality.

Prerequisites

The node-red-contrib-postgresql package is installed.

Setup

  1. Import the workflow definition below (replace <placeholders> for CrateDB connection information accordingly):

    Workflow definition
     [{
      "id": "97faa38a7298a42f",
      "type": "tab",
      "label": "Continuous Aggregates",
      "disabled": false,
      "info": ""
    }, {
      "id": "198292c29380b198",
      "type": "inject",
      "z": "97faa38a7298a42f",
      "d": true,
      "name": "Every 5 minutes",
      "props": [{
        "p": "payload"
      }, {
        "p": "topic",
        "vt": "str"
      }],
      "repeat": "300",
      "crontab": "",
      "once": false,
      "onceDelay": 0.1,
      "topic": "",
      "payloadType": "date",
      "x": 250,
      "y": 280,
      "wires": [["de0833a8befa9217"]]
    }, {
      "id": "de0833a8befa9217",
      "type": "postgresql",
      "z": "97faa38a7298a42f",
      "name": "Update Continuous Aggregates",
      "query": "INSERT INTO sensor_readings_aggregated (ts_hour, sensor_id, value_avg)\nSELECT DATE_TRUNC('hour', ts),\n       sensor_id,\n       AVG(value)\nFROM sensor_readings_raw\nWHERE ts >= DATE_TRUNC('hour', ts - '6 hours'::INTERVAL)\nGROUP BY 1, 2\nON CONFLICT (ts_hour, sensor_id) DO UPDATE SET value_avg = excluded.value_avg\nRETURNING _id, sensor_id, value_avg;",
      "postgreSQLConfig": "79bc378b4e65b06e",
      "split": false,
      "rowsPerMsg": 1,
      "outputs": 1,
      "x": 530,
      "y": 280,
      "wires": [[]]
    }, {
      "id": "79bc378b4e65b06e",
      "type": "postgreSQLConfig",
      "name": "CrateDB",
      "host": "<CrateDB host>",
      "hostFieldType": "str",
      "port": "5432",
      "portFieldType": "num",
      "database": "doc",
      "databaseFieldType": "str",
      "ssl": "true",
      "sslFieldType": "bool",
      "max": "10",
      "maxFieldType": "num",
      "min": "1",
      "minFieldType": "num",
      "idle": "1000",
      "idleFieldType": "num",
      "connectionTimeout": "10000",
      "connectionTimeoutFieldType": "num",
      "user": "<CrateDB user name>",
      "userFieldType": "str",
      "password": "<CrateDB user password>",
      "passwordFieldType": "str"
    }]
    

Apache Airflow

Apache Airflow is a tool to programmatically author, schedule, and monitor complex workflows.

Prerequisites

The apache-airflow-providers-postgres package is installed.

Setup

  1. Configure the connection to CrateDB by setting a corresponding environment variable: export AIRFLOW_CONN_CRATEDB_CONNECTION="postgresql://<CrateDB user name>:<CrateDB user password>@<CrateDB host>/doc?sslmode=disable"
  2. Create a new DAG with the code below:
    import datetime
    
    from airflow import DAG
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    
    with DAG(
        dag_id="cratedb_continuous_aggregates_operator",
        start_date=datetime.datetime(2021, 9, 8),
        schedule_interval="@hourly",
        catchup=False,
    ) as dag:
        PostgresOperator(
            task_id="insert_aggregates",
            postgres_conn_id="cratedb_connection",
            sql="""
                INSERT INTO sensor_readings_aggregated (ts_hour, sensor_id, value_avg)
                SELECT DATE_TRUNC('hour', ts),
                       sensor_id,
                       AVG(value)
                FROM sensor_readings_raw
                WHERE ts >= DATE_TRUNC('hour', NOW() - '6 hours'::INTERVAL)
                GROUP BY 1, 2
                ON CONFLICT (ts_hour, sensor_id) DO UPDATE SET value_avg = excluded.value_avg;
              """,
        )
    
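Note how the DAG finds its connection: Airflow resolves connections from the environment by a naming convention, AIRFLOW_CONN_ followed by the upper-cased connection id. This is how postgres_conn_id="cratedb_connection" picks up the variable exported in step 1. A minimal sketch of that mapping:

```python
def conn_env_var(conn_id: str) -> str:
    """Name of the environment variable Airflow checks for a connection id."""
    return "AIRFLOW_CONN_" + conn_id.upper()

print(conn_env_var("cratedb_connection"))  # AIRFLOW_CONN_CRATEDB_CONNECTION
```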