Fetching large result sets from CrateDB

CrateDB is a distributed database with support for the standard SQL query language, which makes it well suited to running aggregations server-side over huge datasets and getting summarized results back. There are, however, cases where we may still want to retrieve lots of data from CrateDB, for instance to train a machine learning model.

CrateDB collects results in memory before sending them back to the clients, so a SELECT statement that returns a very large result set in one go can trigger circuit breakers or result in an OutOfMemoryError. Receiving all the results in a single operation can also be a challenge on the client side. We therefore need a mechanism to fetch results in manageable batches as we are ready to process them.

One option, for cases where we are looking at a single table and already know that we need all the records that satisfy a condition, is to do a bulk export with the COPY TO command, which accepts a WHERE clause. In many cases, however, we may want to run more complex queries, or storing the results in files may simply not fit well into our application.

The case for pagination

Another common requirement is pagination: presenting results to users in pages with a set number of results each, and allowing them to move between these pages. Many users will only look at the first few pages of results, so we want to implement this in the most efficient way possible.

Let’s imagine we have a table called “observations” with the following data:

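+---------------------+--------+---------+
| ts                  | device | reading |
+---------------------+--------+---------+
| 2021-10-14T09:39:19 | dev2   |   -1682 |
| 2022-02-02T00:33:47 | dev1   |     827 |
| 2022-06-11T21:49:53 | dev2   |    -713 |
| 2022-06-29T23:23:28 | dev1   |    1059 |
| 2022-07-01T09:22:56 | dev2   |    -689 |
| 2022-07-10T02:43:36 | dev2   |    -570 |
| 2022-09-18T03:28:02 | dev1   |     303 |
| 2022-10-14T20:34:10 | dev1   |    1901 |
+---------------------+--------+---------+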

We will work with a very small number of records here to visualize how the different techniques work, but imagine that we have thousands or even millions of records. In particular, I will show examples of retrieving 2 rows at a time, but depending on the use case you would probably be retrieving 50, 1000, or even 5000 rows at a time.

Using LIMIT + OFFSET

SELECT date_format(ts),device,reading 
FROM doc.observations 
WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00'
ORDER BY ts
LIMIT 2;

This returns:

+-----------------------------+--------+---------+
| date_format(ts)             | device | reading |
+-----------------------------+--------+---------+
| 2022-02-02T00:33:47.000000Z | dev1   |     827 |
| 2022-06-11T21:49:53.000000Z | dev2   |    -713 |
+-----------------------------+--------+---------+

We could then re-issue the query with LIMIT 2 OFFSET 2 and we would get:

+-----------------------------+--------+---------+
| date_format(ts)             | device | reading |
+-----------------------------+--------+---------+
| 2022-06-29T23:23:28.000000Z | dev1   |    1059 |
| 2022-07-01T09:22:56.000000Z | dev2   |    -689 |
+-----------------------------+--------+---------+

There are a number of issues to be aware of with this approach.

Each new query is considered a new request and looks at the current data. Consider what happens if the observation for 11 June 2022 above is deleted after we run the first query, but before we run the second one with OFFSET 2. By skipping 2 rows, we now also skip the observation from 29 June 2022, and the users will never see it.

Another issue is that there is not always an efficient way for CrateDB to skip rows. For certain queries, execution times grow larger and larger as the OFFSET value goes up, because the engine actually goes through the rows that need to be skipped and just discards them server-side.
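The first issue can be reproduced without a database. The following is a minimal sketch, assuming an in-memory list as a stand-in for the rows that match the WHERE clause; fetch_page is a hypothetical helper that only mimics ORDER BY ts with LIMIT and OFFSET, it is not a CrateDB API.

```python
# In-memory stand-in for the rows matching the WHERE clause (assumption:
# a plain list of tuples, not a real CrateDB table).
rows = [
    ("2022-02-02T00:33:47", "dev1", 827),
    ("2022-06-11T21:49:53", "dev2", -713),
    ("2022-06-29T23:23:28", "dev1", 1059),
    ("2022-07-01T09:22:56", "dev2", -689),
    ("2022-07-10T02:43:36", "dev2", -570),
    ("2022-09-18T03:28:02", "dev1", 303),
]


def fetch_page(table, limit, offset):
    """Hypothetical helper mimicking ORDER BY ts LIMIT <limit> OFFSET <offset>.

    Like a fresh query, it always looks at the data as it is right now.
    """
    return sorted(table)[offset:offset + limit]


# First request: LIMIT 2
page1 = fetch_page(rows, limit=2, offset=0)

# The 11 June observation is deleted between the two requests.
rows = [r for r in rows if r[0] != "2022-06-11T21:49:53"]

# Second request: LIMIT 2 OFFSET 2
page2 = fetch_page(rows, limit=2, offset=2)

seen = {r[0] for r in page1 + page2}
missed = "2022-06-29T23:23:28" not in seen

print(page1)
print(page2)
print("29 June row missed:", missed)
```

The second page starts at 1 July, so the 29 June row is never shown even though it was not the row that was deleted.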

Using LIMIT with a WHERE clause on a watermark field

Continuing from the example above, after we get the initial 2 rows, instead of using OFFSET 2 we could run a query like this:

SELECT date_format(ts),device,reading 
FROM doc.observations 
WHERE ts > '2022-06-11T21:49:53.000000Z' AND ts <= '2022-10-01 00:00'
ORDER BY ts
LIMIT 2;

That 11 June value is the last value we have observed so far in the ts column, which in this case we know to be always increasing. This approach is very efficient, but it can only be used if there is a suitable field in the data, which is not always the case.
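The watermark loop can be sketched end to end as follows. To keep the example runnable on its own, it uses Python's built-in sqlite3 module with an in-memory table as a stand-in for CrateDB, which is an assumption made purely for illustration; iter_pages is a hypothetical helper name, and against CrateDB the same loop would be driven through whichever client library you use.

```python
import sqlite3

# Stand-in database (assumption): in-memory SQLite instead of CrateDB.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE observations (ts TEXT, device TEXT, reading INTEGER)")
conn.executemany(
    "INSERT INTO observations VALUES (?, ?, ?)",
    [
        ("2021-10-14T09:39:19", "dev2", -1682),
        ("2022-02-02T00:33:47", "dev1", 827),
        ("2022-06-11T21:49:53", "dev2", -713),
        ("2022-06-29T23:23:28", "dev1", 1059),
        ("2022-07-01T09:22:56", "dev2", -689),
        ("2022-07-10T02:43:36", "dev2", -570),
        ("2022-09-18T03:28:02", "dev1", 303),
        ("2022-10-14T20:34:10", "dev1", 1901),
    ],
)


def iter_pages(conn, start, end, page_size):
    """Yield pages of rows, using the last ts seen as the watermark."""
    # First page: inclusive lower bound, like BETWEEN in the first query.
    rows = conn.execute(
        "SELECT ts, device, reading FROM observations "
        "WHERE ts >= ? AND ts <= ? ORDER BY ts LIMIT ?",
        (start, end, page_size),
    ).fetchall()
    while rows:
        yield rows
        watermark = rows[-1][0]  # last ts value observed so far
        # Following pages: strictly greater than the watermark.
        rows = conn.execute(
            "SELECT ts, device, reading FROM observations "
            "WHERE ts > ? AND ts <= ? ORDER BY ts LIMIT ?",
            (watermark, end, page_size),
        ).fetchall()


pages = list(iter_pages(conn, "2022-01-01T00:00:00", "2022-10-01T00:00:00", 2))
for page in pages:
    print(page)
```

Note that this sketch relies on the ts values being unique. If several rows could share the same ts, rows tied with the watermark would be skipped, so an additional tie-breaker column would be needed in the ORDER BY and WHERE clauses.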

Also, compared to the LIMIT + OFFSET approach we discussed earlier, we cannot use this to let users jump to a given page of results without first having obtained the results for all the previous pages. We cannot, for instance, jump directly to page 10, as we do not know the last value of ts on page 9.

Some people call the approach above “cursor pagination”, but the most common concept behind “cursors” is something a bit different, which we are going to discuss now.

Cursors

A cursor is like a bookmark pointing to a specific record in the result set of a query. This is a generic approach that is implemented efficiently and does not require us to have a special anchor/watermark column.

In CrateDB we can use cursors at the protocol level or with SQL commands.

Cursors in CrateDB are INSENSITIVE, meaning that the client can take all the time it needs to retrieve the results, and the data will always reflect the status of the tables as it was at the time the cursor was declared, ignoring any records that were updated, deleted, or newly inserted.

Using cursors in Python

In Python one way to work with cursors is with asyncpg, taking advantage of CrateDB’s compatibility with the PostgreSQL wire protocol.
First, we need to install the library:

pip install asyncpg

Then we can use it like this:

import asyncio
import asyncpg

# If you are using jupyter-notebook 
# remove this function definition line and the indentation in the block of code that follows
async def main():
    # we will then establish a connection    
    conn = await asyncpg.connect(host='localhost', user='crate')

    # and we need a “transaction” context, 
    # there are no transactions as such in CrateDB, 
    # but this gives a scope where the cursor lives:  
    async with conn.transaction():

        # and now we can declare the cursor 
        # specifying how many rows we want asyncpg to fetch at a time from CrateDB, 
        # and we can iterate over the results: 
        query = "SELECT ts,device,reading FROM doc.observations WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';"
        async for record in conn.cursor(query, prefetch=1000):
            print(record)

# Remove this line if you are using jupyter-notebook
asyncio.run(main())

Just to clarify, our Python code works with one record at a time, but behind the scenes asyncpg is requesting 1000 records at a time from CrateDB.
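If the processing step prefers to work on groups of records rather than one at a time (feeding mini-batches to a model, for instance), the records can be regrouped on the client. The following is a minimal sketch: in_batches is a hypothetical helper, and fake_cursor is a made-up async generator standing in for conn.cursor(query, prefetch=1000) so that the example runs without a database.

```python
import asyncio


async def in_batches(records, batch_size):
    """Group items from an async iterator into lists of up to batch_size."""
    batch = []
    async for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Last, possibly shorter, batch.
        yield batch


async def fake_cursor(n):
    """Hypothetical stand-in for conn.cursor(query, prefetch=...)."""
    for i in range(n):
        yield {"reading": i}


async def demo():
    sizes = []
    async for batch in in_batches(fake_cursor(5), batch_size=2):
        sizes.append(len(batch))
    return sizes


batch_sizes = asyncio.run(demo())
print(batch_sizes)
```

With a real connection, the assumption is that in_batches(conn.cursor(query, prefetch=1000), batch_size=...) would be used inside the conn.transaction() block, since the helper only needs an object that supports async for.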

Using cursors in Java

In Java, we can use the PostgreSQL JDBC driver.
In a Maven project add this to your pom.xml:

<dependencies>
	<dependency>
		<groupId>org.postgresql</groupId>
		<artifactId>postgresql</artifactId>
		<version>42.5.0</version>
	</dependency>
</dependencies>

Then we can use it like this:

import java.sql.*;
/* ... */
/* we first establish a connection to CrateDB */
try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/", "crate", "")) {
	/* the PostgreSQL JDBC driver only uses a cursor when autocommit is off,
	 * otherwise it ignores the fetch size and loads all rows at once
	 */
	conn.setAutoCommit(false);
	try (Statement st = conn.createStatement()) {
		/* We will then open the cursor
		 * defining how many rows we want to retrieve at a time,
		 * in this case 1,000:
		 */
		st.setFetchSize(1000);
		String query = "SELECT ts,device,reading "
			+ "FROM doc.observations "
			+ "WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';";
		try (ResultSet resultSet = st.executeQuery(query)) {
			/* and while there are rows available, we will iterate over them: */
			while (resultSet.next()) {
				/* getTimestamp keeps the time of day, which getDate would drop */
				System.out.println(resultSet.getTimestamp("ts").toString());
			}
		}
	}
}

This works like the Python case above: in our Java code we see one row at a time, but the rows are retrieved from CrateDB 1,000 at a time and kept in memory on the client side.

Using cursors with SQL commands

An approach that works with all clients is to use SQL commands supported since CrateDB 5.1.0.

First, we need to issue this command:

BEGIN;

This is a SQL command that would normally start a transaction. There are no transactions as such in CrateDB, but this creates a scope in which cursors can be created.

DECLARE observations_cursor NO SCROLL CURSOR FOR 
	SELECT ts,device,reading 
	FROM doc.observations 
	WHERE ts BETWEEN '2022-01-01 00:00' AND '2022-10-01 00:00';

This associates a cursor name with a query and determines the point in time at which data is “frozen” from the point of view of the cursor.

FETCH 10 FROM observations_cursor;

This retrieves 10 rows from the query, and when issued again it will retrieve the next 10 rows and so on. We can retrieve a different number of records each time and we know we have reached the end of the result set when FETCH returns zero rows.
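From a client program, the BEGIN, DECLARE, and FETCH sequence above can be wrapped in a small loop. The following is a minimal sketch under stated assumptions: execute is a hypothetical callable that sends one SQL statement over a single connection and returns the resulting rows as a list, and FakeServer is a made-up stand-in that only mimics FETCH so the loop can run without CrateDB. All statements must go through the same connection, because the cursor lives in that session.

```python
def fetch_all_in_batches(execute, cursor_name, query, batch_size):
    """Yield batches from a SQL-level cursor until FETCH returns no rows.

    `execute` is a hypothetical callable (an assumption of this sketch):
    it sends one statement and returns the resulting rows as a list.
    `cursor_name` and `batch_size` are interpolated into the SQL text,
    so they must come from trusted code, not from user input.
    """
    execute("BEGIN")
    execute(f"DECLARE {cursor_name} NO SCROLL CURSOR FOR {query}")
    try:
        while True:
            rows = execute(f"FETCH {batch_size} FROM {cursor_name}")
            if not rows:
                break  # zero rows means the end of the result set
            yield rows
    finally:
        execute(f"CLOSE {cursor_name}")


class FakeServer:
    """Made-up stand-in for a database connection, for illustration only."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.log = []

    def execute(self, sql):
        self.log.append(sql)
        if sql.startswith("FETCH"):
            n = int(sql.split()[1])
            out, self.rows = self.rows[:n], self.rows[n:]
            return out
        return []


server = FakeServer(range(5))
batches = list(
    fetch_all_in_batches(
        server.execute,
        "observations_cursor",
        "SELECT ts,device,reading FROM doc.observations",
        2,
    )
)
print(batches)
```

The try/finally block is a design choice to make sure the cursor is closed even if processing a batch raises an exception.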

Once the cursor is not needed anymore it can be closed with either END;, CLOSE ALL;, CLOSE observations_cursor;, COMMIT;, COMMIT TRANSACTION;, or COMMIT WORK;.

Take a look at this short animation showing an example of how this works: Mathias Fußenegger on Twitter

We hope you find this useful, and we will be happy to hear about your experience in the Community.
