Connecting to CrateDB from Apache NiFi

This article describes how to connect from Apache NiFi to CrateDB and ingest data from NiFi into CrateDB.

Prerequisites

To follow this article, you will need:

  • A CrateDB cluster
  • An Apache NiFi installation that can connect to the CrateDB cluster

Connecting from NiFi to CrateDB

First, we will set up a connection pool to CrateDB:

  1. On the main NiFi web interface, click the gear icon of your process group (“NiFi Flow” by default).
  2. Switch to “Controller Services” and click the plus icon to add a new controller service.
  3. Choose “DBCPConnectionPool” as type and click “Add”.
  4. Open the settings of the newly created connection pool and switch to “Properties”. The table below describes the parameters that need to be changed.

| Parameter | Description | Sample value |
|---|---|---|
| Database Connection URL | The JDBC connection string pointing to CrateDB | `jdbc:postgresql://<CrateDB host>:5432/doc?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory` |
| Database Driver Class Name | The PostgreSQL JDBC driver class name | `org.postgresql.Driver` |
| Database Driver Location(s) | Download the latest PostgreSQL JDBC driver and place it on the file system of the NiFi host | `/opt/nifi/nifi-1.13.2/postgresql-42.2.23.jar` |
| Database User | The CrateDB user name | |
| Password | The password of your CrateDB user | |

  5. After applying the changed properties, click the lightning bolt icon to enable the service.

Now the connection pool is ready to be used in one of NiFi’s processors.
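To double-check the credentials from the table before building a flow, you could connect with the same user from any CrateDB SQL client (for example the `crash` shell or the Admin UI console) and run a couple of read-only queries. This is a sketch outside of NiFi itself; it assumes the `doc` schema used in the sample connection URL:

```sql
-- Confirm which user the session is authenticated as
-- (should match the "Database User" property of the connection pool)
SELECT CURRENT_USER;

-- List the tables this user can see in the schema used in the JDBC URL
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'doc'
ORDER BY table_name;
```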

Examples for ingesting into CrateDB

From CSV files

One common use case is to design a flow in NiFi that results in data being ingested into CrateDB. As an example, we will take a CSV file from the NYC Taxi Data repository, process it in NiFi, and then ingest it into CrateDB.

To achieve high throughput, NiFi by default uses prepared statements with a configurable batch size. The optimal batch size depends on your concrete use case; 500 is typically a good starting point. See also the CrateDB documentation on insert performance for additional information.

In CrateDB, we first create the corresponding target table:

```sql
CREATE TABLE "doc"."yellow_taxi_trips" (
   "vendor_id" TEXT,
   "pickup_datetime" TIMESTAMP WITH TIME ZONE,
   "dropoff_datetime" TIMESTAMP WITH TIME ZONE,
   "passenger_count" INTEGER,
   "trip_distance" REAL,
   "pickup_longitude" REAL,
   "pickup_latitude" REAL,
   "rate_code" INTEGER,
   "store_and_fwd_flag" TEXT,
   "dropoff_longitude" REAL,
   "dropoff_latitude" REAL,
   "payment_type" TEXT,
   "fare_amount" REAL,
   "surcharge" REAL,
   "mta_tax" REAL,
   "tip_amount" REAL,
   "tolls_amount" REAL,
   "total_amount" REAL
);
```

After configuring the processors as described below, click the start icon of the process group. You should see rows appearing in CrateDB shortly. If you encounter any issues, also check NiFi’s log files (log/nifi-bootstrap.log and log/nifi-app.log).
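CrateDB makes newly written rows visible to queries after a periodic table refresh (every second by default), so a freshly started flow may briefly show fewer rows than NiFi has already sent. A sketch for checking progress from the CrateDB side, assuming the table created above:

```sql
-- Force a refresh so that all rows written so far are visible to the queries below
REFRESH TABLE "doc"."yellow_taxi_trips";

-- Number of ingested rows
SELECT COUNT(*) AS ingested_rows
FROM "doc"."yellow_taxi_trips";

-- Spot-check a few records
SELECT "vendor_id", "pickup_datetime", "passenger_count", "total_amount"
FROM "doc"."yellow_taxi_trips"
ORDER BY "pickup_datetime"
LIMIT 5;
```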

GetFile

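The GetFile processor’s “Input Directory” points to a local directory that contains the file yellow_tripdata_2013-08.csv. Its success relationship is connected to PutDatabaseRecord. Note that GetFile deletes picked-up files from the directory unless “Keep Source File” is set to true.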

PutDatabaseRecord

The PutDatabaseRecord processor has several properties that need to be configured:

  • Record Reader: CSVReader. The CSVReader is configured with “Schema Access Strategy” set to “Use String Fields From Header”.
  • Database Type: PostgreSQL
  • Statement Type: INSERT
  • Database Connection Pooling Service: The connection pool created previously
  • Schema Name: doc
  • Table Name: yellow_taxi_trips
  • Maximum Batch Size: 200
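PutDatabaseRecord derives one parameterized INSERT from the record schema (here, the CSV header) and executes it as a prepared statement, binding each record’s values and sending up to “Maximum Batch Size” rows per batch. Roughly, the statement looks like the following; the exact column list depends on your CSV header and the processor’s quoting settings, so treat it as illustrative rather than the literal generated text:

```sql
-- Illustrative only: one "?" placeholder per CSV column.
-- NiFi binds the values of each record and sends up to 200 rows per batch.
INSERT INTO doc.yellow_taxi_trips (
   vendor_id, pickup_datetime, dropoff_datetime, passenger_count, trip_distance, pickup_longitude,
   pickup_latitude, rate_code, store_and_fwd_flag, dropoff_longitude, dropoff_latitude, payment_type,
   fare_amount, surcharge, mta_tax, tip_amount, tolls_amount, total_amount
) VALUES (
   ?, ?, ?, ?, ?, ?,
   ?, ?, ?, ?, ?, ?,
   ?, ?, ?, ?, ?, ?
);
```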

From another SQL-based database

Data can also be read from another SQL database and then inserted into CrateDB using the following processors:

ExecuteSQLRecord

Reads rows from the source database.

  • Database Connection Pooling Service: A connection pool pointing to the source database
  • SQL select query: The SQL query to retrieve rows as needed
  • Record Writer: JsonRecordSetWriter. The next processor, ConvertJSONToSQL, expects JSON input to generate SQL statements.
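Since ConvertJSONToSQL maps JSON fields to table columns by name, one approach is to alias the source columns in the SQL select query so that the JSON written by JsonRecordSetWriter already uses the CrateDB column names. A sketch against a hypothetical source table `trips` with hypothetical column names (`vendor`, `pickup_time`, and so on):

```sql
-- Runs on the SOURCE database, not on CrateDB.
-- "trips" and its columns are hypothetical; the aliases match the CrateDB target table.
SELECT
   vendor        AS vendor_id,
   pickup_time   AS pickup_datetime,
   dropoff_time  AS dropoff_datetime,
   passengers    AS passenger_count,
   total         AS total_amount
FROM trips
WHERE pickup_time >= '2013-08-01'
  AND pickup_time <  '2013-09-01';
```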

ConvertJSONToSQL

Converts the generated JSON records into SQL statements.

  • JDBC Connection Pool: A connection pool pointing to CrateDB
  • Statement Type: INSERT
  • Table Name: Name of the target table in CrateDB (without schema name)
  • Schema Name: The table’s schema name in CrateDB
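For each JSON object, ConvertJSONToSQL writes a parameterized statement as the FlowFile content and stores the values in FlowFile attributes named `sql.args.N.type` and `sql.args.N.value`, which PutSQL later binds to its prepared statement. A sketch for a hypothetical record; the exact statement text (e.g. how the schema name is prefixed) may differ between NiFi versions:

```sql
-- Hypothetical input record:
--   {"vendor_id": "CMT", "passenger_count": 1, "total_amount": 9.5}
-- Approximate FlowFile content produced by ConvertJSONToSQL (Statement Type: INSERT):
INSERT INTO doc.yellow_taxi_trips (vendor_id, passenger_count, total_amount) VALUES (?, ?, ?)
-- The values travel as FlowFile attributes, e.g.:
--   sql.args.1.value = CMT
--   sql.args.2.value = 1
--   sql.args.3.value = 9.5
-- plus sql.args.N.type holding the JDBC type code of each column
```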

PutSQL

Executes the previously generated SQL statements as prepared statements.

  • JDBC Connection Pool: A connection pool pointing to CrateDB
  • SQL Statement: No value set (the statement is then taken from the FlowFile content)
  • Batch Size: 500 (the optimal value for your use case might vary)

Thanks for this tutorial. I’m trying to insert the content of a CSV file into CrateDB.

I’m getting the following error: PutDatabaseRecord…Failed to process StandardFlowFileRecord… ] due to Cannot cast to boolean: “NEVER”

Any idea what causes this error? Thanks in advance.

Here you can see the FlowFile that PutDatabaseRecord receives. It is a very simple CSV with two columns and one row.

Hi @Martin_Plazzotta,

I tried to reproduce this but didn’t run into the issue you reported (although I used a GetFile processor instead of GetHDFS, as I don’t have a Hadoop setup available right now).

Can you share some more information about the setup? E.g., which columns does your target table in CrateDB have, which Record Reader is used in PutDatabaseRecord and how is it configured, and are there any non-default settings in the PutDatabaseRecord configuration?

Hi, thank you so much for your response.
Here you can see the PutDatabaseRecord configuration:

Here is the CSVReader’s configuration:

Finally this is the DB Connection Pooling Service:

I did not create this DB Connection service, it was provided to me by the DB admin.
I noticed that its Database Driver Class Name is set to “io.crate.client.jdbc.CrateDriver” instead of “org.postgresql.Driver” (as the article indicates).

Could this be causing the problem? I told the DB admin about it, and he said it should be fine, since the PostgreSQL driver and the Crate driver can be used interchangeably. Maybe I should ask him to try the PostgreSQL driver anyway.

I also tried this DB connection service with PutSQL and a hardcoded INSERT statement, and it worked just fine.
The DB table is called “prueba” and has two columns: “codigo” (integer) and “descripcion” (text).
As you can see, the column names are the same in the table and in the FlowFile content.
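To reproduce this setup, the target table could be created as follows. This is a sketch that assumes the table lives in CrateDB’s default `doc` schema (the schema is not stated in the thread); the test row is hypothetical:

```sql
-- Target table as described above: codigo (integer), descripcion (text)
CREATE TABLE doc.prueba (
   codigo INTEGER,
   descripcion TEXT
);

-- Hypothetical test row, similar to a hardcoded INSERT sent via PutSQL
INSERT INTO doc.prueba (codigo, descripcion) VALUES (1, 'test');
```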

Thanks for any help with this issue.


Hi @Martin_Plazzotta,

we do recommend using the original PostgreSQL JDBC driver whenever possible. The Crate JDBC driver is mainly a relic from the past, when CrateDB’s PostgreSQL compatibility was still lagging behind in a few aspects. Nowadays, the PostgreSQL driver is the primary choice, which is also why we have so far only tested NiFi with the PostgreSQL driver.

I also saw in your screenshot that you are using NiFi 1.7.0, released in 2018. The earliest NiFi version we tested is 1.13.2. I don’t know whether a fix for your particular problem landed between those versions, but NiFi may have addressed it already. There have definitely been some relevant changes; e.g., there is now a “Database Type” property in the PutDatabaseRecord processor (set to PostgreSQL in my case), which doesn’t appear in the screenshot of your version.

Do you have the possibility to test on a recent NiFi version with the PostgreSQL driver?


Hello @hammerhead, sorry for the late reply.

After I saw your response, I changed the driver to the PostgreSQL JDBC driver, and it worked perfectly!
The NiFi flow works really well now and is pretty simple.

As for the NiFi version, I will suggest upgrading to 1.13 at my workplace.

Thank you again for your help.
