CrateDB and Apache Airflow: Building a hot/cold storage data retention policy

In this fourth article on automating recurrent CrateDB queries with Apache Airflow, we present a second strategy for implementing a data retention policy. Previously, we shared the Data Retention Delete DAG, which dropped old partitions after a certain period of time. In this article, we introduce the complementary strategy of a hot/cold storage approach.

What is a hot/cold storage strategy?

A hot/cold storage strategy is often motivated by a tradeoff between performance and cost-effectiveness. In a database such as CrateDB, more recent data tends to have a higher significance for analytical queries. Well-performing disks (hot storage) play a key role on the infrastructure side to support performance requirements but can come at a high cost. As data ages and gets less business-critical for near-real-time analysis, transitioning it to slower/cheaper disks (cold storage) helps to improve the cost-performance ratio.

In a CrateDB cluster, nodes can have different hardware specifications. Hence, a cluster can consist of a combination of hot and cold storage nodes, each with respective disks. By assigning corresponding attributes to nodes, CrateDB can be made aware of such node types and consider them when allocating partitions.

CrateDB setup

To create a multi-node setup, we make use of Docker Compose to spin up three nodes – two hot nodes, and one cold node. For the scope of this article, we will not actually use different hardware specifications for disks, but each disk is represented as a separate Docker volume.

The designation of a node type is done by passing the -Cnode.attr.storage parameter to each node with the value hot or cold. The resulting docker-compose.yml file with two hot nodes and one cold node is as follows:

version: '3.8'
services:
  cratedb01:
    image: crate:latest
    ports:
      - "4201:4200"
      - "5532:5432"
    volumes:
      - /tmp/crate/hot-01:/data
    command: ["crate",
              "-Ccluster.name=crate-docker-cluster",
              "-Cnode.name=cratedb01",
              "-Cnode.attr.storage=hot",
              "-Cnetwork.host=_site_",
              "-Cdiscovery.seed_hosts=cratedb02,cratedb03",
              "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03",
              "-Cgateway.expected_nodes=3",
              "-Cgateway.recover_after_nodes=2"]
    environment:
      - CRATE_HEAP_SIZE=2g

  cratedb02:
    image: crate:latest
    ports:
      - "4202:4200"
      - "5632:5432"
    volumes:
      - /tmp/crate/hot-02:/data
    command: ["crate",
              "-Ccluster.name=crate-docker-cluster",
              "-Cnode.name=cratedb02",
              "-Cnode.attr.storage=hot",
              "-Cnetwork.host=_site_",
              "-Cdiscovery.seed_hosts=cratedb01,cratedb03",
              "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03",
              "-Cgateway.expected_nodes=3",
              "-Cgateway.recover_after_nodes=2"]
    environment:
      - CRATE_HEAP_SIZE=2g

  cratedb03:
    image: crate:latest
    ports:
      - "4203:4200"
      - "5732:5432"
    volumes:
      - /tmp/crate/cold-03:/data
    command: ["crate",
              "-Ccluster.name=crate-docker-cluster",
              "-Cnode.name=cratedb03",
              "-Cnode.attr.storage=cold",
              "-Cnetwork.host=_site_",
              "-Cdiscovery.seed_hosts=cratedb01,cratedb02",
              "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03",
              "-Cgateway.expected_nodes=3",
              "-Cgateway.recover_after_nodes=2"]
    environment:
      - CRATE_HEAP_SIZE=2g

The cluster is started via docker-compose up. For more details, please see the above-linked documentation.

Once the cluster is up and running, we create our partitioned time-series table. By specifying "routing.allocation.require.storage" = 'hot' in the WITH clause, we configure new partitions to be placed on a hot node.

CREATE TABLE IF NOT EXISTS doc.raw_metrics (
   "model" TEXT,
   "objectid" TEXT,
   "variable" TEXT,
   "timestamp" TIMESTAMP WITH TIME ZONE,
   "ts_day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
   "value" REAL,
   "quality" INTEGER,
   PRIMARY KEY ("model", "objectid", "variable", "timestamp", "ts_day")
)
PARTITIONED BY (ts_day)
WITH ("routing.allocation.require.storage" = 'hot');

To validate the allocation of shards we insert a sample row:

INSERT INTO doc.raw_metrics (model, objectid, variable, timestamp, value, quality) VALUES ('water-sensors', 5, 'water-flow', NOW() - '5 months'::INTERVAL, 12, 1);

The INSERT statement will implicitly trigger the creation of a new partition consisting of six shards. Since we configured cratedb01 and cratedb02 as hot nodes, we expect shards to be allocated only on those two nodes, and not on cratedb03 (which is a cold node). The allocation can be validated by navigating to the “Shards” section in the Admin UI.

As expected, primary shards, as well as replicas, are evenly distributed between the first two nodes, while no shards are stored on the third node.
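As an alternative to the Admin UI, the same check can be scripted. The following is a minimal sketch, not part of the original setup: it assumes the result rows of the query below have been fetched with a client of your choice, and the helper name shards_outside is hypothetical. The query relies on the sys.shards table and its node['name'] column.

```python
from typing import Dict, Iterable, Set, Tuple

# Counts how many shards of doc.raw_metrics each node currently holds.
SHARDS_PER_NODE_SQL = """
SELECT node['name'] AS node_name, count(*) AS shards
FROM sys.shards
WHERE schema_name = 'doc' AND table_name = 'raw_metrics'
GROUP BY node['name']
"""


def shards_outside(
    rows: Iterable[Tuple[str, int]], allowed_nodes: Set[str]
) -> Dict[str, int]:
    """Return the nodes (and their shard counts) that hold shards
    although they are not in the set of allowed nodes."""
    return {
        node: count
        for node, count in rows
        if node not in allowed_nodes and count > 0
    }
```

An empty result for allowed_nodes = {"cratedb01", "cratedb02"} means that no shard of the table ended up on the cold node.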

Next, we will create a table storing the retention policy used to transition partitions from hot to cold nodes:

CREATE TABLE IF NOT EXISTS doc.retention_policies (
   "table_schema" TEXT,
   "table_name" TEXT,
   "partition_column" TEXT NOT NULL,
   "retention_period" INTEGER NOT NULL,
   "reallocation_attribute_name" TEXT,
   "reallocation_attribute_value" TEXT,
   "strategy" TEXT NOT NULL,
   PRIMARY KEY ("table_schema", "table_name", "strategy")
)
CLUSTERED INTO 1 SHARDS;

The schema is an extension of what was introduced in the first article on the Data Retention Delete DAG. The strategy column allows switching between the previously introduced dropping of partitions (delete) and the now added reallocation (reallocate). For our raw_metrics table, we add a policy of transitioning from hot to cold nodes after 60 days:

INSERT INTO doc.retention_policies VALUES ('doc', 'raw_metrics', 'ts_day', 60, 'storage', 'cold', 'reallocate');

To remember which partitions have already been reallocated, we create an additional tracking table that gets populated by the DAG:

CREATE TABLE IF NOT EXISTS doc.retention_policy_tracking (
   "table_schema" TEXT,
   "table_name" TEXT,
   "strategy" TEXT,
   "last_partition_value" TIMESTAMP WITH TIME ZONE NOT NULL,
   PRIMARY KEY (table_schema, table_name, strategy)
)
CLUSTERED INTO 1 SHARDS;

Airflow setup

We assume that a basic Astronomer/Airflow setup is already in place, as described in our first post of this series. The general idea behind the hot/cold DAG implementation is similar to the one introduced in the initial data retention post. Let’s quickly go through the steps of the algorithm:

  1. get_policies: A query on doc.retention_policies and information_schema.table_partitions identifies partitions affected by a retention policy.
  2. map_policies: A helper method transforming the output of get_policies into a Python dict data structure.
  3. generate_sql_reallocate: For each partition, an SQL statement for changing the allocation setting is generated. In our case, this results in the statement ALTER TABLE <table> PARTITION (<partition key> = <partition value>) SET ("routing.allocation.require.storage" = 'cold');.
  4. reallocate_partitions: Executes the previously generated SQL statement. The CrateDB cluster will then automatically initiate the relocation of the affected partition to a node that fulfills the requirement (cratedb03 in our case).
  5. add_tracking_information: For each partition, an upsert is performed on the table doc.retention_policy_tracking to save the information that reallocation has been performed for the partition. This information is needed for the next DAG run to prevent reallocating the same partition again.
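
Steps 2, 3, and 5 can be sketched in plain Python as follows. This is a simplified illustration and not the code of the DAG on GitHub: the Policy structure, the column order of the result row, the function names map_policy and tracking_upsert, and the ? parameter placeholders are assumptions made for this example.

```python
from typing import NamedTuple, Tuple


class Policy(NamedTuple):
    """One partition matched by a 'reallocate' policy (hypothetical shape)."""
    schema: str
    table: str
    column: str
    value: int  # partition value, in milliseconds since the epoch
    attribute_name: str
    attribute_value: str


def map_policy(row: tuple) -> Policy:
    """Step 2: give the positional columns of one result row a name."""
    return Policy(*row)


def generate_sql_reallocate(policy: Policy) -> str:
    """Step 3: build the ALTER TABLE statement for one partition.

    Identifiers are interpolated directly, so this assumes the control
    table only contains trusted names that need no further quoting.
    """
    return (
        f"ALTER TABLE {policy.schema}.{policy.table} "
        f"PARTITION ({policy.column} = {policy.value}) "
        f'SET ("routing.allocation.require.{policy.attribute_name}" = '
        f"'{policy.attribute_value}');"
    )


def tracking_upsert(policy: Policy) -> Tuple[str, tuple]:
    """Step 5: build an upsert that records the reallocated partition."""
    sql = (
        "INSERT INTO doc.retention_policy_tracking "
        "(table_schema, table_name, strategy, last_partition_value) "
        "VALUES (?, ?, ?, ?) "
        "ON CONFLICT (table_schema, table_name, strategy) "
        "DO UPDATE SET last_partition_value = excluded.last_partition_value;"
    )
    return sql, (policy.schema, policy.table, "reallocate", policy.value)


row = ("doc", "raw_metrics", "ts_day", 1625702400000, "storage", "cold")
print(generate_sql_reallocate(map_policy(row)))
```

For the sample row, the printed statement is the ALTER TABLE statement for partition 1625702400000 of doc.raw_metrics. Keeping SQL generation separate from execution makes each step easy to test without a running cluster.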

The full implementation is available as data_retention_reallocate_dag.py on GitHub.

To validate our implementation, we trigger the DAG once manually via the Airflow UI at http://localhost:8081. Once executed, log messages of the apply_data_retention_policies task confirm the reallocation was triggered for the partition with the sample data set up earlier:

[2021-12-08, 12:39:44 UTC] {data_cleanup_dag.py:47} INFO - Reallocating partition ts_day = 1625702400000 for table doc.raw_metrics to storage = cold
[2021-12-08, 12:39:44 UTC] {dbapi.py:225} INFO - Running statement: ALTER TABLE doc.raw_metrics PARTITION (ts_day = 1625702400000) SET ("routing.allocation.require.storage" = 'cold');
, parameters: None

Revisiting the “Shards” section in the CrateDB Admin UI confirms that all shards have been moved to cratedb03. Since the default replication setting is 0-1 and there is only one cold node in this setup, replicas have been discarded.

Combined hot/cold and deletion strategy

The presented hot/cold storage strategy also integrates seamlessly with the previously introduced Data Retention Delete DAG. Both strategies can be combined:

  1. Transition to cold nodes: Reallocates partitions from (expensive) hot nodes to (cheaper) cold nodes
  2. Deletion from cold nodes: After another retention period on cold nodes, permanently delete partitions

Both DAGs use the same control table for retention policies. In our example, we already added an entry for the reallocate strategy after 60 days. If we want to keep partitions on cold nodes for another 60 days and then discard them, we add an additional delete policy. Note that the retention periods are not additive, i.e. we need to specify the delete retention period as 120 days:

INSERT INTO doc.retention_policies (table_schema, table_name, partition_column, retention_period, strategy) VALUES ('doc', 'raw_metrics', 'ts_day', 120, 'delete');
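
To make the non-additive retention periods concrete, here is a small worked example. It is only a sketch: the helper policy_due_dates is hypothetical, and it treats each retention period as a plain number of days counted from the partition's day, ignoring the exact time of the DAG run.

```python
from datetime import date, timedelta
from typing import Dict


def policy_due_dates(partition_day: date, policies: Dict[str, int]) -> Dict[str, date]:
    """For each strategy, return the day on which its retention period,
    counted from the partition's own day, has elapsed."""
    return {
        strategy: partition_day + timedelta(days=days)
        for strategy, days in policies.items()
    }


# Both periods are measured from the partition day, not from each other.
policies = {"reallocate": 60, "delete": 120}
due = policy_due_dates(date(2021, 7, 8), policies)

days_on_cold_nodes = (due["delete"] - due["reallocate"]).days
print(due["reallocate"], due["delete"], days_on_cold_nodes)
```

For a partition of 2021-07-08, reallocation becomes due on 2021-09-06 and deletion on 2021-11-05, which leaves the partition on cold nodes for 60 days.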

Summary

Building upon the previously discussed data retention policy implementation, we showed that reallocating partitions integrates seamlessly and requires only a single SQL statement. CrateDB’s self-organization capabilities take care of all low-level operations and the actual moving of partitions. Furthermore, we showed that a multi-staged approach of data retention policies can be achieved to first reallocate and eventually delete partitions permanently.


Inspired by the blog post above, I created a table in my three-node CrateDB cluster. The table is partitioned by a workday column of type timestamp. I created the table with the command below:

create table hk.workitems (
    col1 long,
    col2 long,
    workday timestamp,
    more_cols object as (
        col3 long,
        col4 long
    ),
    INDEX col_ft using fulltext(col1, col2),
    primary key (col1, col2, workday)
) PARTITIONED BY (workday)
WITH (
    "translog.durability" = 'ASYNC',
    "routing.allocation.include._name" = 'nodeX,nodeY'
);

where nodeX and nodeY are the names of 2 of my nodes given as node.name in crate.yml file on two instances.

When I inserted the data, it went only to these two nodes, which I could confirm by running the command below:

select distinct node['name'] from sys.shards where table_name = 'workitems' limit 100;

This showed me only two nodes, nodeX and nodeY. So far it worked as described here.

After that I decided to move a partition to nodeZ using below command.

ALTER TABLE hk.workitems PARTITION ( workday = 1652121000000) SET ("routing.allocation.require._name" = 'nodeZ');

I was expecting the data to move to nodeZ, but that did not happen. I still see all shards on nodeX and nodeY only; nodeZ is still empty.
Did I miss a step?

Hi @vinayak.shukre,

After you run the ALTER TABLE query, you have two shard allocation filters set:

  • routing.allocation.require._name from the ALTER TABLE
  • routing.allocation.include._name from the CREATE TABLE

As the require filter (name ‘nodeZ’) contradicts the include filter (name ‘nodeX’ or ‘nodeY’), no node satisfies both, and no shards are moved.

Once you reset the routing.allocation.include._name filter, all shards of that partition will move to nodeZ:

ALTER TABLE hk.workitems PARTITION ( workday = 1652121000000) RESET ("routing.allocation.include._name" );
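
Why the two filters block each other can be shown with a deliberately simplified model. This is a sketch, not CrateDB's actual allocation logic: the function eligible_nodes is hypothetical, it only considers the _name attribute, and it assumes a node must pass every filter that is set.

```python
from typing import List, Optional, Set


def eligible_nodes(
    nodes: List[str],
    include: Optional[Set[str]] = None,
    require: Optional[str] = None,
) -> List[str]:
    """Return the nodes that pass all filters that are set.

    include: the node name must be one of the given names.
    require: the node name must equal the given name.
    A filter that is None is treated as not set.
    """
    result = []
    for name in nodes:
        if include is not None and name not in include:
            continue
        if require is not None and name != require:
            continue
        result.append(name)
    return result


nodes = ["nodeX", "nodeY", "nodeZ"]

# Both filters set: no node is in {nodeX, nodeY} and equal to nodeZ.
print(eligible_nodes(nodes, include={"nodeX", "nodeY"}, require="nodeZ"))

# After resetting the include filter, only the require filter remains.
print(eligible_nodes(nodes, require="nodeZ"))
```

In this model, the first call returns an empty list and the second returns only nodeZ, which mirrors the behavior before and after the RESET statement.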

Thanks. I think I will try with node.attr.storage attribute and then I can assign same value for two nodes and then use the routing.allocation.require instead of routing.allocation.include.

It is working as expected with node.attr.storage attribute. Thanks @jayeff.
