Schema evolution via Confluent JDBC sink is failing due to wrong ALTER request

I have the Confluent JDBC Kafka Connector sinking data to CrateDB. It mostly works fine. I have the schema evolution enabled on the Kafka Connector. When I add a new field to my JSON, the schema registry is updated successfully and it is trying to update the table I have on CrateDB. My full table name is webservicetests.netflow. Following is the error I am getting in the JDBC connector output:

[2021-04-22 16:23:12,599] INFO Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] among column names [AS_SRC, AS_DST, WRITER_ID, IP_SRC, TOS, EVENT_TYPE, TIME STAMP_END, IP_DST, IFACE_OUT, BYTES, PORT_SRC, PACKETS, TCP_FLAGS, PORT_DST, IFACE_IN, IP_PROTO, TIMESTAMP_START, MONTH] (io.confluent.connect.jdbc.sink.DbStructure:273)
[2021-04-22 16:23:12,602] INFO Amending TABLE to add missing fields:[SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] maxRetries:10 with SQL: [ALTER TABLE "netflow" ADD "APPLICATI ON" TEXT NULL] (io.confluent.connect.jdbc.sink.DbStructure:199)
[2021-04-22 16:23:12,608] WARN Amend failed, re-attempting (io.confluent.connect.jdbc.sink.DbStructure:220)
org.postgresql.util.PSQLException: ERROR: line 1:46: extraneous input 'NULL' expecting {<EOF>, ';'}
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2510)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2245)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:311)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
        at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
        at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:246)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1189)
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:207)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:80)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2021-04-22 16:23:12,740] INFO Checking PostgreSql dialect for type of TABLE "netflow" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:840)
[2021-04-22 16:23:12,754] INFO Refreshing metadata for table "netflow" to Table{name='"netflow"', type=TABLE columns=[Column{'AS_SRC', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'AS_DST', isPrim aryKey=false, allowsNull=true, sqlType=int8}, Column{'WRITER_ID', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IP_SRC', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TOS', isP rimaryKey=false, allowsNull=true, sqlType=int8}, Column{'EVENT_TYPE', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'TIMESTAMP_END', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}, Colu mn{'IP_DST', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'IFACE_OUT', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'BYTES', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Co lumn{'PORT_SRC', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'PACKETS', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'TCP_FLAGS', isPrimaryKey=false, allowsNull=true, sqlType=varcha r}, Column{'PORT_DST', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'IFACE_IN', isPrimaryKey=false, allowsNull=true, sqlType=int8}, Column{'IP_PROTO', isPrimaryKey=false, allowsNull=true, sqlType= varchar}, Column{'TIMESTAMP_START', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}, Column{'MONTH', isPrimaryKey=false, allowsNull=true, sqlType=timestamp}]} (io.confluent.connect.jdbc.util.TableDefini tions:86)
[2021-04-22 16:23:12,754] INFO Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] among column names [AS_SRC, AS_DST, WRITER_ID, IP_SRC, TOS, EVENT_TYPE, TIME STAMP_END, IP_DST, IFACE_OUT, BYTES, PORT_SRC, PACKETS, TCP_FLAGS, PORT_DST, IFACE_IN, IP_PROTO, TIMESTAMP_START, MONTH] (io.confluent.connect.jdbc.sink.DbStructure:273)
[2021-04-22 16:23:12,754] INFO Amending TABLE to add missing fields:[SinkRecordField{schema=Schema{STRING}, name='APPLICATION', isPrimaryKey=false}] maxRetries:9 with SQL: [ALTER TABLE "netflow" ADD "APPLICATIO N" TEXT NULL] (io.confluent.connect.jdbc.sink.DbStructure:199)
[2021-04-22 16:23:12,755] WARN Amend failed, re-attempting (io.confluent.connect.jdbc.sink.DbStructure:220)
org.postgresql.util.PSQLException: ERROR: line 1:46: extraneous input 'NULL' expecting {<EOF>, ';'}
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2510)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2245)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:311)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
        at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
        at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
        at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:246)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1189)
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:207)
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:223)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:80)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

Can someone shed some light on this please. How come I am not able to ALTER my table on CrateDB.

Thanks,

It looks like when I add the new column, the schema is updated and the default value for the new column (string type) is NULL. However, on CrateDB, NULL keyword is not listed as a valid option (ALTER TABLE — CrateDB: Reference); “NOT NULL” is.

Am I on the right track to find the root cause? If I am, is there a workaround for this?

1 Like

Hi @asavran

It seems like we currently don’t support adding values with a default value, after the initial schema generation.

In addition, adding a base column with Default clause is not supported.

However the default value of a column would be null anyway.


I am not quite certain how the confluent JDBC sink works, but CrateDB has a dynamic setting for the column policy which dynamically adds new columns on ingest/insert. i.e. if you insert a column, which is not part of the schema, CrateDB would add it dynamically.

What happens when you disable auto.evolve in Confluent and set the column policy to dynamic in CrateDB?

https://crate.io/docs/crate/reference/en/4.5/sql/statements/create-table.html#column-policy

I just wanted to share my experience with the auto.evolve. I disabled auto-evolve and added a new field in my messages. I see this in the JDBC sink from Confluent:

[2021-05-12 14:03:27,563] ERROR WorkerSinkTask{id=cratedb-connector-webperformance-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "webperformance" is missing fields ([SinkRecordField{schema=Schema{STRING}, name='AGENT_CASETARGET', isPrimaryKey=false}]) and auto-evolution is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "webperformance" is missing fields ([SinkRecordField{schema=Schema{STRING}, name='AGENT_CASETARGET', isPrimaryKey=false}]) and auto-evolution is disabled
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:190)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:80)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
[2021-05-12 14:03:27,565] ERROR WorkerSinkTask{id=cratedb-connector-webperformance-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "webperformance" is missing fields ([SinkRecordField{schema=Schema{STRING}, name='AGENT_CASETARGET', isPrimaryKey=false}]) and auto-evolution is disabled
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:190)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:80)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more
[2021-05-12 14:03:27,565] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:161)

I think you need the auto-evolve after all.