Feeding flat json from a kafka topic to CrateDB using telegraf

Hello folks:

Trying to ingest some data from a kafka topic to CrateDB using telegraf v1.22. All the pods are running in the same AKS. As a matter of fact, I was able do the same stuff for a topic that carries metric data in influx format.

Following is a sample message in the kafka topic:
{"SOURCE":"s_net_kafka","PROGRAM":"","PRIORITY":"debug","MESSAGE":"","LEGACY_MSGHDR":"","HOST_FROM":"aks-agentpool-38106399-vmss000000","HOSTNAME":"syslog-server-57dfc74f44-hgpcv","HOST":"aks-agentpool-38106399-vmss000000","FACILITY":"syslog","DATE":"Aug 25 14:35:37"}

I have the telegraf pod running in the same namespace as the kafka with:

  • input plugin: kafka
  • output plugin: Cratedb

I see that tlegraf created a table for the data but no messages are being ingested. I could not locate any useful logs in any of the Crate statefulset pods.

This is a lab system, and the output plugin config on telegraf is as follows:

###############################################################################
    #                            OUTPUT PLUGINS                                   #
    ###############################################################################

    # Configuration for CrateDB to send metrics to.
    [[outputs.cratedb]]
       # A github.com/jackc/pgx/v4 connection string.
       # See https://pkg.go.dev/github.com/jackc/pgx/v4#ParseConfig
       url = "postgres://crate@crate-external-service.crate/doc?sslmode=disable"
       # Timeout for all CrateDB queries.
       timeout = "5s"
       # Name of the table to store metrics in.
       table = "syslog"
       # If true, and the metrics table does not exist, create it automatically.
       table_create = true
       # The character(s) to replace any '.' in an object key with
       key_separator = "_"

Input plugin is as follows:

    # Read metrics from Kafka topics
    [[inputs.kafka_consumer]]
       ## Kafka brokers.
       brokers = ["kafka-service:9092"]

       ## Topics to consume.
       topics = ["syslog"]

       ## When set this tag will be added to all metrics with the topic as the value.
       # topic_tag = ""

       ## Optional Client id
       # client_id = "Telegraf"

       ## Set the minimal supported Kafka version.  Setting this enables the use of new
       ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
       ##   ex: version = "1.1.0"
       # version = ""

       ## Optional TLS Config
       # tls_ca = "/etc/telegraf/ca.pem"
       # tls_cert = "/etc/telegraf/cert.pem"
       # tls_key = "/etc/telegraf/key.pem"
       ## Use TLS but skip chain & host verification
       # insecure_skip_verify = false

       ## SASL authentication credentials.  These settings should typically be used
       ## with TLS encryption enabled
       # sasl_username = "kafka"
       # sasl_password = "secret"

       ## Optional SASL:
       ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
       ## (defaults to PLAIN)
       # sasl_mechanism = ""

       ## used if sasl_mechanism is GSSAPI (experimental)
       # sasl_gssapi_service_name = ""
       # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
       # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
       # sasl_gssapi_kerberos_config_path = "/"
       # sasl_gssapi_realm = "realm"
       # sasl_gssapi_key_tab_path = ""
       # sasl_gssapi_disable_pafxfast = false

       ## used if sasl_mechanism is OAUTHBEARER (experimental)
       # sasl_access_token = ""

       ## SASL protocol version.  When connecting to Azure EventHub set to 0.
       # sasl_version = 1

       # Disable Kafka metadata full fetch
       # metadata_full = false

       ## Name of the consumer group.
       # consumer_group = "telegraf_metrics_consumers"

       ## Compression codec represents the various compression codecs recognized by
       ## Kafka in messages.
       ##  0 : None
       ##  1 : Gzip
       ##  2 : Snappy
       ##  3 : LZ4
       ##  4 : ZSTD
       # compression_codec = 0

       ## Initial offset position; one of "oldest" or "newest".
       offset = "newest"

       ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
       # balance_strategy = "range"

       ## Maximum length of a message to consume, in bytes (default 0/unlimited);
       ## larger messages are dropped
       max_message_len = 1000000

       ## Maximum messages to read from the broker that have not been written by an
       ## output.  For best throughput set based on the number of metrics within
       ## each message and the size of the output's metric_batch_size.
       ##
       ## For example, if each message from the queue contains 10 metrics and the
       ## output metric_batch_size is 1000, setting this to 100 will ensure that a
       ## full batch is collected and the write is triggered immediately without
       ## waiting until the next flush_interval.
       # max_undelivered_messages = 1000

       ## Maximum amount of time the consumer should take to process messages. If
       ## the debug log prints messages from sarama about 'abandoning subscription
       ## to [topic] because consuming was taking too long', increase this value to
       ## longer than the time taken by the output plugin(s).
       ##
       ## Note that the effective timeout could be between 'max_processing_time' and
       ## '2 * max_processing_time'.
       # max_processing_time = "100ms"

       ## Data format to consume.
       ## Each data format has its own unique set of configuration options, read
       ## more about them here:
       ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
       data_format = "json"

Is there a specific telegraf config that I should be following for this?
How does CrateDB know which message field is the timestamp?

Thanks

Do you see any failed inserts in the sys.jobs_log table?

SELECT *
FROM sys.jobs_log
WHERE error is not null

I haven’t tried Kafka with Telegraf before, but if the CREATE TABLE worked, I would assume, that this might rather be an issue with the Kafka consumer setup.


Seems you can define additional properties for the json input format like

json_name_key = ""
json_timestamp_units = "1ns"
json_time_key="timestamp"
json_time_format="unix_ms"

There you also can specify the time column
also see … https://github.com/influxdata/telegraf/tree/master/plugins/parsers/json

Internally Telegraf transforms the json input to the line protocol format which is then sent to the (crate) output.


A different approach might be to use Kafka connect directly with CrateDB.

Turns out, we had to configure some more fields on telegraf agent that is responsible for ingesting data to CrateDB as suggested.

We had to specify the fields with the following addition to telegraf.conf file:

       data_format = "json"
       json_string_fields = ["SOURCE","PROGRAM","PRIORITY","MESSAGE","LEGACY_MSGHDR","HOST_FROM","HOST","FACILITY"]
       json_time_key = "DATE"
       json_time_format = "2006-01-02T15:04:05Z07:00"
2 Likes