CrateDB and Apache Airflow: Automating stock data collection and storage


Previously, in the post How to automate financial data collection and storage in CrateDB with Python and pandas on CrateDB’s community forum, I wrote a step-by-step tutorial on how to get stock market data from S&P 500 companies and store it in CrateDB using a Jupyter Notebook.

Although Jupyter is a great environment for prototyping, where I get instant code evaluation and can easily plot data, it is not well suited for production: unit testing is difficult, and there is no proper dependency management or code versioning, among other limitations.

Airflow, on the other hand, is widely used in production: it runs Python code, supports code versioning, and lets me easily track the execution of my tasks in its user-friendly UI. So, in this tutorial, I will demonstrate how to adapt the Python code I already had in Jupyter to run as an Airflow DAG.

This post will show how to:

  • write Python functions as Airflow tasks
  • create a DAG
  • run a DAG in Airflow

The complete DAG, as well as installation and startup instructions, can be found in our GitHub repository.


This blog post is part of the CrateDB and Apache Airflow series, covering various use cases of Airflow and CrateDB. If you are interested in learning more about Airflow and Astronomer, or want more detailed setup steps, head to the first post, CrateDB and Apache Airflow: Automating Data Export to S3, which is the foundation for this tutorial.

I followed the tutorial mentioned above and have both CrateDB and Airflow up and running, so I can now head to the next steps and create the Airflow tasks.


This is a high-level explanation of the tasks. If you wish to know more about the logic behind them, the previous tutorial has an in-depth explanation of each of the functions used in this section.

I can divide my script into the following tasks:

  • Download financial data from Yahoo
  • Prepare the data
  • Format and insert data into CrateDB

Below, I go through each of them separately.

Download financial data

In the get_sp500_ticker_symbols function, I get the ticker symbols from the List of S&P 500 companies Wikipedia page and return them.

In download_yfinance_data_function, I take these ticker symbols, download the Adjusted Close data for each of them using the yfinance API, and return it as a JSON object.

def get_sp500_ticker_symbols():
    """Extracts SP500 companies' tickers from the SP500's wikipedia page"""

    # Getting the html code from the S&P 500 wikipedia page
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    r_html = requests.get(url, timeout=2.5).text
    soup = BeautifulSoup(r_html, 'html.parser')

    # The stock tickers are found in a table in the wikipedia page,
    # whose html "id" attribute is "constituents". Here, the html
    # soup is filtered to get the table contents
    table_content = soup.find(id="constituents")

    # The stocks' data is stored in a 'tbody' division in the table,
    # so we use it to filter the table content.
    # Each stock's information is stored in a 'tr' division,
    # so we use this as a filter to generate a list of stock data.
    # The first section (index=0) in the generated list contains
    # the headers (which are unimportant in this context), therefore,
    # only data from index=1 on is taken.
    stocks_data = table_content.find("tbody").find_all("tr")[1:]
    tickers = []

    # extracting the tickers from each stock's data
    for stock in stocks_data:
        ticker = stock.text.split("\n")[1]
        tickers.append(ticker)

    return tickers

def download_yfinance_data_function(start_date):
    """downloads Adjusted Close data from SP500 companies"""

    tickers = get_sp500_ticker_symbols()
    data = yf.download(tickers, start=start_date)['Adj Close']
    return data.to_json()

Prepare the data

In Airflow, it’s not possible to trivially pass arguments between functions. Such data can, however, be stored temporarily in Airflow’s metadata database (for instance, the value returned by a task’s function is automatically stored there) and pulled from other tasks as XCom variables; more on that in XComs — Airflow Documentation.

For this reason, as I want to access the data downloaded by the previous task, I must pass ti (the task instance) to prepare_data_function as a parameter and pull the data, as shown in the first lines of the code below.

Here, I take the extracted financial data, clean it, turn each company’s data into a dictionary of values, and store those dictionaries in a list called values_dict.

def prepare_data_function(ti):
    """creates a list of dictionaries with clean data values"""

    # pulling data (as string)
    string_data = ti.xcom_pull(task_ids='download_data_task')

    # transforming to json
    json_data = json.loads(string_data)

    # transforming to dataframe for easier manipulation
    df = pd.DataFrame.from_dict(json_data, orient='index')

    values_dict = []

    for col, closing_date in enumerate(df.columns):

        for row, ticker in enumerate(df.index):
            adj_close = df.iloc[row, col]

            # keeping only rows with an actual closing price
            if not (adj_close is None or math.isnan(adj_close)):
                values_dict.append(
                    {'closing_date': closing_date, 'ticker': ticker, 'adj_close': adj_close}
                )

    return values_dict
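To make the XCom round trip concrete, here is a miniature sketch with made-up numbers for two hypothetical tickers, showing how the JSON string produced by to_json() comes back as a DataFrame with tickers as rows and epoch-millisecond timestamps as columns:

```python
import json
import math

import pandas as pd

# Made-up miniature of the yfinance result: two tickers, two trading days
data = pd.DataFrame(
    {"AAPL": [170.1, 171.3], "MSFT": [300.2, None]},
    index=pd.to_datetime(["2022-01-10", "2022-01-11"]),
)

# What download_yfinance_data_function returns (and Airflow stores as an XCom)
payload = data.to_json()

# What prepare_data_function reconstructs: tickers end up as rows,
# epoch-millisecond timestamps as columns
df = pd.DataFrame.from_dict(json.loads(payload), orient='index')

print(list(df.index))             # ['AAPL', 'MSFT']
print(math.isnan(df.iloc[1, 1]))  # True: the missing MSFT value survives as NaN
```

This is why the nested loops iterate over df.columns as closing dates and df.index as tickers, and why the NaN check is needed before a row is appended.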

Format and insert data into CrateDB

Finally, I have a function to format the data values into SQL statements and insert these values into CrateDB.

def format_and_insert_data_function(ti):
    """formats values to SQL standards and inserts financial data values into CrateDB"""

    values_dict = ti.xcom_pull(task_ids='prepare_data_task')
    insert_stmt = "INSERT INTO sp500 (closing_date, ticker, adjusted_close) VALUES "
    formatted_values = []

    # formatting each row as an SQL values tuple
    for values in values_dict:
        formatted_values.append(
            f"({values['closing_date']}, '{values['ticker']}', {values['adj_close']})"
        )

    insert_stmt += ", ".join(formatted_values) + ";"

    # running the statement through the PostgreSQL protocol; the connection id
    # must match the CrateDB connection configured in Airflow
    insert_data_task = PostgresOperator(
        task_id="insert_data_task",
        postgres_conn_id="cratedb_connection",
        sql=insert_stmt,
    )
    insert_data_task.execute(dict())


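To illustrate the string building in this function, here is a small, self-contained sketch with made-up values for two hypothetical tickers (CrateDB accepts epoch-millisecond longs for timestamp columns, which is why closing_date is not quoted):

```python
# Made-up sample rows, shaped like the output of prepare_data_function
values_dict = [
    {"closing_date": 1641772800000, "ticker": "AAPL", "adj_close": 170.1},
    {"closing_date": 1641772800000, "ticker": "MSFT", "adj_close": 300.2},
]

insert_stmt = "INSERT INTO sp500 (closing_date, ticker, adjusted_close) VALUES "
formatted_values = [
    f"({v['closing_date']}, '{v['ticker']}', {v['adj_close']})" for v in values_dict
]
insert_stmt += ", ".join(formatted_values) + ";"

# One multi-row statement, inserting both rows in a single round trip:
# INSERT INTO sp500 (closing_date, ticker, adjusted_close) VALUES
#   (1641772800000, 'AAPL', 170.1), (1641772800000, 'MSFT', 300.2);
print(insert_stmt)
```

Building one multi-row INSERT instead of one statement per row keeps the number of database round trips down, which matters when inserting hundreds of tickers at once.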
Now that I have gone through the functions, it’s time to finally create the Airflow DAG.

Create the DAG

I must gather these functions into the DAG structure for Airflow.

Inside the dags folder in my project, I add a file with the following structure:

  • Import operators
  • Import python modules
  • Declare functions
  • Set DAG and its tasks

As a prerequisite to insert data, the table sp500 must be manually created (once) in CrateDB.
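The authoritative schema lives in setup/financial_data_schema.sql in the repository; a minimal sketch, with column names inferred from the INSERT statement used in this post and assumed types, could look like this:

```sql
CREATE TABLE IF NOT EXISTS sp500 (
    closing_date TIMESTAMP,
    ticker TEXT,
    adjusted_close DOUBLE PRECISION
);
```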

"""
Downloads stock market data from S&P 500 companies and inserts it into CrateDB.

In CrateDB, the schema to store this data needs to be created once manually.
See the file setup/financial_data_schema.sql in this repository.
"""

import datetime
import math
import json
import requests
from bs4 import BeautifulSoup
import yfinance as yf
import pandas as pd

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator





with DAG(
    dag_id="financial_data_dag",  # example id; pick any unique name
    start_date=datetime.datetime(2022, 1, 10),
    schedule_interval="@daily",
) as dag:

    download_data_task = PythonOperator(
        task_id='download_data_task',
        python_callable=download_yfinance_data_function,
        # "{{ ds }}" is templated by Airflow to the run's logical date (YYYY-MM-DD)
        op_kwargs={"start_date": "{{ ds }}"},
    )

    prepare_data_task = PythonOperator(
        task_id='prepare_data_task',
        python_callable=prepare_data_function,
    )

    format_and_insert_data_task = PythonOperator(
        task_id='format_and_insert_data_task',
        python_callable=format_and_insert_data_function,
    )

    download_data_task >> prepare_data_task >> format_and_insert_data_task

Run DAG in Airflow

With all that set, I save the file and see the DAG in the Airflow UI.

I click on the start button in the right corner to trigger the DAG, then click on Last Run to follow the execution.

This shows the DAG has been executed without errors.

I navigate to the CrateDB Admin UI to check whether the data was inserted, and I see my sp500 table is already filled with data:

I run a simple query to get one company’s data:

SELECT *
FROM sp500
WHERE ticker = 'AAPL' LIMIT 100;

And I get the historical stock data instantly!