CrateDB and Apache Airflow: Implementation of Data Retention Policy

Written by Niklas Schmidtmer and Marija Selakovic

What is a Data Retention Policy?

A data retention policy describes the practice of storing and managing data for a designated period of time. Once a data set completes its retention period, it should be deleted or archived, depending on requirements. Implementing data retention policies correctly ensures compliance with guidelines and regulations such as data privacy laws, optimizes storage space by discarding outdated data, and reduces storage costs.

Specification of Data Retention Policy in CrateDB

In the previous tutorial, we illustrated how to use CrateDB and Apache Airflow to automate periodic data exports to a remote filesystem, using the infrastructure provided by Astronomer. In this tutorial, we focus on a more complex use case: the implementation of an effective retention policy for time-series data. To define retention policies, we create a new table in CrateDB with the following schema:

CREATE TABLE IF NOT EXISTS "doc"."retention_policies" (
   "table_schema" TEXT,
   "table_name" TEXT,
   "partition_column" TEXT NOT NULL,
   "retention_period" INTEGER NOT NULL,
   PRIMARY KEY ("table_schema", "table_name")
)

A retention policy requires the use of a partitioned table because, in CrateDB, data can be deleted very efficiently by dropping entire partitions. Therefore, for each retention policy, we store the table schema, the table name, the partition column, and the retention period, which defines for how many days the data should be retained.

Next, define the table for storing demo data:

CREATE TABLE IF NOT EXISTS "doc"."raw_metrics" (
   "model" TEXT,
   "objectid" TEXT,
   "variable" TEXT,
   "timestamp" TIMESTAMP WITH TIME ZONE,
   "ts_day" TIMESTAMP ALWAYS AS date_trunc('day', "timestamp"),
   "value" REAL,
   "quality" INTEGER,
   PRIMARY KEY ("model", "objectid", "variable", "timestamp", "ts_day")
)
PARTITIONED BY ("ts_day")

For demo data, we rely on existing sources, but we leave it up to the reader to choose their own data source. The important part is that the data is partitioned: in our case, we partition the table on the ts_day column. Finally, we store a retention policy of 1 day for the demo data in the retention_policies table:

INSERT INTO retention_policies VALUES ('doc', 'raw_metrics', 'ts_day', 1);

Implementation in Apache Airflow

To automate the process of deleting expired data, we use Apache Airflow. Our workflow does the following: once a day, it fetches the policies from the database and deletes all data whose retention period has expired.

Retrieving Retention Policies

The first step consists of a task that queries the partitions affected by retention policies. We do this by joining the retention_policies and information_schema.table_partitions tables and selecting the partitions whose retention period has expired. In CrateDB, information_schema.table_partitions (see the documentation) contains information about all partitioned tables, including the table name, schema, partition column, and the values of each partition. The resulting query is constructed as:

SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name) as fqn,
    r.partition_column,
    p.values[r.partition_column]
FROM information_schema.table_partitions p
    JOIN doc.retention_policies r ON p.table_schema = r.table_schema
    AND p.table_name = r.table_name
    AND p.values[r.partition_column] < '{date}'::TIMESTAMP - r.retention_period * INTERVAL '1 day';

Again, we use the logical date ({date} in the query template, which Airflow fills in via {{ ds }}) to filter partitions. This is especially useful in case of a failing workflow: on the next run, Airflow will pick up the date on which the job failed, which keeps job runs consistent. To implement the query above, we use the PythonOperator. The PythonOperator is one of the most flexible Airflow operators, allowing users to execute a Python callable from the DAG. The most important reason for choosing this operator is the need to pass the query result, in our case the list of affected partitions, on to the next operator. It might seem more natural to run a query against CrateDB with a PostgresOperator, but since that operator always returns None, we would not be able to access the query result outside the operator.

The implementation of the corresponding PythonOperator looks as follows:

import json

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_policies(sql):
    """Fetch the partitions affected by retention policies and return them as JSON."""
    pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
    records = pg_hook.get_records(sql=sql)
    retention_policies = json.dumps(records)
    return retention_policies


get_policies = PythonOperator(
    task_id="retrieve_retention_policies",
    python_callable=get_policies,
    op_kwargs={
        "sql": """SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name) AS fqn,
                         r.partition_column,
                         p.values[r.partition_column]
                  FROM information_schema.table_partitions p
                  JOIN doc.retention_policies r ON p.table_schema = r.table_schema
                      AND p.table_name = r.table_name
                      AND p.values[r.partition_column] < '{date}'::TIMESTAMP - r.retention_period * INTERVAL '1 day';
               """.format(date="{{ ds }}")
    },
)

As we can see, the first step is to create the function get_policies, which takes the SQL statement to be executed as a parameter. It establishes a connection to CrateDB using a PostgresHook, which reads the connection details referenced by postgres_conn_id and connects us to the CrateDB service (CrateDB is compatible with the PostgreSQL wire protocol). The function then executes the query and returns the result as a JSON string. To invoke the function, we create a new PythonOperator and pass the query as a parameter.
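To make the later indexing easier to follow, note that the value returned by get_policies, which is handed over to the next task as explained in the following section, is a JSON-encoded list of rows with one row per expired partition. The concrete values below are made up for illustration; the partition value is the partition's ts_day, typically represented as an epoch timestamp in milliseconds:

# Hypothetical example of the value returned by get_policies for a single
# expired daily partition of doc.raw_metrics (values are illustrative only):
retention_policies = '[["doc.raw_metrics", "ts_day", 1637280000000]]'
# index 0: fully qualified table name, index 1: partition column, index 2: partition value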

Cross-Communication Between Tasks

Before we continue with the implementation of the next task in Apache Airflow, we would like to give a brief overview of how data is passed between different tasks in a DAG. For this purpose, Airflow provides the XCom (cross-communication) system. Simply speaking, XCom is a small storage mechanism that lets one task push data which can later be pulled by a different task in the same DAG.

The key thing here is that it allows the exchange of only small amounts of data between tasks. From Airflow 2.0 on, if a PythonOperator calls a function that returns a value, that value is automatically stored in XCom. For our example, this means that the retention_policies return value is available to the next task after the get_policies operator executes. To access the data from another task, the xcom_pull method is used. This method is invoked on the current task instance, which Airflow passes to the callable executed by the operator, e.g. the PythonOperator.
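As a minimal, generic sketch of this mechanism (the function names and the task ID below are made up for illustration, assuming Airflow 2, where context variables such as ti are passed to the callable automatically): when both functions are wrapped in PythonOperators, the value returned by the first task is pulled by the second one via the task instance.

def produce():
    # When executed by a PythonOperator, the return value is automatically
    # pushed to XCom under the key "return_value".
    return "some small payload"

def consume(ti):
    # "ti" is the current task instance; xcom_pull retrieves the value
    # pushed by the task with task_id="produce".
    return ti.xcom_pull(task_ids="produce")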

Applying Retention Policies

After making sure that the list of retention policies is stored in XCom, we create another task that goes through each element in the list and deletes the expired data. We do this by dynamically creating a PostgresOperator for each affected partition, as illustrated by the code below:

import json

from airflow.providers.postgres.operators.postgres import PostgresOperator


def delete_partitions(ti):
    # Pull the JSON-encoded list of affected partitions from XCom
    retention_policies = ti.xcom_pull(task_ids="retrieve_retention_policies")
    policies_obj = json.loads(retention_policies)
    for policy in policies_obj:
        table_name = policy[0]
        column_name = policy[1]
        partition_value = policy[2]
        # Table and column names are identifiers and cannot be bound as query
        # parameters, so they are formatted into the statement; only the
        # partition value is passed as a bound parameter.
        PostgresOperator(
            task_id="delete_from_{table}".format(table=str(table_name)),
            postgres_conn_id="cratedb_connection",
            sql="DELETE FROM {table} WHERE {column} = %(value)s".format(
                table=table_name, column=column_name
            ),
            parameters={"value": partition_value},
        ).execute(dict())

The result of the first task is retrieved with an xcom_pull call on the current task instance. The xcom_pull function requires the ID of the task that stored the values, in our case task_ids="retrieve_retention_policies". After fetching the policies, we iterate through the list and, for each element, extract the table name, partition column, and partition value. We then create a new PostgresOperator that executes a DELETE statement on the affected table, removing only rows where the partition column equals the value that matches the retention policy. Because the filter targets the partition column, CrateDB can carry out such a DELETE efficiently by dropping the whole partition instead of deleting row by row.

Finally, we wrap this logic in another PythonOperator. To make sure that the delete_partitions function has access to the current task instance, we set the provide_context=True parameter, as illustrated below (in Airflow 2 the context is passed automatically, so this parameter is mainly kept for backward compatibility):

apply_policies = PythonOperator(
    task_id="apply_data_retention_policies",
    python_callable=delete_partitions,
    provide_context=True,
)

The last step is to specify task dependencies:

get_policies >> apply_policies
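Putting it all together, the two tasks need to live inside a DAG. A minimal sketch of the surrounding DAG definition might look as follows; the dag_id, start date, and the daily schedule are illustrative assumptions (the complete, tested version is in the repository linked below), and the two operator definitions shown earlier would be placed inside the with block:

import pendulum
from airflow import DAG

with DAG(
    dag_id="data_retention_delete_dag",   # illustrative name
    start_date=pendulum.datetime(2021, 11, 1, tz="UTC"),
    schedule_interval="@daily",           # evaluate the retention policies once a day
    catchup=False,
) as dag:
    # ... get_policies and apply_policies (the PythonOperators defined above) ...
    get_policies >> apply_policies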

The full DAG implementation of the data retention policy can be found in our GitHub repository. To run the workflow, we rely on Astronomer infrastructure with the same setup as shown in the first part of the CrateDB and Apache Airflow tutorial.

Summary

This tutorial showed how to delete data whose retention period has expired. The first part illustrated how to define retention policies in CrateDB; the second part showed how to use Apache Airflow to automate the data deletion. The DAG implementation is fairly simple: the first task extracts the relevant policies, while the second task makes sure that the affected partitions are deleted. In the next tutorial, we will focus on another real-world use case that can be automated with Apache Airflow and CrateDB.
