camel-kafka-connector: camel-aws-s3-sink-kafka-connector is showing class error and not pushing data to s3
Hi @oscerd ,
I am trying to archive my Strimzi Kafka data to an AWS S3 bucket, but I am unable to do so.
Kubernetes Version: 1.21
KafkaConnect:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: strimzi-kafka
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  image: docker-hub/camel-aws-s3-kafka-connector:1.0.0
  bootstrapServers: kafka-bootstrap.myhost.com:443
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  tls:
    trustedCertificates:
      - secretName: strimzi-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: kafka-user
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
KafkaConnector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  #class: org.apache.camel.kafkaconnector.awss3sink.CamelAwss3sinkSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kafka-topic
    camel.sink.url: aws-s3://test-kafka-connect?keyName=${date:now:yyyyMMdd}/${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: ca-central-1
Available Plugins:
aws-java-sdk-core-1.11.1034.jar
aws-java-sdk-s3-1.11.1034.jar
camel-api-3.15.0.jar
camel-aws-s3-kafka-connector-0.8.0.jar
camel-aws-s3-sink-kafka-connector-1.0.0.jar
camel-aws-s3-source-kafka-connector-1.0.0.jar
camel-aws2-s3-3.15.0.jar
camel-base-3.15.0.jar
camel-cloud-3.15.0.jar
camel-core-3.15.0.jar
camel-core-catalog-3.15.0.jar
camel-core-engine-3.15.0.jar
camel-core-languages-3.15.0.jar
camel-direct-3.15.0.jar
camel-file-3.15.0.jar
camel-http-3.15.0.jar
camel-http-common-3.15.0.jar
camel-jaxp-3.0.1.jar
camel-kafka-3.15.0.jar
camel-kafka-connector-1.0.0.jar
camel-kamelets-0.7.1.jar
camel-log-3.15.0.jar
camel-main-3.15.0.jar
camel-management-3.15.0.jar
camel-management-api-3.15.0.jar
camel-rest-3.15.0.jar
camel-seda-3.15.0.jar
camel-sjms-3.15.0.jar
camel-support-3.15.0.jar
camel-timer-3.15.0.jar
camel-tooling-model-3.15.0.jar
camel-util-3.15.0.jar
camel-util-json-3.15.0.jar
commons-codec-1.15.jar
commons-logging-1.2.jar
httpcore-4.4.15.jar
jackson-annotations-2.13.2.jar
jackson-core-2.13.2.jar
jackson-databind-2.13.2.jar
jackson-dataformat-cbor-2.13.2.jar
jaxb-api-2.4.0-b180830.0359.jar
jaxb-core-3.0.2.jar
jaxb-impl-3.0.2.jar
log4j-api-2.17.2.jar
log4j-core-2.17.2.jar
log4j-jcl-2.17.2.jar
log4j-slf4j-impl-2.17.2.jar
Kafka Connect Pod Logs:
2022-03-14 09:43:09,089 ERROR Uncaught exception in REST call to /connectors/s3-sink-connector/config (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) [qtp552416003-20]
org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches org.apache.camel.kafkaconnector.CamelSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='1', encodedVersion=1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='3.1.0', encodedVersion=3.1.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, 
typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='3.1.0', encodedVersion=3.1.0, type=source, typeName='source', location='classpath'}
at org.apache.kafka.connect.runtime.isolation.Plugins.connectorClass(Plugins.java:200)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:172)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$getConnector$4(AbstractHerder.java:653)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:653)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:426)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$2(AbstractHerder.java:362)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I noticed that there are no errors on the pod after I create the KafkaConnect resource. But when I create the KafkaConnector, I get the above errors on the KafkaConnect pod. Can you please tell me which plugin is causing the issue and how I can resolve it? Also, can you tell me whether any unnecessary plugins have been added here that I could remove from the Docker image?
About this issue
- State: closed
- Created 2 years ago
- Comments: 41 (24 by maintainers)
As I said, in the tar.gz the sink and source are packaged together. In 1.0.x the connectors are shipped in two different tar.gz archives.
In this JAR you have both the sink and source classes: camel-aws2-s3-kafka-connector-0.11.5.jar
The parameter names are different so you should change them: https://camel.apache.org/camel-kafka-connector/1.0.x/reference/connectors/camel-aws-s3-source-kafka-source-connector.html
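For illustration, here is a rough sketch of how the KafkaConnector from the question might look once it is moved to the 1.0.x sink connector. It reuses the connector-specific class that is commented out in the original manifest. The `camel.kamelet.aws-s3-sink.*` option names are an assumption on my part (1.0.x connectors are Kamelet based, so the old `camel.sink.url` and `camel.component.aws-s3.*` keys would no longer apply); please check the exact names against the 1.0.x connector reference before using them.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  # Connector-specific class (the line commented out in the question),
  # used here instead of the generic CamelSinkConnector.
  class: org.apache.camel.kafkaconnector.awss3sink.CamelAwss3sinkSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: kafka-topic
    # ASSUMPTION: Kamelet-style option names; verify them in the 1.0.x reference.
    camel.kamelet.aws-s3-sink.bucketNameOrArn: test-kafka-connect
    camel.kamelet.aws-s3-sink.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.kamelet.aws-s3-sink.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.kamelet.aws-s3-sink.region: ca-central-1
```

The class can only be resolved if the 1.0.x sink JARs are actually loaded as a Connect plugin. In the log above every available connector is reported with location='classpath', so it is worth confirming that the image places the connector JARs in the Connect plugin directory and does not mix them with the 0.8.0 JAR.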