CrateDB and Apache Airflow: Implementation of Data Retention Policy

Written by Niklas Schmidtmer and Marija Selakovic

What is a Data Retention Policy?

A data retention policy describes the practice of storing and managing data for a designated period of time. Once a data set completes its retention period, it should be deleted or archived, depending on requirements. Implementing data retention policies correctly ensures compliance with existing guidelines and regulations, such as data privacy laws, frees storage space by discarding outdated data, and reduces storage costs.

Specification of Data Retention Policy in CrateDB

In the previous tutorial, we illustrated how to use CrateDB and Apache Airflow to automate periodic data export to a remote filesystem with the infrastructure provided by Astronomer. In this tutorial, we focus on a more complex use case: implementing effective retention policies for time-series data. To define retention policies, we create a new table in CrateDB with the following schema:

CREATE TABLE IF NOT EXISTS "doc"."retention_policies" (
   "table_schema" TEXT,
   "table_name" TEXT,
   "partition_column" TEXT NOT NULL,
   "retention_period" INTEGER NOT NULL,
   "strategy" TEXT NOT NULL,
   PRIMARY KEY ("table_schema", "table_name")
);

A retention policy requires a partitioned table because, in CrateDB, data can be deleted efficiently by dropping partitions. For each retention policy, we therefore store the table schema, the table name, the partition column, and the retention period, which defines how many days the data should be retained.
The strategy column is reserved for future implementations of additional data retention policies. For now, we always set it to the value delete.

Next, define the table for storing demo data:

CREATE TABLE IF NOT EXISTS "doc"."raw_metrics" (
   "model" TEXT,
   "objectid" TEXT,
   "variable" TEXT,
   "timestamp" TIMESTAMP WITH TIME ZONE,
   "ts_day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
   "value" REAL,
   "quality" INTEGER,
   PRIMARY KEY ("model", "objectid", "variable", "timestamp", "ts_day")
)
PARTITIONED BY ("ts_day");

For demo data, we rely on existing sources, but users are free to choose their own data source. The important part is that the data is partitioned: in our case, we partition the table on the ts_day column. Finally, we store a retention policy of 1 day for the demo data in the retention_policies table:

INSERT INTO retention_policies VALUES ('doc', 'raw_metrics', 'ts_day', 1, 'delete');
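To illustrate how rows end up in daily partitions, the following minimal Python sketch mimics what date_trunc('day', "timestamp") does for the ts_day column. It assumes that timestamps are handled in UTC and that a partition value is expressed as milliseconds since the epoch; the function name day_partition_value is made up for this example.

```python
from datetime import datetime, timezone


def day_partition_value(ts: datetime) -> int:
    """Return the start of ts's UTC day as milliseconds since the epoch."""
    day_start = ts.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int(day_start.timestamp() * 1000)


morning = datetime(2021, 11, 19, 8, 30, tzinfo=timezone.utc)
evening = datetime(2021, 11, 19, 21, 45, tzinfo=timezone.utc)
next_day = datetime(2021, 11, 20, 0, 5, tzinfo=timezone.utc)

# Two readings taken on the same day map to the same partition value,
# while a reading shortly after midnight starts a new partition.
same_partition = day_partition_value(morning) == day_partition_value(evening)
new_partition = day_partition_value(next_day) != day_partition_value(morning)
```

Because every row of a given day shares one ts_day value, removing a day of data amounts to removing a single partition.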

Implementation in Apache Airflow

To automate the process of deleting expired data, we use Apache Airflow. Our workflow does the following: once a day, it fetches the policies from the database and deletes all data whose retention period has expired.

Retrieving Retention Policies

The first step consists of a task that queries the partitions affected by retention policies. We do this by joining the retention_policies and information_schema.table_partitions tables and selecting the partition values whose retention period has expired. In CrateDB, information_schema.table_partitions contains information about all partitioned tables, including the table name, the schema, the partition column, and the partition values.
The resulting query is:

SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
       QUOTE_IDENT(r.partition_column),
       TRY_CAST(p.values[r.partition_column] AS BIGINT)
FROM information_schema.table_partitions p
JOIN doc.retention_policies r ON p.table_schema = r.table_schema
  AND p.table_name = r.table_name
  AND p.values[r.partition_column] < '{date}'::TIMESTAMP - r.retention_period
WHERE r.strategy = 'delete';
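The selection logic of the query can be sketched in plain Python. This is only an illustration of the join and the cutoff comparison, not the way the workflow runs: the helper names and the sample rows are invented, and the sketch treats retention_period as a number of whole days, as described for the policy table. How CrateDB evaluates a TIMESTAMP minus an INTEGER in the SQL expression is worth verifying separately.

```python
from datetime import date, datetime, timedelta, timezone


def cutoff_ms(logical_date: str, retention_days: int) -> int:
    """Epoch milliseconds of (logical_date - retention_days) at UTC midnight."""
    day = date.fromisoformat(logical_date) - timedelta(days=retention_days)
    midnight = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    return int(midnight.timestamp() * 1000)


def expired_partitions(partitions, policies, logical_date):
    """Mimic the join: keep partitions older than their policy's cutoff."""
    by_table = {(p["table_schema"], p["table_name"]): p for p in policies}
    result = []
    for part in partitions:
        policy = by_table.get((part["table_schema"], part["table_name"]))
        if policy is None or policy["strategy"] != "delete":
            continue
        value = part["values"][policy["partition_column"]]
        if value < cutoff_ms(logical_date, policy["retention_period"]):
            result.append((part["table_schema"], part["table_name"], value))
    return result


# Invented sample data: one policy and three daily partitions (17-19 Nov 2021).
policies = [{
    "table_schema": "doc", "table_name": "raw_metrics",
    "partition_column": "ts_day", "retention_period": 1, "strategy": "delete",
}]
partitions = [
    {"table_schema": "doc", "table_name": "raw_metrics",
     "values": {"ts_day": ms}}
    for ms in (1637107200000, 1637193600000, 1637280000000)
]

expired = expired_partitions(partitions, policies, "2021-11-19")
```

With a retention period of one day and a logical date of 2021-11-19, only the partition for 17 November lies before the cutoff.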

To separate SQL logic from orchestration logic, we save the query in the file include/data_retention_retrieve_delete_policies.sql.

In the query, we again use the logical date, passed in through the {date} placeholder, to filter partitions. This is especially useful when a workflow run fails: on the next run, Airflow picks up the date on which the job failed, which keeps job runs consistent. To run the query, we use the PythonOperator, one of the most flexible Airflow operators, which lets users execute Python callables from a DAG. The main reason for choosing this operator is the need to pass the query result, in our case the list of affected partitions, to the next operator. It would be natural to run a query against CrateDB with a PostgresOperator, but because that operator always returns None, the query result would not be accessible outside the operator.
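The benefit of the logical date can be sketched as follows. Because the placeholder is filled with the date of the run rather than the current time, a retry produces exactly the same statement. The function render_policy_query and the shortened TEMPLATE string are hypothetical; the date check is an extra precaution added here because the value is inserted into the SQL text by string formatting.

```python
from datetime import date

# Shortened stand-in for the SQL file; only the {date} placeholder matters here.
TEMPLATE = "SELECT ... WHERE p.values[r.partition_column] < '{date}'::TIMESTAMP"


def render_policy_query(template: str, logical_date: str) -> str:
    """Fill the {date} placeholder after checking it parses as an ISO date."""
    date.fromisoformat(logical_date)  # raises ValueError for anything else
    return template.format(date=logical_date)


first_run = render_policy_query(TEMPLATE, "2021-11-19")
retry = render_policy_query(TEMPLATE, "2021-11-19")

# A rerun for the same logical date yields an identical query.
identical = first_run == retry
```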

The implementation of the corresponding PythonOperator looks as follows:


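import datetime
import json
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_policies(logical_date):
    pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
    sql = Path('include/data_retention_retrieve_delete_policies.sql') \
        .read_text(encoding="utf-8").format(date=logical_date)
    records = pg_hook.get_records(sql=sql)

    return json.dumps(records)  # the returned string is stored in XCom

with DAG(
    dag_id="data-retention-delete-dag",  # example DAG ID (assumed)
    start_date=datetime.datetime(2021, 11, 19),
    schedule_interval="@daily",  # run once a day
) as dag:
    get_policies = PythonOperator(
        task_id="retrieve_retention_policies",
        python_callable=get_policies,
        op_kwargs={"logical_date": "{{ ds }}"},
    )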

The first step is to create the function get_policies, which takes the logical date as a parameter. The SQL statement is loaded from a file. The PostgresHook takes the connection information from postgres_conn_id and establishes the connection to CrateDB. The function then executes the query and returns the result as a JSON string. To invoke the function, we create a new PythonOperator and pass the logical date as a parameter.

Cross-Communication Between Tasks

Before we continue with the implementation of the next task in Apache Airflow, we would like to give a brief overview of how data is passed between different tasks in a DAG. For this purpose, Airflow provides the XCom system. Simply put, XCom can be seen as a small storage object: one task pushes data into it, and a different task in the DAG can use that data later.

The key point is that XCom is meant for exchanging small amounts of data between tasks. Since Airflow 2.0, if a PythonOperator calls a function that returns a value, that value is automatically stored in XCom. For our example, this means that the return value of get_policies is available to the next task once the get_policies operator has executed. To access the data from another task, use the xcom_pull method. It is called on the current task instance, which Airflow passes to the callable run by the operator, e.g. a PythonOperator.
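The exchange can be sketched without Airflow using a small in-memory stand-in. FakeTaskInstance and its store_return_value method are hypothetical and are not part of Airflow's API; only the xcom_pull(task_ids=...) call shape follows the usage in this tutorial. The sample row is invented and assumes that each row holds the table, the partition column, and the partition value.

```python
import json


class FakeTaskInstance:
    """Hypothetical in-memory stand-in for a task instance; not Airflow's API."""

    def __init__(self):
        self._store = {}

    def store_return_value(self, task_id, value):
        # Plays the role of the automatic push of a callable's return value.
        self._store[task_id] = value

    def xcom_pull(self, task_ids):
        return self._store.get(task_ids)


ti = FakeTaskInstance()

# Upstream task: serialize the query result before handing it over.
records = [("doc.raw_metrics", "ts_day", 1637107200000)]
ti.store_return_value("retrieve_retention_policies", json.dumps(records))

# Downstream task: pull by the upstream task ID and decode the JSON string.
policies = json.loads(ti.xcom_pull(task_ids="retrieve_retention_policies"))
```

Note that the JSON round trip turns each tuple into a list, so the downstream task receives a list of lists.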

Applying Retention Policies

Once the list of retention policies is stored in XCom, we need another task that goes through each element in the list and deletes the expired data. We do this by dynamically creating a PostgresOperator for each policy, as illustrated in the code below:

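import logging
from airflow.providers.postgres.operators.postgres import PostgresOperator

def delete_partitions(ti):
    retention_policies = ti.xcom_pull(task_ids="retrieve_retention_policies")
    policies_obj = json.loads(retention_policies)

    for policy in policies_obj:
        partition = map_policy(policy)  # dict with assumed keys: table, column, value
        logging.info("Deleting partition %s = %s for table %s",
                     partition["column"], partition["value"], partition["table"])
        sql = Path("include/data_cleanup_delete.sql").read_text(encoding="utf-8")
        # The task ID below is an illustrative name.
        PostgresOperator(task_id="delete_partition", postgres_conn_id="cratedb_connection",
                         sql=sql.format(**partition)).execute(dict())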


The result of the first task is retrieved by calling xcom_pull on the current task instance. The xcom_pull method takes the ID of the task that stored the values, in our case task_ids="retrieve_retention_policies".
After fetching the policies, we iterate through the list and first call the helper function map_policy, which converts the policy array into a Python dictionary for easier named access to each element.
Then, we create a new PostgresOperator that executes the DELETE statement on the affected table. We delete only the rows where the partition column equals the value selected by the retention policy (include/data_cleanup_delete.sql):

DELETE FROM {table} WHERE {column} = {value};
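The mapping and the template can be tried out together in a short sketch. The implementation of map_policy is not shown in this tutorial, so the version below is an assumption: it expects each row to contain the table, the partition column, and the partition value, in that order, and uses dictionary keys that match the placeholders of the DELETE template. The render_delete helper and the sample row are likewise made up.

```python
DELETE_TEMPLATE = "DELETE FROM {table} WHERE {column} = {value};"


def map_policy(policy):
    """Turn one result row into a dict keyed by the template's placeholders."""
    table, column, value = policy
    return {"table": table, "column": column, "value": value}


def render_delete(partition):
    """Fill the DELETE template for a single partition."""
    return DELETE_TEMPLATE.format(**partition)


# Invented sample row, shaped like one decoded element of the XCom result.
row = ["doc.raw_metrics", "ts_day", 1637107200000]
statement = render_delete(map_policy(row))
# statement == "DELETE FROM doc.raw_metrics WHERE ts_day = 1637107200000;"
```

Keeping the dictionary keys identical to the placeholder names lets the template be filled with a single format call.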

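Finally, we wrap this logic in another PythonOperator. To make sure that the delete_partitions function has access to the current task instance, we set provide_context=True. Note that Airflow 2.x passes the context automatically and treats this parameter as deprecated, so it matters mainly on older versions. The operator is defined as follows: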

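    apply_policies = PythonOperator(
        task_id="apply_data_retention_policies",  # illustrative task ID
        python_callable=delete_partitions,
        provide_context=True,
    )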

The last step is to specify task dependencies:

get_policies >> apply_policies
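For readers unfamiliar with the >> notation, the idea behind it can be sketched with a toy class that overloads Python's right-shift operator. This is not Airflow's implementation; the Task class, the execution_order helper, and the second task ID are invented for illustration.

```python
class Task:
    """Toy task that records its downstream tasks; not Airflow's implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other  # returning the right-hand task allows a >> b >> c


def execution_order(first):
    """Walk a linear chain of tasks, starting from the first one."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


get_policies = Task("retrieve_retention_policies")
apply_policies = Task("apply_data_retention_policies")

# Declare that apply_policies runs after get_policies.
get_policies >> apply_policies

order = execution_order(get_policies)
```

The expression only declares the dependency; in the real DAG, the scheduler decides when each task runs based on it.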

The full DAG implementation of the data retention policy can be found in our GitHub repository. To run the workflow, we rely on Astronomer infrastructure with the same setup as shown in the first part of the CrateDB and Apache Airflow tutorial.

Summary

This tutorial showed how to delete data whose retention period has expired. The first part covered how to define policies in CrateDB, and the second how to use Apache Airflow to automate data deletion. The DAG implementation is fairly simple: the first task extracts the relevant policies, and the second task deletes the affected partitions. In the following tutorial, we will focus on another real-world example that can be automated with Apache Airflow and CrateDB.