camel-kafka-connector: camel-aws-s3-sink-kafka-connector is showing class error and not pushing data to s3

Hi @oscerd ,

I am trying to archive data from my Strimzi Kafka cluster to an AWS S3 bucket, but I have not been able to get it working. Kubernetes version: 1.21

KafkaConnect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: strimzi-kafka
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  image: docker-hub/camel-aws-s3-kafka-connector:1.0.0
  bootstrapServers: kafka-bootstrap.myhost.com:443
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  tls:
    trustedCertificates:
      - secretName: strimzi-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: kafka-user
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1

KafkaConnector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  #class: org.apache.camel.kafkaconnector.awss3sink.CamelAwss3sinkSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kafka-topic
    camel.sink.url: aws-s3://test-kafka-connect?keyName=${date:now:yyyyMMdd}/${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: ca-central-1

Available Plugins:

aws-java-sdk-core-1.11.1034.jar
aws-java-sdk-s3-1.11.1034.jar
camel-api-3.15.0.jar
camel-aws-s3-kafka-connector-0.8.0.jar
camel-aws-s3-sink-kafka-connector-1.0.0.jar
camel-aws-s3-source-kafka-connector-1.0.0.jar
camel-aws2-s3-3.15.0.jar
camel-base-3.15.0.jar
camel-cloud-3.15.0.jar
camel-core-3.15.0.jar
camel-core-catalog-3.15.0.jar
camel-core-engine-3.15.0.jar
camel-core-languages-3.15.0.jar
camel-direct-3.15.0.jar
camel-file-3.15.0.jar
camel-http-3.15.0.jar
camel-http-common-3.15.0.jar
camel-jaxp-3.0.1.jar
camel-kafka-3.15.0.jar
camel-kafka-connector-1.0.0.jar
camel-kamelets-0.7.1.jar
camel-log-3.15.0.jar
camel-main-3.15.0.jar
camel-management-3.15.0.jar
camel-management-api-3.15.0.jar
camel-rest-3.15.0.jar
camel-seda-3.15.0.jar
camel-sjms-3.15.0.jar
camel-support-3.15.0.jar
camel-timer-3.15.0.jar
camel-tooling-model-3.15.0.jar
camel-util-3.15.0.jar
camel-util-json-3.15.0.jar
commons-codec-1.15.jar
commons-logging-1.2.jar
httpcore-4.4.15.jar
jackson-annotations-2.13.2.jar
jackson-core-2.13.2.jar
jackson-databind-2.13.2.jar
jackson-dataformat-cbor-2.13.2.jar
jaxb-api-2.4.0-b180830.0359.jar
jaxb-core-3.0.2.jar
jaxb-impl-3.0.2.jar
log4j-api-2.17.2.jar
log4j-core-2.17.2.jar
log4j-jcl-2.17.2.jar
log4j-slf4j-impl-2.17.2.jar
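
One way to look for leftover or mismatched jars in a listing like the one above is to split each file name into artifact and version, then flag the jars whose version differs from the most common one in their group. The following is only a sketch: the function names, the two grouping rules, and the shortened `JARS` sample are assumptions made for illustration, and a flagged jar is a hint to check rather than proof that it is wrong (some artifacts legitimately follow their own version numbering).

```python
import re
from collections import Counter

# Matches "<artifact>-<version>.jar", where the version starts with a digit.
JAR_PATTERN = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d[\w.\-]*)\.jar$")


def split_jar_name(filename):
    """Return (artifact, version) for a jar file name, or None if it does not match."""
    match = JAR_PATTERN.match(filename.strip())
    if match is None:
        return None
    return match.group("artifact"), match.group("version")


def version_outliers(filenames, belongs):
    """List jars in the selected group whose version is not the group's most common one.

    `belongs` is a predicate on the artifact name that selects the group.
    """
    selected = []
    for name in filenames:
        info = split_jar_name(name)
        if info is not None and belongs(info[0]):
            selected.append((name, info[1]))
    if not selected:
        return []
    most_common_version, _ = Counter(v for _, v in selected).most_common(1)[0]
    return [name for name, version in selected if version != most_common_version]


def is_connector(artifact):
    # Assumed grouping rule: connector jars end with "kafka-connector".
    return artifact.endswith("kafka-connector")


def is_camel_library(artifact):
    # Assumed grouping rule: every other "camel-" jar is a Camel library.
    return artifact.startswith("camel-") and not is_connector(artifact)


# A shortened sample taken from the listing above.
JARS = [
    "camel-api-3.15.0.jar",
    "camel-core-3.15.0.jar",
    "camel-aws2-s3-3.15.0.jar",
    "camel-jaxp-3.0.1.jar",
    "camel-kafka-connector-1.0.0.jar",
    "camel-aws-s3-kafka-connector-0.8.0.jar",
    "camel-aws-s3-sink-kafka-connector-1.0.0.jar",
]

if __name__ == "__main__":
    print("connector jars to check:", version_outliers(JARS, is_connector))
    print("camel library jars to check:", version_outliers(JARS, is_camel_library))
```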

Kafka Connect Pod Logs:

2022-03-14 09:43:09,089 ERROR Uncaught exception in REST call to /connectors/s3-sink-connector/config (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) [qtp552416003-20]
org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches org.apache.camel.kafkaconnector.CamelSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='3.1.0', encodedVersion=3.1.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, 
typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}
  at org.apache.kafka.connect.runtime.isolation.Plugins.connectorClass(Plugins.java:200)
  at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:172)
  at org.apache.kafka.connect.runtime.AbstractHerder.lambda$getConnector$4(AbstractHerder.java:653)
  at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
  at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:653)
  at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:426)
  at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:362)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
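
The exception message above already lists every connector the worker found. A small script can pull the class names and locations out of the `PluginDesc{...}` entries to confirm whether the requested class is among them. This is a sketch under assumptions: the function names are made up for illustration, the regular expression is written against the format seen in this log rather than any documented format, and `SAMPLE` is a shortened copy of the message. In the log above every entry reports `location='classpath'`, which suggests that no connector was picked up from a separate plugin directory.

```python
import re

# Written against the PluginDesc{... name='<class>' ... location='<where>'} format seen in the log.
# DOTALL lets a match span the line break that appears in the middle of the pasted message.
PLUGIN_DESC = re.compile(
    r"PluginDesc\{.*?name='(?P<name>[^']+)'.*?location='(?P<location>[^']+)'\}",
    re.DOTALL,
)


def loaded_plugins(message):
    """Return (class name, location) pairs for every PluginDesc entry in the message."""
    return [(m.group("name"), m.group("location")) for m in PLUGIN_DESC.finditer(message)]


def is_loaded(message, class_name):
    """True if class_name appears among the plugins listed in the message."""
    return any(name == class_name for name, _ in loaded_plugins(message))


# Shortened copy of the error message from the pod log.
SAMPLE = (
    "Failed to find any class that implements Connector and which name matches "
    "org.apache.camel.kafkaconnector.CamelSinkConnector, available connectors are: "
    "PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, "
    "name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='3.1.0', "
    "encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, "
    "PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, "
    "name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', "
    "encodedVersion=1, type=source, typeName='source', location='classpath'}"
)

if __name__ == "__main__":
    for name, location in loaded_plugins(SAMPLE):
        print(f"{location}: {name}")
    wanted = "org.apache.camel.kafkaconnector.CamelSinkConnector"
    print("requested class listed:", is_loaded(SAMPLE, wanted))
```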

I noticed that there are no errors on the pod after I create the KafkaConnect resource. But when I create the KafkaConnector, I get the errors above on the KafkaConnect pod. Can you please tell me which plugin is causing the problem and how I can resolve it? Also, can you tell me whether any unnecessary plugins have been added here that I could remove from the Docker image?

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 41 (24 by maintainers)

Most upvoted comments

As I said, that tar.gz contains the sink and the source together. In 1.0.x the connectors are shipped in two separate tar.gz archives.

In this JAR you have both the sink and source classes: camel-aws2-s3-kafka-connector-0.11.5.jar
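
To see which connector classes a given jar actually ships, the archive can be listed directly, since a jar is a zip file. The sketch below makes one loud assumption: it treats any top-level class whose file name ends in `Connector.class` as a candidate connector, which is a naming heuristic and not how Kafka Connect itself discovers plugins. The function name is also hypothetical.

```python
import sys
import zipfile


def connector_classes(jar_path):
    """Return fully qualified names of classes in the jar whose name ends with 'Connector'.

    Assumption: this is a file-name heuristic; inner classes (containing '$') are skipped.
    """
    names = []
    with zipfile.ZipFile(jar_path) as jar:
        for entry in jar.namelist():
            if entry.endswith("Connector.class") and "$" not in entry:
                # Turn "org/example/Foo.class" into "org.example.Foo".
                names.append(entry[: -len(".class")].replace("/", "."))
    return sorted(names)


if __name__ == "__main__":
    # Pass one or more jar paths on the command line.
    for path in sys.argv[1:]:
        print(path)
        for class_name in connector_classes(path):
            print("  " + class_name)
```

Running it against each connector jar in the image shows whether the class named in `spec.class` is present in any of them.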