strimzi-kafka-operator: [Bug] io.confluent.connect.avro.AvroConverter does not work as a key/value converter in KafkaConnectors

Describe the bug
This has got to be either a known bug, or I’m doing something stupid. But I’m trying to use io.confluent.connect.avro.AvroConverter for key and value (de)serialization. I’ve tried to use it with a few different Kafka Connect connectors, but to reproduce it without much complexity I used the S3 connector, and I can make it fail every time.

I’ve tried downloading this and putting it in my plugins directory, but it still doesn’t seem to work: https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter

I started with a fairly straightforward configuration that worked. I copied the “kafka-connect-s3” directory from the Confluent Platform 5.5 distribution (is there possibly a compatibility issue here??). I also copied “kafka-connect-storage-common” (there’s a dependency there…)

Everything seems to generally work pretty well until I try to use the AvroConverter. Looking in “kafka-connect-storage-common”, there is this jar: kafka-connect-avro-converter-5.5.0.jar, which should be all I need… But all I get is this:

    Tasks:
      Id:     0
      State:  FAILED
      Trace:  java.lang.NoClassDefFoundError: io/confluent/connect/avro/AvroConverterConfig
              at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:64)
              at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:266)
              at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:417)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)

In messing around with this configuration with other connectors, I’ve been able to get java.lang.NoClassDefFoundError for AbstractConfig as well sometimes… and then when I add those Confluent common jars, it goes back to NoClassDefFoundError for AvroConverterConfig.

Something must be going on that I’m not seeing here.

Thanks!

To Reproduce
I use the following KafkaConnector configuration:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: kafka-connector-s3-avro
  labels:
    strimzi.io/cluster: kafkaconnect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 1
  config:
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    s3.compression.type: gzip
    partitioner.class: io.confluent.connect.storage.partitioner.HourlyPartitioner
    topics: avrokafkamessagestopic
    s3.region: us-east-2
    s3.bucket.name: avrokafkamessages
    flush.size: 1
    storage.class: io.confluent.connect.s3.storage.S3Storage
    locale: en-US
    timezone: UTC
    schemas.enable: false
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry-release-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-release-cp-schema-registry:8081
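
One way to narrow this kind of failure down (a sketch; the deployment name `kafkaconnect-cluster-connect` follows Strimzi's `<cluster>-connect` naming convention and is an assumption here) is to check the worker's startup log, since Kafka Connect logs each plugin it discovers while scanning the plugin path:

```shell
# Hypothetical check: on a real cluster you would capture the worker log first, e.g.
#   kubectl logs deployment/kafkaconnect-cluster-connect > connect.log
# The sample line below stands in for what a successful plugin scan prints:
printf "INFO Added plugin 'io.confluent.connect.avro.AvroConverter'\n" > connect.log
# If this grep finds nothing, the converter jar was never picked up from
# /opt/kafka/plugins, and a NoClassDefFoundError at task start is expected.
grep "io.confluent.connect.avro.AvroConverter" connect.log
```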

I know that what I’m doing isn’t supported out of the box, so I’ve followed the many tutorials on how to create your own Kafka Connect image. Here’s my Dockerfile:

FROM strimzi/kafka-connect:0.11.4-kafka-2.1.0
USER root:root
COPY ./connect-plugins/ /opt/kafka/plugins/
USER 1001

I add the following directories, copied from the Confluent Platform 5.5 distribution’s share/java directory, to my ./connect-plugins directory: kafka-connect-s3 and kafka-connect-storage-common.
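
As a sketch of that copy step (the `./confluent-5.5.0` path is a stand-in for wherever the platform download was unpacked; here it is simulated with empty directories so the commands run as-is):

```shell
# Stand-in for the unpacked Confluent Platform 5.5 download (hypothetical path);
# with a real download, point CONFLUENT_HOME at it instead.
CONFLUENT_HOME=./confluent-5.5.0
mkdir -p "$CONFLUENT_HOME/share/java/kafka-connect-s3" \
         "$CONFLUENT_HOME/share/java/kafka-connect-storage-common"

# Copy the connector and its storage-common dependency side by side, so each
# becomes its own top-level directory under /opt/kafka/plugins/ in the image.
mkdir -p ./connect-plugins
cp -r "$CONFLUENT_HOME/share/java/kafka-connect-s3" ./connect-plugins/
cp -r "$CONFLUENT_HOME/share/java/kafka-connect-storage-common" ./connect-plugins/
ls ./connect-plugins
```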

And my KafkaConnect configuration:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: kafkaconnect-cluster
  annotations:
#  # use-connector-resources configures this KafkaConnect
#  # to use KafkaConnector resources to avoid
#  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.4.0
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-external-bootstrap:9094
  image: ecr-repo/kafkaconnectors:tagname
  config:
    group.id: kafkaconnect-cluster
    offset.storage.topic: kafkaconnect-cluster-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: kafkaconnect-cluster-configs
    config.storage.replication.factor: 1
    status.storage.topic: kafkaconnect-cluster-status
    status.storage.replication.factor: 1
  externalConfiguration:
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsAccessKey  
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsSecretAccessKey
  metrics:
    lowercaseOutputName: true
    lowercaseOutputLabelNames: true
    rules:
    - pattern : "kafka.connect<type=connect-worker-metrics>([^:]+):"
      name: "kafka_connect_connect_worker_metrics_$1"
    - pattern : "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)"
      name: "kafka_connect_connect_metrics_$1_$2"

Expected behavior
I’d like to be able to use the AvroConverter. I can’t be the only one; it must be something I’m doing wrong.

Environment (please complete the following information):

  • Strimzi version: 0.17.0
  • Installation method: Strimzi operator
  • Kubernetes cluster: Kubernetes 1.16
  • Infrastructure: Amazon EKS

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 17 (4 by maintainers)

Most upvoted comments

@timkalanai thanks mate - that’s greatly appreciated. With your help I was able to get it working - thanks again!

here’s the Dockerfile i use

FROM confluentinc/cp-kafka-connect:5.5.0 as cp
FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
# Copy the plugin directories straight out of the Confluent image
COPY --from=cp /usr/share/java/kafka-connect-storage-common /opt/kafka/plugins/kafka-connect-storage-common
COPY --from=cp /usr/share/java/confluent-common /opt/kafka/plugins/confluent-common
COPY --from=cp /usr/share/java/kafka-connect-s3 /opt/kafka/plugins/kafka-connect-s3
COPY --from=cp /usr/share/java/kafka-connect-jdbc /opt/kafka/plugins/kafka-connect-jdbc
# Symlink the shared jars into each connector directory so they load in the
# same plugin classloader as the connector itself
RUN cd /opt/kafka/plugins && for plugin in kafka-connect-s3 kafka-connect-jdbc; do cd $plugin; ln -s ../confluent-common; ln -s ../kafka-connect-storage-common; cd ..; done

This would be the Dockerfile for MongoDB connector:

FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.6.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM quay.io/strimzi/kafka:latest-kafka-3.0.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/mongodb
COPY --from=cp /usr/share/confluent-hub-components/mongodb-kafka-connect-mongodb/lib /opt/kafka/plugins/mongodb/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/mongodb/
USER 1001

I think I figured it out, and it’s weird…

I think my problem was with the way that Kafka Connect scans for “Connectors” versus “Converters”. There’s a lot of classloading magic in the Plugins file mentioned above. I’m probably not going to do the explanation justice, because I don’t quite get it myself.

But as it scans through the folder structure (you can see it loading connectors and converters in the logs as Kafka Connect starts up), it looks for Converters and Connectors in parallel. Because it tries to isolate connectors within each directory in the plugins folder, the dependencies for each have to live within that directory (which is what we thought).

Confluent has laid out their folder structure a little differently. There’s a confluent-common folder with common libs (I needed the common-config, common-metrics, and common-utils jars, but there are others in there too). There’s also a “kafka-connect-storage-common” directory that has the AvroConverter.

Confluent puts a symlink in the connector directories to make sure that they have access to the right converters. I think when Connect is traversing each folder, it does a deep traversal, and a symlink looks like a directory.

I had a number of issues. But I finally solved it by keeping the symlink and putting the confluent-common jars into the kafka-connect-storage-common folder. That way, when the AvroConverter is detected in the kafka-connect-storage-common folder, it loads in the same classloader as the common jars in the same folder.

And for any connector I need the AvroConverter for, I add a symlink to the kafka-connect-storage-common directory.

I know it’s convoluted. Maybe I’m just too tired and not seeing straight, but everything seems to be working now. Hope this helps some person in the future.
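
The layout described above can be sketched locally (empty files stand in for the real jars; the names follow the 5.1.4 directory listing later in the thread):

```shell
# Recreate the working plugin layout: the AvroConverter jar and the Confluent
# "common" jars sit together in kafka-connect-storage-common, so they all end
# up in the same plugin classloader.
mkdir -p plugins/kafka-connect-s3 plugins/kafka-connect-storage-common
touch plugins/kafka-connect-storage-common/common-config-5.1.4.jar \
      plugins/kafka-connect-storage-common/common-utils-5.1.4.jar \
      plugins/kafka-connect-storage-common/kafka-connect-avro-converter-5.1.4.jar
# Each connector that needs the AvroConverter gets a symlink into
# kafka-connect-storage-common; Connect's deep scan follows it like a directory.
ln -s ../kafka-connect-storage-common plugins/kafka-connect-s3/storage-common
ls -l plugins/kafka-connect-s3
```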

Note: connectors for cp-kafka-connect version 6.0 or above are managed via confluent-hub. Example usage:

FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:1.5.5 \
  && confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM quay.io/strimzi/kafka:0.25.0-kafka-2.8.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/snowflake && mkdir -p /opt/kafka/plugins/avro/
COPY --from=cp /usr/share/confluent-hub-components/snowflakeinc-snowflake-kafka-connector/lib /opt/kafka/plugins/snowflake/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
USER 1001

The key isn’t really in the Dockerfile, which is this:

FROM strimzi/kafka-connect:0.11.4-kafka-2.1.0
USER root:root
COPY ./connect-plugins/ /opt/kafka/plugins/
USER 1001

It’s in the way that you structure the directories in your plugins folder (unfortunately) - in my case ./connect-plugins/.

Look at two things below:

  • The symbolic link in kafka-connect-s3, which links to ../kafka-connect-storage-common (a directory in the Confluent platform). This allows the s3 plugin to access the common jars that it needs to run.
  • In kafka-connect-storage-common, I had to add all of the Confluent “common” jars - common-config, common-metrics, common-utils (from confluent-common in the java folder in the Confluent platform download).
total 40
drwxr-xr-x    9 someuser  removed    288 May 17 20:16 .
drwxr-xr-x   16 someuser  removed    512 May 17 22:18 ..
drwxr-xr-x@  26 someuser  removed    832 May 17 19:34 kafka-connect-s3
drwxr-xr-x@ 159 someuser  removed   5088 May 17 20:17 kafka-connect-storage-common

./kafka-connect-s3:
total 18296
drwxr-xr-x@ 26 someuser  removed      832 May 17 19:34 .
drwxr-xr-x   9 someuser  removed      288 May 17 20:16 ..
-rw-r--r--@  1 someuser  removed    20437 Apr  2 13:50 audience-annotations-0.5.0.jar
-rw-r--r--@  1 someuser  removed   999670 Apr  2 13:50 aws-java-sdk-core-1.11.725.jar
-rw-r--r--@  1 someuser  removed   592109 Apr  2 13:50 aws-java-sdk-kms-1.11.725.jar
-rw-r--r--@  1 someuser  removed  1047529 Apr  2 13:50 aws-java-sdk-s3-1.11.725.jar
-rw-r--r--@  1 someuser  removed   148009 Apr  2 13:50 aws-java-sdk-sts-1.11.725.jar
-rw-r--r--@  1 someuser  removed    20904 Apr  2 13:50 common-utils-5.1.4.jar
-rw-r--r--@  1 someuser  removed   335042 Apr  2 13:50 commons-codec-1.11.jar
-rw-r--r--@  1 someuser  removed    61829 Apr  2 13:50 commons-logging-1.2.jar
-rw-r--r--@  1 someuser  removed   774384 Apr  2 13:50 httpclient-4.5.9.jar
-rw-r--r--@  1 someuser  removed   326724 Apr  2 13:50 httpcore-4.4.4.jar
-rw-r--r--@  1 someuser  removed   565410 Apr  2 13:50 ion-java-1.0.2.jar
-rw-r--r--@  1 someuser  removed    66894 Apr  2 13:50 jackson-annotations-2.9.10.jar
-rw-r--r--@  1 someuser  removed   325636 Apr  2 13:50 jackson-core-2.9.10.jar
-rw-r--r--@  1 someuser  removed  1348786 Apr  2 13:50 jackson-databind-2.9.10.3.jar
-rw-r--r--@  1 someuser  removed    50986 Apr  2 13:50 jackson-dataformat-cbor-2.9.10.jar
-rw-r--r--@  1 someuser  removed   213911 Apr  2 13:50 jline-2.12.1.jar
-rw-r--r--@  1 someuser  removed    27586 Apr  2 13:50 jmespath-java-1.11.725.jar
-rw-r--r--@  1 someuser  removed    73631 Apr  2 13:50 kafka-connect-s3-5.1.4.jar
-rw-r--r--@  1 someuser  removed  1292696 Apr  2 13:50 netty-3.10.6.Final.jar
-rw-r--r--@  1 someuser  removed    41203 Apr  2 13:50 slf4j-api-1.7.25.jar
lrwxr-xr-x   1 someuser  removed       31 Apr  2 13:50 storage-common -> ../kafka-connect-storage-common
-rw-r--r--@  1 someuser  removed    74798 Apr  2 13:50 zkclient-0.10.jar
-rw-r--r--@  1 someuser  removed   906708 Apr  2 13:50 zookeeper-3.4.13.jar

./kafka-connect-storage-common:
total 233536
drwxr-xr-x@ 159 someuser  removed      5088 May 17 20:17 .
drwxr-xr-x    9 someuser  removed       288 May 17 20:16 ..
-rw-r--r--@   1 someuser  removed     23698 Apr  2 13:09 common-config-5.1.4.jar
-rw-r--r--@   1 someuser  removed     41450 Apr  2 13:09 common-metrics-5.1.4.jar
-rw-r--r--@   1 someuser  removed     20904 Apr  2 13:09 common-utils-5.1.4.jar
-rw-r--r--@   1 someuser  removed    188671 Apr  2 13:40 commons-beanutils-1.7.0.jar
.......
-rw-r--r--@   1 someuser  removed     34997 Apr  2 13:40 kafka-avro-serializer-5.1.4.jar
-rw-r--r--@   1 someuser  removed   1899410 Apr  2 13:40 kafka-clients-2.1.1-cp6.jar
-rw-r--r--@   1 someuser  removed     46863 Apr  2 13:40 kafka-connect-avro-converter-5.1.4.jar
-rw-r--r--@   1 someuser  removed      7627 Apr  2 13:40 kafka-connect-storage-common-5.1.4.jar
-rw-r--r--@   1 someuser  removed     26196 Apr  2 13:40 kafka-connect-storage-core-5.1.4.jar
-rw-r--r--@   1 someuser  removed      4371 Apr  2 13:40 kafka-connect-storage-format-5.1.4.jar
-rw-r--r--@   1 someuser  removed     32971 Apr  2 13:40 kafka-connect-storage-hive-5.1.4.jar
-rw-r--r--@   1 someuser  removed     26376 Apr  2 13:40 kafka-connect-storage-partitioner-5.1.4.jar
-rw-r--r--@   1 someuser  removed      2955 Apr  2 13:40 kafka-connect-storage-wal-5.1.4.jar
-rw-r--r--@   1 someuser  removed     54234 Apr  2 13:40 kafka-schema-registry-client-5.1.4.jar
-rw-r--r--@   1 someuser  removed    442805 Apr  2 13:40 kryo-2.22.jar
-rw-r--r--@   1 someuser  removed   1045744 Apr  2 13:40 leveldbjni-all-1.8.jar
.....

It’s not a great solution, and I tried to explain why it works above. But it’s what’s worked for me so far.