Connecting to CrateDB from Go

This article describes how to connect to CrateDB from Go using the pgx PostgreSQL driver. This works because CrateDB supports the PostgreSQL wire protocol.

Prerequisites

To connect to CrateDB, we first install the main pgx package as well as its connection pooling package:

$ go get github.com/jackc/pgx/v4
$ go get github.com/jackc/pgx/v4/pgxpool

Connecting to CrateDB

The snippet below uses pgx to create a pool of up to 10 connections to CrateDB. It then runs a simple test query against the sys.nodes system table to verify that data can be retrieved:

package main

import (
	"context"
	"fmt"
	"os"
	"github.com/jackc/pgx/v4/pgxpool"
)

// main runs the connection example and exits with the status code returned by
// run. The work lives in run so that its deferred cleanup (closing the pool)
// executes before the process exits: os.Exit does not run deferred calls.
func main() {
	os.Exit(run(context.Background()))
}

// run connects to CrateDB through a connection pool, reads the id and hostname
// of one node from the sys.nodes system table and prints them.
// It returns 0 on success, or 1 after reporting the error on stderr.
func run(ctx context.Context) int {
	// a regular PostgreSQL-style connection URL; pool_max_conns caps the pool at 10 connections
	config, err := pgxpool.ParseConfig("postgresql://crate@localhost:5432/doc?pool_max_conns=10")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to parse database configuration: %v\n", err)
		return 1
	}

	dbpool, err := pgxpool.ConnectConfig(ctx, config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
		return 1
	}
	defer dbpool.Close()

	var id, hostname string
	// LIMIT 1 without ORDER BY: any one node may be returned.
	err = dbpool.QueryRow(ctx, "SELECT id, hostname FROM sys.nodes LIMIT 1").Scan(&id, &hostname)
	if err != nil {
		fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
		return 1
	}

	fmt.Println(id, hostname)
	return 0
}

Ingesting into CrateDB

Several strategies are available to achieve high performance for ingesting data into CrateDB.

Batched Inserts

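pgx supports batching: statements are queued in a pgx.Batch and sent to the server together in a single network round trip. The following example uses a batch to insert 200 rows, one INSERT statement per row: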

package main

import (
	"context"
	"fmt"
	"os"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
)

// main runs the batch-ingest example and exits with the status code returned
// by run, so that run's deferred cleanup executes first (os.Exit does not run
// deferred calls).
func main() {
	os.Exit(run(context.Background()))
}

// run inserts sample rows into doc.my_test_table. It queues one INSERT per
// value in a pgx.Batch, sends the whole batch at once and checks the result
// of every queued statement.
// It returns 0 on success, or 1 after reporting the error on stderr.
func run(ctx context.Context) int {
	// number of sample rows to insert
	const numValues = 200

	// a regular PostgreSQL-style connection URL; pool_max_conns caps the pool at 10 connections
	config, err := pgxpool.ParseConfig("postgresql://crate@localhost:5432/doc?pool_max_conns=10")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to parse database configuration: %v\n", err)
		return 1
	}

	dbpool, err := pgxpool.ConnectConfig(ctx, config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
		return 1
	}
	defer dbpool.Close()

	// simple test table, created via CREATE TABLE doc.my_test_table(numeric_value INTEGER);
	stmt := `INSERT INTO doc.my_test_table (numeric_value) VALUES ($1);`

	// queue one INSERT per sample value
	batch := &pgx.Batch{}
	for i := 0; i < numValues; i++ {
		batch.Queue(stmt, i)
	}

	// Send all queued statements. The pool connection used for the batch is
	// held until br is closed.
	br := dbpool.SendBatch(ctx, batch)

	// Each Exec call reads the result of the next queued statement, in queue
	// order, so it must be called once per statement to check all of them.
	for i := 0; i < batch.Len(); i++ {
		if _, err := br.Exec(); err != nil {
			// Release the batch's connection before returning: the deferred
			// dbpool.Close blocks until all acquired connections are released.
			// The Exec error is the one worth reporting, so Close's is ignored.
			_ = br.Close()
			fmt.Fprintf(os.Stderr, "Unable to execute batch statement %d: %v\n", i, err)
			return 1
		}
	}

	// close the batch, returning its connection to the pool
	if err := br.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "Unable to close batch %v\n", err)
		return 1
	}
	return 0
}

UNNEST

Alternatively, we can ingest many rows with a single statement by passing all values as one array parameter and using UNNEST to expand the array into rows:

package main

import (
	"context"
	"fmt"
	"os"
	"github.com/jackc/pgx/v4/pgxpool"
)

// main runs the UNNEST ingest example and exits with the status code returned
// by run, so that run's deferred cleanup executes first (os.Exit does not run
// deferred calls).
func main() {
	os.Exit(run(context.Background()))
}

// run inserts sample rows into doc.my_test_table with one INSERT ... SELECT
// statement. All values are sent as a single INTEGER[] parameter, and UNNEST
// expands that array into one row per element. It prints the command tag
// returned by Exec.
// It returns 0 on success, or 1 after reporting the error on stderr.
func run(ctx context.Context) int {
	// number of sample rows to insert
	const numValues = 200

	// a regular PostgreSQL-style connection URL; pool_max_conns caps the pool at 10 connections
	config, err := pgxpool.ParseConfig("postgresql://crate@localhost:5432/doc?pool_max_conns=10")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to parse database configuration: %v\n", err)
		return 1
	}

	dbpool, err := pgxpool.ConnectConfig(ctx, config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
		return 1
	}
	defer dbpool.Close()

	// sample values 0..numValues-1, held in a slice (the idiomatic Go type for
	// an array parameter)
	values := make([]int, numValues)
	for i := range values {
		values[i] = i
	}

	// simple test table, created via CREATE TABLE doc.my_test_table(numeric_value INTEGER);
	stmt := `INSERT INTO doc.my_test_table (numeric_value) SELECT * FROM UNNEST($1::INTEGER[]);`

	// execute the single INSERT; all values travel in one statement
	result, err := dbpool.Exec(ctx, stmt, values)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Unable to execute query %v\n", err)
		return 1
	}

	fmt.Println(result)
	return 0
}
3 Likes