Building a data retention policy for CrateDB with Apache Airflow

In time-series use cases, a significant amount of data can accumulate over time. Making efficient use of available storage can be a key factor for the economic success of a database solution.

Keeping full historical data in the original time-series table (hot storage) guarantees the best querying performance. After a certain time span, it might be desirable to transition historic data to cheaper storage (cold storage). This is also often referred to as a retention policy.

In previous posts, we already introduced two data retention policies, automated using Apache Airflow:

  1. Delete: Dropping cold partitions without keeping the data
  2. Reallocate: With a CrateDB cluster consisting of designated hot/cold nodes (e.g., with different disk types), moving partitions from hot nodes to cold nodes

This post adds a third strategy, snapshot, which is based on CrateDB’s snapshot mechanism. It allows exporting partitions to repositories such as AWS S3, Azure Blob storage, local file systems, and more. Once exported, partitions can be dropped from hot storage. If they are needed for querying again, they can be restored.

Prerequisites

If you do not have an Airflow setup available, please see our Astronomer introduction. You may also adapt the implementation to work with your automation tool of choice or run the queries manually.

Data model

We assume that you have a time-series table that is partitioned, such as the one below:

CREATE TABLE doc.sensor_readings (
   time TIMESTAMP WITH TIME ZONE NOT NULL,
   time_month TIMESTAMP WITH TIME ZONE GENERATED ALWAYS AS DATE_TRUNC('month', "time"),
   sensor_id TEXT NOT NULL,
   battery_level DOUBLE PRECISION,
   battery_status TEXT,
   battery_temperature DOUBLE PRECISION
)
PARTITIONED BY (time_month);

Creating a repository

The cold storage is represented in CrateDB as a repository. For this tutorial, we are creating a new repository pointing to an already existing S3 bucket:

CREATE REPOSITORY export_cold TYPE s3 WITH (access_key = '...', secret_key = '...', bucket = 'cratedb-cold-storage');

For other types of supported repositories, please see the CREATE REPOSITORY reference and choose your preferred repository type.

Implementation

The implementation in Airflow is represented as a DAG (Directed Acyclic Graph). A DAG describes the orchestration logic of a set of tasks. In our context, tasks are usually SQL queries.

To combine all of the previously introduced retention strategies, we make use of a control table that stores all retention policies. The different types of policies are distinguished by their strategy name. For our strategy, we introduce the name snapshot and add an entry to snapshot data older than 12 months:

INSERT INTO doc.retention_policies (table_schema, table_name, partition_column, retention_period, target_repository_name, strategy) VALUES ('doc', 'sensor_readings', 'time_month', 365, 'export_cold', 'snapshot');

Identifying cold partitions

As we are using time_month as the partition key, the value of that key represents the age of the partition. This allows us to easily identify partitions that should be transferred to cold storage, by querying partition metadata stored in information_schema:

SELECT values['time_month']
FROM information_schema.table_partitions
WHERE table_schema = 'doc'
  AND table_name = 'sensor_readings'
  AND values['time_month'] < NOW() - '12 months'::INTERVAL;

In our particular demo case, one row with the timestamp 1480550400000 is returned.
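Partition values of a timestamp column are returned as milliseconds since the Unix epoch. As a quick sanity check, the value above can be converted into a readable date. This is a minimal sketch using only the Python standard library; the variable names are illustrative:

```python
from datetime import datetime, timezone

# Partition value as returned by information_schema.table_partitions
# (milliseconds since the Unix epoch).
partition_value = 1480550400000

# Convert to seconds and interpret as UTC.
partition_start = datetime.fromtimestamp(partition_value / 1000, tz=timezone.utc)
print(partition_start.isoformat())  # 2016-12-01T00:00:00+00:00
```

The partition therefore holds the data for December 2016.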

Transferring cold partitions

For each of the partitions identified above, a corresponding CREATE SNAPSHOT statement is issued, which exports the partition to our cold storage repository. The scope of a snapshot can vary from the whole cluster to only specific partitions. While you would typically snapshot the whole cluster for general backup purposes, we generate one snapshot per partition in this example. This simplifies a potential restore later on.

CREATE SNAPSHOT export_cold."sensor_readings-1480550400000" TABLE doc.sensor_readings PARTITION (time_month = 1480550400000) WITH ("wait_for_completion" = true);

The wait_for_completion parameter guarantees that the snapshot generation is complete before we proceed with deleting any data. Once the snapshot has been created successfully, the partition can be dropped from the original table. To drop a partition, a DELETE statement is used with the partition key and its value:

DELETE FROM doc.sensor_readings WHERE time_month = 1480550400000;
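Both statements follow a fixed pattern, so they can be generated per partition. The following is a minimal sketch, not the code of the actual DAG; the function name and its parameters are hypothetical. It uses plain string formatting and therefore assumes that all identifiers come from a trusted source such as the control table:

```python
def snapshot_and_delete_statements(repository, schema, table, column, value):
    """Return the CREATE SNAPSHOT and DELETE statements for one partition.

    Hypothetical helper for illustration; identifiers are assumed to be trusted.
    """
    # One snapshot per partition, named <table>-<partition value>.
    snapshot_name = f"{table}-{value}"
    create_snapshot = (
        f'CREATE SNAPSHOT {repository}."{snapshot_name}" '
        f"TABLE {schema}.{table} PARTITION ({column} = {value}) "
        'WITH ("wait_for_completion" = true);'
    )
    # Filtering on the partition key with an exact value drops the partition.
    delete = f"DELETE FROM {schema}.{table} WHERE {column} = {value};"
    return create_snapshot, delete


create_sql, delete_sql = snapshot_and_delete_statements(
    "export_cold", "doc", "sensor_readings", "time_month", 1480550400000
)
print(create_sql)
print(delete_sql)
```

The DELETE statement should only be executed after the CREATE SNAPSHOT statement has returned successfully.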

Airflow DAG

The data_retention_snapshot_dag.py consists of three tasks and one helper method that combine the described queries:

  1. get_policies: Queries CrateDB to retrieve all expired partitions according to doc.retention_policies.
  2. map_policy (not shown in the graph above): A helper method that transforms the list of policies into a Python dict for easier handling.
  3. snapshot_partitions: For each partition, generates the CREATE SNAPSHOT SQL query and executes it.
  4. delete_partitions: For each partition, generates the DELETE SQL query and executes it.
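To illustrate the role of the helper, here is a minimal sketch of what such a mapping could look like. It is not taken from the actual DAG: the column order of the result row and the dict keys are assumptions made for this example.

```python
def map_policy(row):
    """Map one result row (a list of column values) to a dict.

    The column order and key names are assumptions for this sketch.
    """
    keys = ("schema", "table", "column", "value", "target_repository_name")
    return dict(zip(keys, row))


# Example row as it might be returned for the demo table.
rows = [["doc", "sensor_readings", "time_month", 1480550400000, "export_cold"]]
policies = [map_policy(row) for row in rows]
print(policies[0]["table"], policies[0]["value"])
```

Working with named keys instead of positional indexes keeps the subsequent SQL generation readable.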

Restoring cold partitions


Note

The queries to look up snapshots require CrateDB >= 4.7.0. The general RESTORE SNAPSHOT command is available in older CrateDB versions as well.

For older CrateDB versions, information on the snapshots can be inferred from their name, if corresponding naming conventions have been used. Example: SELECT name FROM sys.snapshots WHERE SPLIT_PART(name, '-', 2) = 1480550400000::TEXT;


Should the need arise to query partitions again that have been removed from the original table, the corresponding snapshot can be restored. Restoring is done manually and is not part of the automation described above.

The below query identifies the snapshot based on the table name and the time_month values to be restored. For simplicity, the query makes two assumptions:

  1. Each snapshot includes exactly one partition
  2. The partition key consists of exactly one column

A more sophisticated query is provided under Advanced partition setups.

SELECT repository, name, started
FROM sys.snapshots
WHERE table_partitions[1]['table_schema'] = 'doc'
  AND table_partitions[1]['table_name'] = 'sensor_readings'
  AND table_partitions[1]['values'] = [1480550400000];

A result set of one row is returned in our case, listing the snapshot created above:

+-------------+-------------------------------+---------------+
| repository  | name                          |       started |
+-------------+-------------------------------+---------------+
| export_cold | sensor_readings-1480550400000 | 1636551691062 |
+-------------+-------------------------------+---------------+

To restore the partition, compose a RESTORE SNAPSHOT statement with the information retrieved from the result set:

RESTORE SNAPSHOT export_cold."sensor_readings-1480550400000" TABLE doc.sensor_readings PARTITION (time_month = 1480550400000);

After restoring, the partition will re-appear in the original table and can be queried the same way as before the deletion.
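If snapshots follow the naming convention used in this post (table name, a hyphen, then the partition value), the RESTORE SNAPSHOT statement can also be composed from the snapshot name alone. The following is a minimal sketch; the function name and parameters are hypothetical, and the identifiers are assumed to be trusted:

```python
def restore_statement(repository, snapshot_name, schema, column):
    """Compose a RESTORE SNAPSHOT statement from a snapshot name.

    Assumes the naming convention <table>-<partition value>.
    """
    # Split on the last hyphen so table names containing hyphens still work.
    table, _, value = snapshot_name.rpartition("-")
    if not table or not value.isdigit():
        raise ValueError(f"Unexpected snapshot name: {snapshot_name!r}")
    return (
        f'RESTORE SNAPSHOT {repository}."{snapshot_name}" '
        f"TABLE {schema}.{table} PARTITION ({column} = {value});"
    )


restore_sql = restore_statement(
    "export_cold", "sensor_readings-1480550400000", "doc", "time_month"
)
print(restore_sql)
```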

Advanced partition setups

If your partition key is composed of more than one column, the query to identify cold partitions needs to be extended. The extended query also enables use cases where more than one partition is stored in a snapshot, which can be the case when running full cluster-wide snapshots (rather than dedicated per-partition snapshots).

Before running the query, please create a user-defined function:

CREATE OR REPLACE FUNCTION array_index(input_array ARRAY(TEXT), search_name TEXT)
RETURNS INTEGER
LANGUAGE JAVASCRIPT
AS 'function array_index(input_array, search_name) {
      return Array.prototype.findIndex.call(input_array, (element) => element == search_name) + 1;
   }';
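The function returns the 1-based position of a column name within an array, matching the 1-based array indexing in CrateDB, and 0 if the name is not found (findIndex yields -1 in that case). An equivalent in Python, given purely to illustrate the semantics and not needed for the setup:

```python
def array_index(input_array, search_name):
    """Return the 1-based position of search_name, or 0 if it is absent."""
    try:
        return input_array.index(search_name) + 1
    except ValueError:
        return 0


# The second column is a hypothetical extra partition column.
partitioned_by = ["time_month", "sensor_region"]
print(array_index(partitioned_by, "time_month"))     # 1
print(array_index(partitioned_by, "sensor_region"))  # 2
print(array_index(partitioned_by, "unknown"))        # 0
```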

To search for matching snapshots, snapshot metadata gets joined with table metadata. Replace the schema name (doc), table name (sensor_readings), partition key (time_month), and the partition timestamp (1480550400000) with your values:

SELECT repository,
       name AS snapshot_name,
       started
FROM (
  SELECT repository,
         name,
         started,
         s.snapshot['values'] AS values,
         -- replace time_month with your partition key
         array_index(t.partitioned_by, 'time_month') AS partition_key_index
  FROM information_schema.tables t
  JOIN (
    SELECT repository,
           name,
           UNNEST(table_partitions) snapshot,
           started
    FROM sys.snapshots
  ) s ON s.snapshot['table_schema'] = t.table_schema AND s.snapshot['table_name'] = t.table_name
  -- replace table_schema/table_name with your table
  WHERE t.table_schema = 'doc'
    AND t.table_name = 'sensor_readings'
) snapshots_unnested
-- replace 1480550400000 with your partition timestamp
WHERE values[partition_key_index] = 1480550400000
ORDER BY 1, 2, 3 DESC;