Import Parquet files into CrateDB using Apache Arrow and SQLAlchemy

This tutorial introduces a way to import Parquet files into CrateDB using the Apache Arrow and SQLAlchemy libraries in Python.

What is a Parquet file?

Apache Parquet is a free and open-source, column-oriented data storage format. Its efficient compression and encoding schemes optimize data storage and retrieval, and it can handle complex data structures in bulk. Although this tutorial uses Python, Parquet files are supported by many languages and data processing frameworks. Here, a Parquet file is used to transfer data from a data store into CrateDB.

Prerequisites

Besides the CrateDB Python client (the crate library), this tutorial uses the Pandas, SQLAlchemy and Apache Arrow (PyArrow) libraries. You can install all of them with the following pip command. To check the latest SQLAlchemy version supported by CrateDB, have a look at the CrateDB documentation - SQLAlchemy.

pip install "crate[sqlalchemy]" pandas pyarrow

Before getting started, create the table that will be populated. This tutorial uses a Parquet file containing New York yellow taxi trips from January 2022; refer to this link to download the file. To create the corresponding table, you can run the following SQL statement, for example in the CrateDB console.

CREATE TABLE "doc"."ny_taxi" (
   "id" TEXT NOT NULL,
   "vendorID" TEXT,
   "tpep_pickup_datetime" TIMESTAMP WITH TIME ZONE,
   "tpep_dropoff_datetime" TIMESTAMP WITH TIME ZONE,
   "passenger_count" INTEGER,
   "trip_distance" REAL,
   "PULocationID" TEXT,
   "DOLocationID" TEXT,
   "rateCodeID" INTEGER,
   "store_and_fwd_flag" TEXT,
   "payment_type" INTEGER,
   "fare_amount" REAL,
   "extra" REAL,
   "MTA_tax" REAL,
   "improvement_surcharge" REAL,
   "tip_amount" REAL,
   "tolls_amount" REAL,
   "total_amount" REAL,
   "congestion_surcharge" REAL,
   "airport_fee" REAL,
   PRIMARY KEY ("id")
)

If you are using a different dataset, that is fine; just make sure to create the corresponding table before proceeding.

Getting started

Once everything is installed and the table has been created, import the required modules as shown below.

import pandas as pd  # pandas must be installed for RecordBatch.to_pandas()
import pyarrow.parquet as pq
from uuid import uuid4
from sqlalchemy import Column, DateTime, Float, Integer, MetaData, String, create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

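The first step is to open the Parquet file. pq.ParquetFile reads the file's metadata up front and lets us read the data itself in batches later; the resulting handle is stored in the ny_taxi_parquet object.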

parquet_path = 'yellow_tripdata_2022-01.parquet'
ny_taxi_parquet = pq.ParquetFile(parquet_path)

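Next, set up the SQLAlchemy engine and session as shown below. Replace the placeholders in the connection URI with your own user, password and host; for a local CrateDB instance without authentication, a URI such as crate://localhost:4200 is usually enough.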

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None
meta = MetaData()

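def init_sqlalchemy(dbname='crate://<USER>:<PASSWORD>@<CRATEDB_HOST>/?ssl=true'):
    """Create the engine and bind the session to it."""
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    # No Base.metadata.drop_all()/create_all() here: the ny_taxi table was
    # created manually, and drop_all() would delete it (and its data) once
    # Taxi_trip is registered on Base.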
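If your password contains characters such as @ or :, it has to be URL-encoded before it is placed in the URI. The following sketch builds the URI with Python's urllib.parse.quote_plus and reads the values from environment variables. The function name build_crate_uri and the variable names CRATEDB_USER, CRATEDB_PASSWORD and CRATEDB_HOST are hypothetical, and port 4200 assumes CrateDB's default HTTP port.

```python
import os
from urllib.parse import quote_plus

def build_crate_uri(user: str, password: str, host: str,
                    port: int = 4200, ssl: bool = True) -> str:
    """Build a crate:// URI, URL-encoding the user name and password."""
    query = "?ssl=true" if ssl else ""
    return f"crate://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{query}"

# Hypothetical environment variables, with defaults for a local instance
uri = build_crate_uri(
    os.environ.get("CRATEDB_USER", "crate"),
    os.environ.get("CRATEDB_PASSWORD", ""),
    os.environ.get("CRATEDB_HOST", "localhost"),
    ssl=False,
)
```

The result can then be passed to the setup function, e.g. init_sqlalchemy(uri), instead of hard-coding credentials in the source.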

Processing the Parquet file

Before processing the file, its structure must be mirrored as a Python class so that SQLAlchemy can map it to the table. For that purpose, the class Taxi_trip is defined below. Note that __tablename__ must match the name of the table created previously.

init_sqlalchemy()

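def gen_key():
    # Generate a random UUID string as the primary key of each new row
    return str(uuid4())

class Taxi_trip(Base):
    __tablename__ = 'ny_taxi'  # must match the table created above
    id = Column(String, primary_key=True, default=gen_key)
    vendorID = Column(String)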
    tpep_pickup_datetime = Column(DateTime)
    tpep_dropoff_datetime = Column(DateTime)
    passenger_count = Column(Integer)
    trip_distance = Column(Float)
    PULocationID = Column(String)
    DOLocationID = Column(String)
    rateCodeID = Column(Integer)
    store_and_fwd_flag = Column(String)
    payment_type = Column(Integer)
    fare_amount = Column(Float)
    extra = Column(Float)
    MTA_tax = Column(Float)
    improvement_surcharge = Column(Float)
    tip_amount = Column(Float)
    tolls_amount = Column(Float)
    total_amount = Column(Float)
    congestion_surcharge = Column(Float)
    airport_fee = Column(Float)
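Because id has a Python-side default, SQLAlchemy calls gen_key() once for every row of a multi-row insert, so the imported rows do not need to carry an id. The sketch below checks this behaviour with a reduced, hypothetical model (Trip) and an in-memory SQLite database standing in for CrateDB, so it runs without a CrateDB server; it assumes SQLAlchemy 1.4 or later.

```python
from uuid import uuid4

from sqlalchemy import Column, Float, String, create_engine, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

def gen_key():
    return str(uuid4())

class Trip(Base):
    # Reduced, hypothetical model with a subset of the Taxi_trip columns
    __tablename__ = "trip"
    id = Column(String, primary_key=True, default=gen_key)
    vendorID = Column(String)
    trip_distance = Column(Float)

# In-memory SQLite as a stand-in for CrateDB
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

rows = [{"vendorID": "1", "trip_distance": 1.5}, {"vendorID": "2", "trip_distance": 3.2}]
with engine.begin() as conn:
    # executemany-style insert; gen_key() fills "id" for each row
    conn.execute(Trip.__table__.insert(), rows)
    ids = [row[0] for row in conn.execute(select(Trip.__table__.c.id))]

print(ids)
```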

For further details on using SQLAlchemy with CrateDB, refer to the documentation here.

Next, each record in the Parquet file has to be converted into a row that can be inserted into the table. Depending on the size of the file and the number of records, it is best to process the file in batches rather than loading it into memory at once. The following step reads the Parquet file batch by batch, converts each batch into a Pandas data frame, turns each row into a dictionary keyed by the Taxi_trip column names, and inserts the whole batch with a single INSERT statement executed once per row (an executemany call).

def create_obj(x, new_trips):
    # Map one data frame row to a dict keyed by the Taxi_trip column names.
    # Some Parquet column names differ (VendorID, RatecodeID, mta_tax).
    new_trips.append({
        "vendorID": x.VendorID,
        "tpep_pickup_datetime": x.tpep_pickup_datetime,
        "tpep_dropoff_datetime": x.tpep_dropoff_datetime,
        "passenger_count": x.passenger_count,
        "trip_distance": x.trip_distance,
        "PULocationID": x.PULocationID,
        "DOLocationID": x.DOLocationID,
        "store_and_fwd_flag": x.store_and_fwd_flag,
        "rateCodeID": x.RatecodeID,
        "payment_type": x.payment_type,
        "fare_amount": x.fare_amount,
        "extra": x.extra,
        "MTA_tax": x.mta_tax,
        "improvement_surcharge": x.improvement_surcharge,
        "tip_amount": x.tip_amount,
        "tolls_amount": x.tolls_amount,
        "total_amount": x.total_amount,
        "congestion_surcharge": x.congestion_surcharge,
        "airport_fee": x.airport_fee,
    })

for batch in ny_taxi_parquet.iter_batches(batch_size=50000):
    new_trips = []
    df = batch.to_pandas()
    # apply() is used only for its side effect of filling new_trips
    df.apply(create_obj, args=[new_trips], axis=1)
    # Insert the whole batch with one executemany call; "id" is filled by gen_key().
    # engine.begin() commits at the end of the block (engine.execute() was
    # removed in SQLAlchemy 2.0).
    with engine.begin() as conn:
        conn.execute(Taxi_trip.__table__.insert(), new_trips)
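As an alternative to the row-wise apply(), the dictionaries can be built for the whole batch at once: rename the columns whose names differ, then convert missing values (NaN/NaT in pandas) to None so they are sent as SQL NULL. The sketch below assumes the batch holds only scalar columns and that every column has a matching Taxi_trip attribute after renaming; the names RENAMES and batch_to_records are hypothetical.

```python
import numpy as np
import pandas as pd

# Parquet column name -> Taxi_trip attribute name, only for names that differ
RENAMES = {"VendorID": "vendorID", "RatecodeID": "rateCodeID", "mta_tax": "MTA_tax"}

def batch_to_records(df: pd.DataFrame) -> list:
    """Convert one batch into a list of dicts suitable for an executemany INSERT."""
    records = df.rename(columns=RENAMES).to_dict("records")
    # Replace NaN/NaT/None with None so the driver sends SQL NULL
    return [{k: (None if pd.isna(v) else v) for k, v in rec.items()} for rec in records]

# Small hypothetical batch with missing values
sample = pd.DataFrame({
    "VendorID": [1, 2],
    "RatecodeID": [1.0, np.nan],
    "mta_tax": [0.5, np.nan],
    "store_and_fwd_flag": ["N", None],
})
records = batch_to_records(sample)
print(records)
```

Inside the import loop, the record list for each batch could then be obtained with batch_to_records(df), replacing the df.apply(create_obj, ...) call.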

As the batches are processed, the new records should start to appear in your CrateDB instance; you can follow the progress with a query such as SELECT COUNT(*) FROM doc.ny_taxi;

Summary

This tutorial presented one of many ways to import Parquet files into CrateDB using Python, PyArrow, SQLAlchemy and Pandas. To learn more about using SQLAlchemy with CrateDB, have a look at the CrateDB documentation - SQLAlchemy, or keep exploring the tutorials on CrateDB Community.
