INSERT a column as JSON using Kafka JDBC Sink

We are having some issues using the Confluent Kafka JDBC Sink connector to insert records into CrateDB. Our dataset has a few dimensions, and a couple of them are formatted as JSON. We created our table in CrateDB as follows:

    CREATE TABLE webservicetests.test (
        "ID" STRING,
        "TIMESTAMP" TIMESTAMP WITHOUT TIME ZONE,
        "IP" STRING,
        "ISPINFO" OBJECT,
        "EXTRA" OBJECT,
        "UA" STRING,
        "LANG" STRING,
        "DL" REAL,
        "UL" REAL,
        "PING" REAL,
        "JITTER" REAL,
        "LOG" STRING,
        "MONTH" TIMESTAMP WITHOUT TIME ZONE GENERATED ALWAYS AS date_trunc('month', "TIMESTAMP")
    )
    PARTITIONED BY ("MONTH") WITH (column_policy = 'dynamic');

The ISPINFO and EXTRA columns contain JSON. When the JDBC connector pushes these records, it casts the two columns to json, and CrateDB rejects the statement because it does not recognize that data type. This is the error message on the connector side:

    Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
    java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test" ("ID","TIMESTAMP","IP","ISPINFO","EXTRA","UA","LANG","DL","UL","PING","JITTER","LOG") VALUES (23,1619789224920,'23.23.23.23','{"processedString":"23.23.23.23 - Data Centers Inc., CA (1060 km)","rawIspInfo":{"ip":"23.23.23.23","city":"Montréal","region":"Quebec","country":"CA","loc":"23.568,-73.5878","org":"Data Centers Inc.","postal":"H2W","timezone":"America/Toronto","readme":"https://ipinfo.io/missingauth"}}'::json,'{"server":"test_machine"}'::json,'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.93 Safari/537.36','en-US,en;q=0.9',42.37,8.43,69.64,25.52,'') was aborted: ERROR: Cannot find data type: json  Call getNextException to see other errors in the batch.
    org.postgresql.util.PSQLException: ERROR: Cannot find data type: json
    org.postgresql.util.PSQLException: ERROR: Cannot find data type: json

            at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
            ... 10 more

Do we need to change the way we create the table, perhaps using a different column type? We tried executing the same INSERT statement manually with ::json substituted by ::object, and it worked. Is there a workaround for this?
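
For reference, here is a trimmed-down sketch of that manual test, shortened from the full statement in the error message above (the rawIspInfo payload and several columns are omitted here for readability):

    -- Trimmed-down version of the statement from the error message,
    -- with ::json replaced by ::object; run manually, this succeeds.
    INSERT INTO webservicetests.test ("ID", "TIMESTAMP", "IP", "ISPINFO", "EXTRA")
    VALUES (
        '23',
        1619789224920,
        '23.23.23.23',
        '{"processedString": "23.23.23.23 - Data Centers Inc., CA (1060 km)"}'::object,
        '{"server": "test_machine"}'::object
    );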

The OBJECT column type is perfectly fine for such use cases.

CrateDB can directly ingest JSON strings without any cast to OBJECT. Is there any way to turn off the casting in Confluent?
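
To illustrate the first point, here is a minimal sketch against the table from the original post (the values are illustrative):

    -- CrateDB converts the JSON string into an OBJECT implicitly,
    -- so neither a ::json nor an ::object cast is needed.
    INSERT INTO webservicetests.test ("ID", "TIMESTAMP", "ISPINFO", "EXTRA")
    VALUES ('42', 1619789224920, '{"ip": "23.23.23.23"}', '{"server": "test_machine"}');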

Dear @asavran,

thank you for writing in.

I created a ticket to bring this to our collective attention; see Support PostgreSQL's JSON data type(s) · Issue #11347 · crate/crate · GitHub. Feel free to upvote it on GitHub to signal strong interest in this topic.

With kind regards,
Andreas.

Hello folks, what is the final verdict on this issue? Is it now part of the CrateDB roadmap? If not, is there an alternative approach you can suggest?

Thanks

Dear @asavran,

thank you very much for bumping this discussion.

I did my best to summarize the current state of affairs in Add "CrateDatabaseDialect" for resolving lack of JSON types used with "PostgreSqlDatabaseDialect" · Issue #3 · crate/kafka-connect-jdbc-crate-dialect · GitHub, where I also outlined an alternative way to tackle the problem by adding a CrateDatabaseDialect to Kafka Connect JDBC.

With kind regards,
Andreas.