Connecting to CrateDB from Apache NiFi

This article describes how to connect from Apache NiFi to CrateDB and ingest data from NiFi into CrateDB.

Prerequisites

To follow this article, you will need:

  • A CrateDB cluster
  • An Apache NiFi installation that can connect to the CrateDB cluster

Connecting from NiFi to CrateDB

First, we will set up a connection pool to CrateDB:

  1. On the main NiFi web interface, click the gear icon of your process group (“NiFi Flow” by default).
  2. Switch to “Controller Services” and click the plus icon to add a new controller service.
  3. Choose “DBCPConnectionPool” as the type and click “Add”.
  4. Open the settings of the newly created connection pool and switch to “Properties”. Change the following parameters:
      • Database Connection URL: the JDBC connection string pointing to CrateDB. Sample value: jdbc:postgresql://<CrateDB host>:5432/doc?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory
      • Database Driver Class Name: the PostgreSQL JDBC driver class name. Sample value: org.postgresql.Driver
      • Database Driver Location(s): download the latest PostgreSQL JDBC driver and place it on the file system of the NiFi host. Sample value: /opt/nifi/nifi-1.13.2/postgresql-42.2.23.jar
      • Database User: the CrateDB user name
      • Password: the password of your CrateDB user
  5. After applying the changed properties, click the lightning bolt icon to enable the service.
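The Database Connection URL combines the CrateDB host, the PostgreSQL wire protocol port (5432 by default), the path segment doc (which CrateDB uses as the default schema), and SSL options. The following Python sketch only assembles that string, which may help when generating NiFi configuration from a script; the function build_crate_jdbc_url and the host crate.example.com are hypothetical. Note that org.postgresql.ssl.NonValidatingFactory encrypts the connection but does not verify the server certificate.

```python
from urllib.parse import urlencode


def build_crate_jdbc_url(host: str, port: int = 5432, schema: str = "doc",
                         ssl: bool = True) -> str:
    """Assemble a PostgreSQL JDBC URL pointing at CrateDB (hypothetical helper)."""
    url = f"jdbc:postgresql://{host}:{port}/{schema}"
    if ssl:
        # NonValidatingFactory: encrypted connection, server certificate NOT verified.
        params = {"ssl": "true",
                  "sslfactory": "org.postgresql.ssl.NonValidatingFactory"}
        url += "?" + urlencode(params)
    return url


# crate.example.com is a placeholder host name.
print(build_crate_jdbc_url("crate.example.com"))
```

With a private CA or a certificate you want verified, the sslfactory parameter would typically be dropped in favour of the driver's certificate-validating options.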

Now the connection pool is ready to be used in one of NiFi’s processors.

Examples of ingesting data into CrateDB

From CSV files

A common use case is a NiFi flow that ingests data into CrateDB. As an example, we will take a CSV file from the NYC Taxi Data repository, process it in NiFi, and then ingest it into CrateDB.

To achieve high throughput, NiFi uses prepared statements with a configurable batch size by default. The optimal batch size depends on your use case; 500 is typically a good starting point. For more details, see the CrateDB documentation on insert performance.
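As a rough illustration of what the batch size controls, the Python sketch below splits a stream of rows into batches the way a batched prepared statement would send them, one round trip per batch. It is a conceptual sketch, not NiFi's implementation; the helper batches and the row count of 10,250 are made up for illustration.

```python
import math
from typing import Iterator, List, Sequence


def batches(rows: Sequence, batch_size: int) -> Iterator[List]:
    """Yield consecutive chunks of at most batch_size rows; each chunk models one batched round trip."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


rows = list(range(10_250))  # made-up stand-in for 10,250 parsed records
sizes = [len(chunk) for chunk in batches(rows, 500)]
assert len(sizes) == math.ceil(len(rows) / 500)
print(len(sizes), sizes[-1])  # 21 250
```

Larger batches mean fewer round trips, but each request carries more rows, which is one reason the best value depends on row size and cluster capacity.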

In CrateDB, we first create the corresponding target table:

CREATE TABLE "doc"."yellow_taxi_trips" (
   "vendor_id" TEXT,
   "pickup_datetime" TIMESTAMP WITH TIME ZONE,
   "dropoff_datetime" TIMESTAMP WITH TIME ZONE,
   "passenger_count" INTEGER,
   "trip_distance" REAL,
   "pickup_longitude" REAL,
   "pickup_latitude" REAL,
   "rate_code" INTEGER,
   "store_and_fwd_flag" TEXT,
   "dropoff_longitude" REAL,
   "dropoff_latitude" REAL,
   "payment_type" TEXT,
   "fare_amount" REAL,
   "surcharge" REAL,
   "mta_tax" REAL,
   "tip_amount" REAL,
   "tolls_amount" REAL,
   "total_amount" REAL
);
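Because the CSVReader takes the record field names from the CSV header, it can be worth checking that the header matches the table columns before starting the flow. A minimal Python sketch using the standard csv module; compare_header and the sample CSV text are hypothetical. The comparison is exact and case-sensitive, whereas PutDatabaseRecord may be more lenient depending on its configuration.

```python
import csv
import io

# Column names copied from the CREATE TABLE statement above.
TABLE_COLUMNS = [
    "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count",
    "trip_distance", "pickup_longitude", "pickup_latitude", "rate_code",
    "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude",
    "payment_type", "fare_amount", "surcharge", "mta_tax", "tip_amount",
    "tolls_amount", "total_amount",
]


def compare_header(csv_text: str, columns=TABLE_COLUMNS):
    """Return (missing, extra): table columns absent from the header, and header fields with no column."""
    reader = csv.reader(io.StringIO(csv_text))
    header = [name.strip() for name in next(reader)]  # tolerate spaces after commas
    missing = [c for c in columns if c not in header]
    extra = [h for h in header if h not in columns]
    return missing, extra


# Hypothetical sample with only three columns.
sample = "vendor_id,pickup_datetime,dropoff_datetime\nCMT,2013-08-01 08:00:00,2013-08-01 08:10:00\n"
missing, extra = compare_header(sample)
print(len(missing), extra)  # 15 []
```

To check the real file, pass its first line (for example via open(path).readline()) instead of the sample string.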

After configuring the processors as described below, click the start icon in the process group window. Rows should appear in CrateDB shortly. If you encounter any issues, check NiFi’s log files (logs/nifi-bootstrap.log and logs/nifi-app.log).

GetFile

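The GetFile processor’s “Input Directory” property points to a local directory that contains the file yellow_tripdata_2013-08.csv. By default, GetFile deletes the file after picking it up; set “Keep Source File” to true to keep it.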

PutDatabaseRecord

The PutDatabaseRecord processor has several properties that need to be configured:

  • Record Reader: CSVReader, configured with “Use String Fields From Header” as its “Schema Access Strategy”.
  • Database Type: PostgreSQL
  • Statement Type: INSERT
  • Database Connection Pooling Service: The connection pool created previously
  • Schema Name: doc
  • Table Name: yellow_taxi_trips
  • Maximum Batch Size: 200
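Conceptually, with Statement Type INSERT, PutDatabaseRecord prepares one parameterized INSERT for the target table and binds each record's field values to it, sending up to Maximum Batch Size records per batch. The Python sketch below only builds such a statement to show its shape; build_insert and quote_ident are hypothetical helpers, and the exact SQL NiFi generates (quoting, column order) may differ.

```python
from typing import Sequence


def quote_ident(name: str) -> str:
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_insert(schema: str, table: str, columns: Sequence[str]) -> str:
    """Build a parameterized INSERT with one JDBC-style '?' placeholder per column."""
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(quote_ident(c) for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    return (f"INSERT INTO {quote_ident(schema)}.{quote_ident(table)} "
            f"({column_list}) VALUES ({placeholders})")


sql = build_insert("doc", "yellow_taxi_trips",
                   ["vendor_id", "passenger_count", "total_amount"])
print(sql)
# INSERT INTO "doc"."yellow_taxi_trips" ("vendor_id", "passenger_count", "total_amount") VALUES (?, ?, ?)
```

Because the statement text stays the same for every record, the database only has to parse it once per batch, while the values change per row.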

From another SQL-based database

Data can also be read from a SQL database and then inserted into CrateDB:

ExecuteSQLRecord

Reads rows from the source database.

  • Database Connection Pooling Service: A connection pool pointing to the source database
  • SQL select query: The SQL query to retrieve rows as needed
  • Record Writer: JsonRecordSetWriter. The next processor, ConvertJSONToSQL, requires JSON input to generate SQL statements.
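To make the hand-off between processors concrete: with JsonRecordSetWriter, the query result is serialized as JSON records keyed by column name, typically grouped into a JSON array. The Python sketch below imitates that shape, using an in-memory SQLite database from the standard library as a stand-in for the source database; the trips table, its rows, and rows_as_json are hypothetical.

```python
import json
import sqlite3


def rows_as_json(conn: sqlite3.Connection, query: str) -> str:
    """Run a SELECT and serialize the result as a JSON array of flat objects keyed by column name."""
    cursor = conn.execute(query)
    names = [d[0] for d in cursor.description]
    records = [dict(zip(names, row)) for row in cursor.fetchall()]
    return json.dumps(records)


# In-memory SQLite database standing in for the (hypothetical) source database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (vendor_id TEXT, passenger_count INTEGER)")
conn.executemany("INSERT INTO trips VALUES (?, ?)", [("VTS", 1), ("CMT", 2)])
print(rows_as_json(conn, "SELECT vendor_id, passenger_count FROM trips ORDER BY passenger_count"))
# [{"vendor_id": "VTS", "passenger_count": 1}, {"vendor_id": "CMT", "passenger_count": 2}]
```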

ConvertJSONToSQL

Converts the generated JSON files into SQL statements.

  • JDBC Connection Pool: A connection pool pointing to CrateDB
  • Statement Type: INSERT
  • Table Name: Name of the target table in CrateDB (without schema name)
  • Schema Name: The table’s schema name in CrateDB
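ConvertJSONToSQL expects flat JSON; when it receives an array, it emits one FlowFile per element. Each FlowFile contains a parameterized statement, while the values travel in sql.args.N.value attributes, alongside sql.args.N.type, which the processor derives from the target table's column metadata through the connection pool. The Python sketch below imitates that split for INSERT statements only; json_to_insert_flowfiles is hypothetical, types are omitted, and mapping nulls to empty strings is purely a simplification of this sketch.

```python
import json
from typing import Dict, List, Tuple


def json_to_insert_flowfiles(json_text: str, schema: str,
                             table: str) -> List[Tuple[str, Dict[str, str]]]:
    """Split JSON into one (sql, attributes) pair per flat object (simplified model)."""
    doc = json.loads(json_text)
    records = doc if isinstance(doc, list) else [doc]  # a single object yields one pair
    results = []
    for record in records:
        columns = list(record.keys())
        sql = "INSERT INTO {}.{} ({}) VALUES ({})".format(
            schema, table, ", ".join(columns), ", ".join("?" for _ in columns))
        attributes = {}
        for i, column in enumerate(columns, start=1):  # parameter indices start at 1
            value = record[column]
            # Simplification: None becomes "", everything else uses str().
            attributes[f"sql.args.{i}.value"] = "" if value is None else str(value)
        results.append((sql, attributes))
    return results


out = json_to_insert_flowfiles(
    '[{"vendor_id": "VTS", "passenger_count": 1}, {"vendor_id": "CMT", "passenger_count": 2}]',
    "doc", "trips")
for sql, attrs in out:
    print(sql, attrs)
```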

PutSQL

Executes the previously generated SQL statements as prepared statements.

  • JDBC Connection Pool: A connection pool pointing to CrateDB
  • SQL Statement: No value set (PutSQL then executes the SQL statement contained in each incoming FlowFile)
  • Batch Size: 500 (the optimal value for your use case might vary)
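With SQL Statement left empty, PutSQL takes each FlowFile's content as the statement and binds its ? placeholders from the sql.args.N.type and sql.args.N.value attributes, in order of N. The Python sketch below models only that ordering step and ignores types; ordered_parameters is hypothetical.

```python
import re
from typing import Dict, List

# Matches value attributes such as "sql.args.3.value"; type attributes are ignored.
_ARG_VALUE = re.compile(r"^sql\.args\.(\d+)\.value$")


def ordered_parameters(attributes: Dict[str, str]) -> List[str]:
    """Collect sql.args.N.value attributes and return the values ordered by N (1-based)."""
    indexed = {}
    for key, value in attributes.items():
        match = _ARG_VALUE.match(key)
        if match:
            indexed[int(match.group(1))] = value
    expected = list(range(1, len(indexed) + 1))
    if sorted(indexed) != expected:
        raise ValueError(f"parameter indices must be 1..{len(indexed)}, got {sorted(indexed)}")
    return [indexed[i] for i in expected]


attrs = {"sql.args.2.value": "2", "sql.args.1.value": "CMT",
         "sql.args.1.type": "12", "filename": "trips.json"}
print(ordered_parameters(attrs))  # ['CMT', '2']
```

Sorting by the integer N rather than by the attribute name matters once a statement has ten or more parameters, since a plain string sort would place sql.args.10 before sql.args.2.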