Inserting billions of rows the hard way

tl;dr: inserts are slow. How do I speed them up?

I have roughly 33 billion records to import into a crate db cluster. Right now, my ETL/MapReduce process cuts the sheer data size down from 18TB to roughly 4TB. The parsed data is stored as parquet files (roughly 6,500 files totaling 10GB). I see crate doesn’t support parquet import, so I wrote some code in julia to pull the data from the parquet files and stuff it into crate via the postgres wire protocol. I have two mid-sized servers (64 CPUs, 512GB ram, 40TB NVMe) in the cluster right now. A third is on the way.

Even with threaded async connections, I can’t seem to make crate insert more than 287 rows/second. It doesn’t seem to make a difference if I load the entire dataset from the parquet file into memory and dump it as one transaction vs just inserting each row one by one.

At this rate, billions of rows are going to take weeks. Is there a faster way?

Any chance there will be support for parquet as a source format? Anything non-text based would be great for mid-sized datasets.


Hi @sampope

I see crate doesn’t support parquet import, so I wrote some code in julia to pull the data from the parquet files and stuff it into crate via the postgres wire protocol.

That is correct. We are definitely already looking at parquet files for various use cases with CrateDB.

I have two mid-sized servers (64 CPUs, 512GB ram, 40TB NVMe) in the cluster right now. A third is on the way.

Generally I’d consider those large servers for CrateDB. It might be a good idea to use up to 4 containers to make use of Compressed OOPs (which require each JVM heap to stay below roughly 32 GB), but we would also be very interested in seeing the performance with your current setup.

Even with threaded async connections, I can’t seem to make crate insert more than 287 rows/second.

  • How big are those rows/records?
  • Are you using batched INSERT statements?
  • Could you share the code you are using for inserting?

287 rows/second is a very low number even for single INSERT statements.
For reference, a 3-node cluster (8 vCPUs each) should reach about 75-80k records/s (7 indexed values).

best regards
Georg

Thank you for the info about performance and server sizing.

How accurate is the admin UI in reporting queries per second? That’s what I’m using to gauge performance. The parsing code is averaging 28,000 lines per second, per process.

The largest table has only 6 data points per row, and is not too complex (timestamptz, text, int, ip, int, text). I’m inserting via localhost, meaning the parquet parser is running on the same host with 64 processes. The server is near idle at a 1m load avg of 10.3.

I first tried using DataFrames to pull the parquet data in as a dataframe and then dump the whole table to the db in one giant transaction. It still had dismal performance and obviously consumes a ton of RAM (15-30GB per process).

Now I’m just reading each row out of parquet and putting it into the db. The relevant functions are:

using LibPQ

# connect to the database
const dbclient = LibPQ.Connection("postgresql://crate@localhost/doc")

# run the actual SQL command, retrying once after a short pause if it fails
function import_to_db(document)
    try
        LibPQ.execute(dbclient, document)
    catch
        sleep(1)
        LibPQ.execute(dbclient, document)
    end
    return nothing
end

and with slightly anonymized fields, this is the insert:

import_to_db("INSERT INTO table_name_here (timestamptz_field, text_field, int_field, ipaddr_field, int_field, text_field) VALUES ('$timestamptz', '$text_field', $int_field, '$ipaddr', $int_field, '$text_field')")

the import_to_db function needs more error checking and to confirm the row was inserted correctly, but for now, it’s working. This is just a test, obviously, not production.
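
As an aside, a parameterized variant of that statement avoids any worry about quoting the interpolated text fields. A minimal sketch only, keeping the anonymized table and column names as placeholders (duplicates suffixed _2 so the statement stays valid):

using LibPQ

# parameterized version of the anonymized insert; values are bound instead of
# interpolated into the SQL string, so quoting inside text fields is not an issue
const INSERT_SQL = """
    INSERT INTO table_name_here
        (timestamptz_field, text_field, int_field, ipaddr_field, int_field_2, text_field_2)
    VALUES (\$1, \$2, \$3, \$4, \$5, \$6)
    """

import_row(conn, row) = LibPQ.execute(conn, INSERT_SQL, collect(row))

# e.g. import_row(dbclient, (timestamptz, text_field, int_field, ipaddr, int_field_2, text_field_2))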

Theoretically, with 64 processes parsing 6,000+ parquet files at roughly 28,000 lines/sec each, that should be roughly 1.8m inserts/second on this hardware.


It is pulling the data from the sys.jobs_log table, which has a default size of 10,000 entries (stats.jobs_log_size), with a query similar to this:

SELECT CURRENT_TIMESTAMP                             AS last_timestamp,
       DATE_TRUNC('second', ended)                   AS ended_time,
       COUNT(*)                                      AS qps,
       AVG(ended - started)                          AS duration,
       UPPER(REGEXP_MATCHES(stmt, '^\s*(\w+).*')[1]) AS query_type
FROM sys.jobs_log
WHERE DATE_TRUNC('second', ended) BETWEEN (CURRENT_TIMESTAMP - 180000)::timestamp AND (CURRENT_TIMESTAMP - 20000)::timestamp
GROUP BY 1, 2, 5
ORDER BY ended_time ASC;

So it should be pretty accurate.

import_to_db("INSERT INTO table_name_here (timestamptz_field, text_field, int_field, ipaddr_field, int_field, text_field) VALUES ('$timestamptz', '$text_field', $int_field, '$ipaddr', $int_field, '$text_field')")```

So you are doing single inserts?

Multiple-value statements would probably increase insert performance significantly:

INSERT INTO my_table (id, name)
     VALUES (1, 'Arthur'),
            (2, 'Trillian'),
            (3, 'Marvin');
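
For illustration, a sketch of how such multi-row statements could be built from Julia with LibPQ, reusing the anonymized table and column names from above (duplicates suffixed _2) and assuming CrateDB accepts numbered parameter placeholders over the PostgreSQL wire protocol:

using LibPQ

# build one INSERT with a VALUES tuple per row and bound parameters,
# i.e. VALUES ($1, $2, ..., $6), ($7, ..., $12), ...
function insert_batch(conn, rows; ncols = 6)
    groups = [
        "(" * join(("\$$(i * ncols + j)" for j in 1:ncols), ", ") * ")"
        for i in 0:length(rows) - 1
    ]
    sql = """
        INSERT INTO table_name_here
            (timestamptz_field, text_field, int_field, ipaddr_field, int_field_2, text_field_2)
        VALUES $(join(groups, ", "))
        """
    LibPQ.execute(conn, sql, collect(Iterators.flatten(rows)))
end

# feed it slices of a few hundred rows at a time, e.g.
# for batch in Iterators.partition(all_rows, 500)
#     insert_batch(dbclient, collect(batch))
# end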

Update 4 days later. I converted all of the parquet/arrow files to csv. I set up 3 simultaneous imports via COPY FROM, one set of csvs per table. There are roughly 40 billion rows to import. Both servers in the cluster have access to the shared filesystem, so I used COPY FROM (shared=true) on the theory that both servers can do the imports. After 24h, it’s completed 2.1b rows.

I read the documentation a few times, but I’m not quite sure if there’s a better way.

It might be better performance-wise to use INSERTs instead of COPY FROM.
COPY FROM currently gets throttled for various reasons, but we are working on improving the performance.

With initial inserts you might want to set the refresh_interval to 0

ALTER TABLE tab SET (refresh_interval = 0);

and set it back to an appropriate level afterwards

https://crate.io/docs/crate/reference/en/4.4/sql/statements/create-table.html#refresh-interval
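
A small sketch of wrapping a bulk load with that advice via LibPQ; the 1000 ms used to restore the setting is the documented default, and do_load / table are placeholder names:

using LibPQ

# disable the periodic refresh for the duration of a bulk load,
# then restore it afterwards
function with_refresh_disabled(do_load, conn, table)
    LibPQ.execute(conn, "ALTER TABLE $table SET (refresh_interval = 0)")
    try
        do_load()
    finally
        # 1000 ms is the default; set it back to whatever suits the workload
        LibPQ.execute(conn, "ALTER TABLE $table SET (refresh_interval = 1000)")
    end
end

# usage (run_my_bulk_load is a placeholder):
# with_refresh_disabled(() -> run_my_bulk_load(), dbclient, "tab")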

2.1b rows in 24h seems quite low (i.e. only ~24,000 records/s)

Thanks.

If COPY FROM STDIN existed, one could bulk import via DataFrame from memory straight to db.
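
As a rough workaround sketch for the missing COPY FROM STDIN: LibPQ.jl can stream a Tables.jl source such as a DataFrame through a prepared INSERT with LibPQ.load!. This is a client-side convenience rather than a true bulk path, since the prepared statement still runs once per row, and the column names below are again the anonymized placeholders:

using DataFrames, LibPQ

conn = LibPQ.Connection("postgresql://crate@localhost/doc")

# a tiny stand-in for a frame read from parquet; column order must match the
# placeholder order in the INSERT below
df = DataFrame(
    timestamptz_field = ["2021-03-29T17:34:15Z"],
    text_field        = ["example"],
    int_field         = [42],
    ipaddr_field      = ["10.0.0.1"],
    int_field_2       = [7],
    text_field_2      = ["example_2"],
)

# LibPQ.load! prepares the statement and executes it for every row of df
LibPQ.load!(
    df,
    conn,
    """INSERT INTO table_name_here
           (timestamptz_field, text_field, int_field, ipaddr_field, int_field_2, text_field_2)
       VALUES (\$1, \$2, \$3, \$4, \$5, \$6)""",
)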

I tried the bulk inserts with a BEGIN/COMMIT wrapper but it wasn’t any faster. In fact, COPY FROM is the fastest method so far.

I wrote a quick script to benchmark insert performance. All methods seem to hit a limit on inserts. I’m writing a connection pool for LibPQ and might try that to see if I can get more performance using a threaded pool of db connections.
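
A rough sketch of such a pool: a Channel of LibPQ connections handed out to worker tasks, combined with the batched inserts suggested above. Julia needs to be started with -t N for the spawned tasks to use multiple threads; batches is an assumed iterator of row batches and insert_batch the helper sketched earlier.

using LibPQ

# a fixed-size pool: a Channel holding idle connections
struct ConnPool
    conns::Channel{LibPQ.Connection}
end

function ConnPool(url::AbstractString, size::Integer)
    ch = Channel{LibPQ.Connection}(size)
    for _ in 1:size
        put!(ch, LibPQ.Connection(url))
    end
    return ConnPool(ch)
end

# borrow a connection, run f, and always hand the connection back
function withconn(f, pool::ConnPool)
    conn = take!(pool.conns)
    try
        return f(conn)
    finally
        put!(pool.conns, conn)
    end
end

# usage sketch: fan batches out over threaded tasks
# (batches and insert_batch are placeholders, see above)
pool = ConnPool("postgresql://crate@localhost/doc", 8)
@sync for batch in batches
    Threads.@spawn withconn(pool) do conn
        insert_batch(conn, batch)
    end
end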

Unfortunately my Julia knowledge is very limited … or rather, non-existent :frowning:

Could you post the output of

SELECT * FROM sys.jobs

while executing the import?

Regarding the “# bulk insert” part: it looks like you fire one insert statement :confused:

Best practice definitely would be:

  • simultaneous connections
  • batched inserts (~100 - 1000 records)

I’ve been watching that, and it seems the COPY FROM has stopped.

cr> SELECT * from sys.jobs;                       
+--------------------------------------+----------------------------------------------+---------------+------------------------+----------+
| id                                   | node                                         |       started | stmt                   | username |
+--------------------------------------+----------------------------------------------+---------------+------------------------+----------+
| bd02fcaa-ff55-4cd7-816e-76faef54e068 | {"id": "id_here", "name": "servername_here"} | 1617041655388 | SELECT * from sys.jobs | crate    |
+--------------------------------------+----------------------------------------------+---------------+------------------------+----------+
SELECT 1 row in set (0.002 sec)

It’s a dataframe source, so the system takes the values from each row of the dataframe and loops them through the statement. It will loop through 50m insert statements, one per row. For the other test, it inserts 10,000 rows per insert statement (one insert statement with 10,000 value tuples).

Is it really 10,000 values? Looks like single inserts?

I took a completely different approach, and now have new issues.

I ran this against 1,073,764,789 lines in 2,844 csv files:

ls -1 *hashed_data.csv | parallel -u ~/crash -c \"COPY test_hashed_data FROM \'file:///opt/data/working/{}\'\;\"

Basically, run a crash instance for each unique csv file. Here’s how it looks to the system when running each file:

python3 /home/ubuntu/crash -c COPY test_hashed_data FROM 'file:///opt/data/working/server_1_hashed_data.csv';

I think I have the escaping wrong there, but 28.8m data points were inserted pretty quickly (in about 15 minutes) and then nothing since. Digging through the logs, it seems one of the 2 nodes in the cluster has crossed the 85% disk fill mark, so replication was disabled for that node. I thought the other would carry on, but it seems everything halts because replication takes priority.

In crash, what am I doing wrong here?

ls *rawhashes.csv.gz | wc -l
2844
COPY test_rawhashes FROM 'file:///opt/data/working/*rawhashes.csv.gz' WITH (shared = true, compression = 'gzip');                                                                                 
COPY OK, 0 rows affected  (61755.500 sec)

I was expecting roughly 9 billion rows to be inserted.

Can you add a RETURN SUMMARY to the COPY statement and share the output? Maybe try with a small sample file?

Some tips:

  • the header is required and must match the table columns
  • the data types must match

Thanks. The import works if I uncompress the csv and COPY FROM. I’m re-trying the COPY FROM csv.gz with format = ‘csv’ just to force the format, like this:

COPY test_rawhashes FROM 'file:///opt/data/working/*rawhashes.csv.gz' WITH (format = 'csv', shared = true, compression = 'gzip') RETURN SUMMARY;

Sounds strange. Maybe a problem with the encoding?