IllegalStateException with left join query

I am currently running a CrateDB instance on Kubernetes for development and testing, and from time to time I get the following error:

crate.client.exceptions.ProgrammingError: IllegalStateException[stream has already been operated upon or closed]

with queries that include a LEFT JOIN clause. For example:

SELECT vn.var_id,
    vn.var_name,
    vn.var_description,
    vn.isnumeric,
    f.code,
    c.code,
    vn.gateway_config,
    vn.unit,
    vn.var_type
FROM bemp_demo.var_names vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
    LEFT JOIN bemp.var_filters f ON vn.var_filter_id = f.filter_id
WHERE vn.var_id = ?

We are using the Python client, but running the same query in the Admin UI console returns the same error.

If I restart the pod, everything works again for some days until it happens again. It is not only that query: any query that includes a LEFT JOIN clause fails. This happened with the 4.6 series, and after updating to 4.7.0 it happened again today (after 3 days). I can't see anything in the CrateDB logs.

I also have a 3-node production cluster running 4.6.6 where the problem has not occurred so far. I don't know how to reproduce the problem, but I am afraid it could happen at any time in production. Any idea what is happening? I am going to restart the instance now, so I can't do any further tests until it happens again.

Could you try to replicate this with the "Show error trace" option enabled in the console and share the complete trace?

I have restarted the instance and now the query is working as expected; we will need to wait some days until the error happens again. I will post the results then.

Today it has started to fail again. Here is the trace from the Console:

java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
	at java.base/java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:143)
	at io.crate.data.InMemoryBatchIterator.moveToStart(InMemoryBatchIterator.java:79)
	at io.crate.data.join.JoinBatchIterator.moveToStart(JoinBatchIterator.java:70)
	at io.crate.data.MappedForwardingBatchIterator.moveToStart(MappedForwardingBatchIterator.java:36)
	at io.crate.data.join.RightJoinNLBatchIterator.tryAdvanceRight(RightJoinNLBatchIterator.java:126)
	at io.crate.data.join.RightJoinNLBatchIterator.moveLeft(RightJoinNLBatchIterator.java:93)
	at io.crate.data.join.RightJoinNLBatchIterator.moveNext(RightJoinNLBatchIterator.java:80)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.data.LimitingBatchIterator.moveNext(LimitingBatchIterator.java:57)
	at io.crate.data.BatchIterators$1.moveNext(BatchIterators.java:119)
	at io.crate.data.AsyncFlatMapBatchIterator.moveNext(AsyncFlatMapBatchIterator.java:67)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.action.sql.RowConsumerToResultReceiver.consumeIt(RowConsumerToResultReceiver.java:69)
	at io.crate.action.sql.RowConsumerToResultReceiver.accept(RowConsumerToResultReceiver.java:52)
	at io.crate.execution.engine.InterceptingRowConsumer.lambda$tryForwardResult$1(InterceptingRowConsumer.java:92)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Again, after restarting, the query works. The exact query used is:

SELECT vn.var_id,
    vn.var_name,
    vn.var_description,
    vn.isnumeric,
    f.code,
    c.code,
    vn.gateway_config,
    vn.unit,
    vn.var_type
FROM bemp_demo.var_names vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
    LEFT JOIN bemp.var_filters f ON vn.var_filter_id = f.filter_id
WHERE vn.var_id = 73;

Could you provide the query plan using EXPLAIN?

EXPLAIN SELECT vn.var_id,
    vn.var_name,
    vn.var_description,
    vn.isnumeric,
    f.code,
    c.code,
    vn.gateway_config,
    vn.unit,
    vn.var_type
FROM bemp_demo.var_names vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
    LEFT JOIN bemp.var_filters f ON vn.var_filter_id = f.filter_id
WHERE vn.var_id = 73;

And maybe also the table schemas and some information about the data?
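
For the schema part, the output of SHOW CREATE TABLE for each of the three tables involved would be enough, for example:

SHOW CREATE TABLE bemp_demo.var_names;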

The execution plan now (when the query is not failing) is the following:

Eval[var_id, var_name, var_description, isnumeric, code, code, gateway_config, unit, var_type]
  └ NestedLoopJoin[LEFT | (var_filter_id = filter_id)]
    ├ NestedLoopJoin[LEFT | (category_id = cat_id)]
    │  ├ Rename[var_id, var_name, var_description, isnumeric, gateway_config, unit, var_type, category_id, var_filter_id] AS vn
    │  │  └ Get[bemp_demo.var_names | var_id, var_name, var_description, isnumeric, gateway_config, unit, var_type, category_id, var_filter_id | DocKeys{73::bigint} | (var_id = 73::bigint)]
    │  └ Rename[code, cat_id] AS c
    │    └ Collect[bemp.categories | [code, cat_id] | true]
    └ Rename[code, filter_id] AS f
      └ Collect[bemp.var_filters | [code, filter_id] | true]

And here I paste the SHOW CREATE TABLE results:

CREATE TABLE IF NOT EXISTS "bemp_demo"."var_names" (
   "alarms" ARRAY(BIGINT),
   "var_id" BIGINT,
   "var_name" VARCHAR(255) NOT NULL,
   "var_description" TEXT,
   "var_creation_date" TIMESTAMP WITHOUT TIME ZONE,
   "isnumeric" BOOLEAN DEFAULT true,
   "var_filter_id" BIGINT,
   "category_id" BIGINT,
   "unit" TEXT,
   "var_type" TEXT,
   "device_id" BIGINT,
   "alarm_id" ARRAY(BIGINT),
   "filter_execution_time" TIMESTAMP WITHOUT TIME ZONE,
   "categories" ARRAY(TEXT),
   "var_unit" TEXT,
   "var_order" INTEGER,
   "alarm_execution_time" TIMESTAMP WITHOUT TIME ZONE,
   "gateway_config" OBJECT(DYNAMIC),
   PRIMARY KEY ("var_id")
)
CLUSTERED BY ("var_id") INTO 2 SHARDS
WITH (
   "allocation.max_retries" = 5,
   "blocks.metadata" = false,
   "blocks.read" = false,
   "blocks.read_only" = false,
   "blocks.read_only_allow_delete" = false,
   "blocks.write" = false,
   codec = 'default',
   column_policy = 'dynamic',
   "mapping.total_fields.limit" = 1000,
   max_ngram_diff = 1,
   max_shingle_diff = 3,
   number_of_replicas = '0-1',
   "routing.allocation.enable" = 'all',
   "routing.allocation.total_shards_per_node" = -1,
   "store.type" = 'fs',
   "translog.durability" = 'REQUEST',
   "translog.flush_threshold_size" = 536870912,
   "translog.sync_interval" = 5000,
   "unassigned.node_left.delayed_timeout" = 60000,
   "write.wait_for_active_shards" = '1'
)
CREATE TABLE IF NOT EXISTS "bemp"."categories" (
   "cat_id" BIGINT,
   "name" TEXT NOT NULL,
   "description" TEXT,
   "parent_cat" BIGINT,
   "client_id" BIGINT,
   "cat_order" INTEGER,
   "code" TEXT,
   "modification_date" TIMESTAMP WITHOUT TIME ZONE GENERATED ALWAYS AS _cast(current_timestamp, 'timestamp without time zone'),
   "creation_date" TIMESTAMP WITHOUT TIME ZONE DEFAULT _cast(current_timestamp, 'timestamp without time zone') NOT NULL,
   PRIMARY KEY ("cat_id")
)
CLUSTERED BY ("cat_id") INTO 4 SHARDS
WITH (
   "allocation.max_retries" = 5,
   "blocks.metadata" = false,
   "blocks.read" = false,
   "blocks.read_only" = false,
   "blocks.read_only_allow_delete" = false,
   "blocks.write" = false,
   codec = 'default',
   column_policy = 'dynamic',
   "mapping.total_fields.limit" = 1000,
   max_ngram_diff = 1,
   max_shingle_diff = 3,
   number_of_replicas = '0-1',
   "routing.allocation.enable" = 'all',
   "routing.allocation.total_shards_per_node" = -1,
   "store.type" = 'fs',
   "translog.durability" = 'REQUEST',
   "translog.flush_threshold_size" = 536870912,
   "translog.sync_interval" = 5000,
   "unassigned.node_left.delayed_timeout" = 60000,
   "write.wait_for_active_shards" = '1'
)
CREATE TABLE IF NOT EXISTS "bemp"."var_filters" (
   "filter_id" BIGINT,
   "client_id" BIGINT NOT NULL,
   "name" VARCHAR(255),
   "description" TEXT,
   "content" TEXT,
   "code" TEXT,
   "modification_date" TIMESTAMP WITHOUT TIME ZONE GENERATED ALWAYS AS _cast(current_timestamp, 'timestamp without time zone'),
   "creation_date" TIMESTAMP WITHOUT TIME ZONE DEFAULT _cast(current_timestamp, 'timestamp without time zone') NOT NULL,
   PRIMARY KEY ("filter_id")
)
CLUSTERED BY ("filter_id") INTO 4 SHARDS
WITH (
   "allocation.max_retries" = 5,
   "blocks.metadata" = false,
   "blocks.read" = false,
   "blocks.read_only" = false,
   "blocks.read_only_allow_delete" = false,
   "blocks.write" = false,
   codec = 'default',
   column_policy = 'dynamic',
   "mapping.total_fields.limit" = 1000,
   max_ngram_diff = 1,
   max_shingle_diff = 3,
   number_of_replicas = '0-1',
   "routing.allocation.enable" = 'all',
   "routing.allocation.total_shards_per_node" = -1,
   "store.type" = 'fs',
   "translog.durability" = 'REQUEST',
   "translog.flush_threshold_size" = 536870912,
   "translog.sync_interval" = 5000,
   "unassigned.node_left.delayed_timeout" = 60000,
   "write.wait_for_active_shards" = '1'
)

Currently there is not much data in those tables:

  • "bemp_demo"."var_names" = 149 records
  • "bemp"."categories" = 79 records
  • "bemp"."var_filters" = 51 records

As said, the query is not failing right now; I can send the same information when it starts failing again.

It is failing again. The result of the EXPLAIN query is the following:

Eval[var_id, var_name, var_description, isnumeric, code, code, gateway_config, unit, var_type]
  └ Fetch[var_id, var_name, var_description, isnumeric, gateway_config, unit, var_type, category_id, var_filter_id, code, cat_id, code, filter_id]
    └ Limit[100::bigint;0]
      └ NestedLoopJoin[LEFT | (var_filter_id = filter_id)]
        ├ NestedLoopJoin[LEFT | (category_id = cat_id)]
        │  ├ Rename[var_id, var_name, var_description, isnumeric, gateway_config, unit, var_type, category_id, var_filter_id] AS vn
        │  │  └ Get[bemp_demo.var_names | var_id, var_name, var_description, isnumeric, gateway_config, unit, var_type, category_id, var_filter_id | DocKeys{73::bigint} | (var_id = 73::bigint)]
        │  └ Rename[c._fetchid, cat_id] AS c
        │    └ Collect[bemp.categories | [_fetchid, cat_id] | true]
        └ Rename[f._fetchid, filter_id] AS f
          └ Collect[bemp.var_filters | [_fetchid, filter_id] | true]

I want to add that the only queries failing are the ones with two LEFT JOINs. All other queries seem to work.

@iames Do I understand you correctly that queries with a single LEFT JOIN work?

Can you try to build a minimal failing query? (e.g. does it fail without the WHERE clause? When selecting just a single column? Does it fail with a single LEFT JOIN?)
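
For example, variants along these lines (reusing the tables and columns from your query above) would help narrow it down:

-- both joins, but selecting a single column
SELECT vn.var_id
FROM bemp_demo.var_names vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
    LEFT JOIN bemp.var_filters f ON vn.var_filter_id = f.filter_id
WHERE vn.var_id = 73;

-- only the first LEFT JOIN
SELECT vn.var_id
FROM bemp_demo.var_names vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
WHERE vn.var_id = 73;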

Would it be possible for you to provide us with the table data so that we can try to reproduce this issue?
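
If the data is not sensitive, COPY TO can export each table as JSON files to a directory on the node, which you could then attach here (the path below is just an example):

COPY bemp_demo.var_names TO DIRECTORY '/tmp/crate-export';
COPY bemp.categories TO DIRECTORY '/tmp/crate-export';
COPY bemp.var_filters TO DIRECTORY '/tmp/crate-export';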

Today it has failed again and I have been able to do more tests. Here are my findings:

1. Simplifying the query to select only one column:

SELECT vn.id
FROM bemp_demo1.desc_real vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
    LEFT JOIN bemp.var_filters f ON vn.filter_id = f.filter_id
WHERE vn.id = 10000
LIMIT 100;

gives a slightly different stack trace but still fails:

java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
	at java.base/java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:143)
	at io.crate.data.InMemoryBatchIterator.moveToStart(InMemoryBatchIterator.java:79)
	at io.crate.data.join.JoinBatchIterator.moveToStart(JoinBatchIterator.java:70)
	at io.crate.data.MappedForwardingBatchIterator.moveToStart(MappedForwardingBatchIterator.java:36)
	at io.crate.data.join.RightJoinNLBatchIterator.tryAdvanceRight(RightJoinNLBatchIterator.java:126)
	at io.crate.data.join.RightJoinNLBatchIterator.moveLeft(RightJoinNLBatchIterator.java:93)
	at io.crate.data.join.RightJoinNLBatchIterator.moveNext(RightJoinNLBatchIterator.java:80)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.data.LimitingBatchIterator.moveNext(LimitingBatchIterator.java:57)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.action.sql.RowConsumerToResultReceiver.consumeIt(RowConsumerToResultReceiver.java:69)
	at io.crate.action.sql.RowConsumerToResultReceiver.accept(RowConsumerToResultReceiver.java:52)
	at io.crate.execution.engine.InterceptingRowConsumer.lambda$tryForwardResult$1(InterceptingRowConsumer.java:92)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

2. Running the query with only the first join:

SELECT vn.id
FROM bemp_demo1.desc_real vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
WHERE vn.id = 10000
LIMIT 100;

returns the data correctly (only one row, as expected).

3. Running the query with only the second join:

SELECT vn.id
FROM bemp_demo1.desc_real vn
    LEFT JOIN bemp.var_filters f ON vn.filter_id = f.filter_id
WHERE vn.id = 10000
LIMIT 100;

returns the data correctly (only one row, as expected).

4. Running the double-join query selecting only one column from the second joined table:

SELECT f.name
FROM bemp_demo1.desc_real vn
    LEFT JOIN bemp.categories c ON vn.category_id = c.cat_id
    LEFT JOIN bemp.var_filters f ON vn.filter_id = f.filter_id
WHERE vn.id = 1000
LIMIT 100;

This also returns an error:

java.lang.IllegalStateException: stream has already been operated upon or closed
	at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
	at java.base/java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:143)
	at io.crate.data.InMemoryBatchIterator.moveToStart(InMemoryBatchIterator.java:79)
	at io.crate.data.join.JoinBatchIterator.moveToStart(JoinBatchIterator.java:70)
	at io.crate.data.MappedForwardingBatchIterator.moveToStart(MappedForwardingBatchIterator.java:36)
	at io.crate.data.join.RightJoinNLBatchIterator.tryAdvanceRight(RightJoinNLBatchIterator.java:126)
	at io.crate.data.join.RightJoinNLBatchIterator.moveLeft(RightJoinNLBatchIterator.java:93)
	at io.crate.data.join.RightJoinNLBatchIterator.moveNext(RightJoinNLBatchIterator.java:80)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.data.LimitingBatchIterator.moveNext(LimitingBatchIterator.java:57)
	at io.crate.data.BatchIterators$1.moveNext(BatchIterators.java:119)
	at io.crate.data.AsyncFlatMapBatchIterator.moveNext(AsyncFlatMapBatchIterator.java:67)
	at io.crate.data.MappedForwardingBatchIterator.moveNext(MappedForwardingBatchIterator.java:41)
	at io.crate.action.sql.RowConsumerToResultReceiver.consumeIt(RowConsumerToResultReceiver.java:69)
	at io.crate.action.sql.RowConsumerToResultReceiver.accept(RowConsumerToResultReceiver.java:52)
	at io.crate.execution.engine.InterceptingRowConsumer.lambda$tryForwardResult$1(InterceptingRowConsumer.java:92)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

This is the test data (I hope it is enough):

INSERT INTO bemp_demo1.desc_real (id, name, description, filter_id, category_id, isnumeric, unit, type, device_id, alarm_id, filter_execution_time, alarm_execution_time, gateway_config) VALUES(10000, 'ac_energy', 'Energy consumption from AC unit', 96, 107, 1, 'wh', 'electricity', NULL, [19,18], NULL, NULL, NULL);
INSERT INTO bemp.categories (cat_id, "name", description, parent_cat, client_id, cat_order, code) VALUES(107, 'Meteogune', '', NULL, 15, 1, 'b8Ksh2');
INSERT INTO bemp.var_filters (filter_id, client_id, name, description, content, code) VALUES(96, 15, 'Energía eléctrica 15min con máx.', '', '[]', 'kUm2Yt');

Here I attach the definitions of these new tables:
crate_tables_sql.txt (3.8 KB)

After restarting the node, everything works again as expected.