strimzi-kafka-operator: [Question] ...S3 CamelSinkConnector is not dumping data into S3 bucket

I used the following documents as references: https://ibm-cloud-architecture.github.io/refarch-eda/scenarios/connect-s3/ and https://developers.redhat.com/blog/2020/02/14/using-secrets-in-apache-kafka-connect-configuration/

I have successfully created a Strimzi Kafka Connect cluster and an S3 sink connector, but no data is being written to S3. What am I missing here?

Here is the Dockerfile for my Connect image:

FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
# Copy the Camel Kafka connector plugins into the Connect plugin path
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
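For reference, the image is built and pushed roughly like this (a sketch, with the tag assumed from the image field in the KafkaConnect spec below):

docker build -t selumalai/selumalai-s3-kafkaconnect .
docker push selumalai/selumalai-s3-kafkaconnect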

Here is my plugins folder (screenshot omitted):

Here is my KafkaConnect cluster deployment YAML file:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: selumalai/selumalai-s3-kafkaconnect
  replicas: 1
  bootstrapServers: 34.214.12.113:9092
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
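For the externalConfiguration volume above to resolve, the aws-credentials Secret has to exist in the same namespace. Following the Red Hat blog linked above, it is created from a properties file whose keys match the ${file:...} placeholders used in the connector config; roughly:

# aws-credentials.properties
aws_access_key_id=<access-key>
aws_secret_access_key=<secret-key>

kubectl create secret generic aws-credentials --from-file=aws-credentials.properties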

Here is my KafkaConnector YAML file:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: my-replicated-topic 
    camel.sink.url: aws-s3://selumalai-kafka-s3?keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: US_WEST_2
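To confirm that messages are actually reaching my-replicated-topic, the stock Kafka console tools can be run against the same bootstrap server (a sketch; adjust the path to your Kafka installation):

bin/kafka-console-producer.sh --broker-list 34.214.12.113:9092 --topic my-replicated-topic
bin/kafka-console-consumer.sh --bootstrap-server 34.214.12.113:9092 --topic my-replicated-topic --from-beginning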

Here is the output of kubectl get kctr:

ML-C02Z605SLVDQ:selumalai-k8s-s3-connect e192270$ kubectl get kctr --selector strimzi.io/cluster=my-connect-cluster -o name
kafkaconnector.kafka.strimzi.io/s3-sink-connector
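To check whether the connector and its task are actually in the RUNNING state, the Kafka Connect REST API can be queried from inside the Connect pod (the pod name here is a placeholder; substitute the real one from kubectl get pods):

kubectl exec -it <my-connect-cluster-connect-pod> -- curl -s http://localhost:8083/connectors/s3-sink-connector/status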

Here is the most recent log output from the Connect pod:

…
2020-05-28 02:42:34,925 WARN [Worker clientId=connect-1, groupId=connect-cluster] Catching up to assignment's config offset. (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-05-28 02:42:34,925 INFO [Worker clientId=connect-1, groupId=connect-cluster] Current config state offset -1 is behind group assignment 2, reading to end of config log (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-05-28 02:42:35,635 INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished reading to end of log and updated config snapshot, new config log offset: 2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-05-28 02:42:35,635 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset 2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-05-28 02:42:35,635 INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-05-28 02:42:36,927 INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [KafkaBasedLog Work Thread - connect-cluster-configs]
2020-05-28 02:43:19,890 INFO JVM Runtime does not support Modules (org.eclipse.jetty.util.TypeUtil) [qtp818785904-49]
…

Why is the data not being pushed to S3? I cannot find any pod logs showing that the data is being consumed. Any help would be appreciated!

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 31 (13 by maintainers)

Most upvoted comments

Normally, SSL for connecting to Kafka is configured in Connect itself, not in the connector. The S3 connector should, IMHO, not need its own connection to Kafka; it just passes messages to/from the Connect framework.
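For illustration, a minimal sketch of what that looks like in the KafkaConnect resource, assuming a Strimzi-managed cluster named my-cluster (the secret and certificate names follow Strimzi's defaults):

spec:
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt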