argo-events: Kafka event source not working

I’m trying to use Kafka as an event source.

I have Kafka deployed on my cluster.

I then deployed Argo and argo-events and tried out the webhook example from the getting started doc page. I used the namespace argo instead of “argo-events”, but that shouldn’t be a problem since the webhook example worked.

These are the files I used:

source-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kafka-event-source
spec:
  type: "kafka"
  kafka:
    testingArgo:
      url: "kafka.testing-argo:9092"
      topic: "ArgoTester"
      partition: "1"
      connectionBackoff:
        # duration in nanoseconds. following value is 10 seconds
        duration: 10000000000
        # how many backoffs
        steps: 5
        # factor to increase on each step.
        # setting factor > 1 makes backoff exponential.
        factor: 2
        jitter: 0.2
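
For reference, the connectionBackoff values above can be turned into a rough retry schedule. This is a minimal sketch, assuming semantics similar to Kubernetes' wait.Backoff (each step's base delay is multiplied by factor, and a random extra of up to jitter times the delay is added). The function name backoff_bounds is hypothetical and is not part of argo-events.

```python
from typing import List, Tuple

NANOS_PER_SECOND = 1_000_000_000


def backoff_bounds(duration_ns: int, steps: int, factor: float,
                   jitter: float) -> List[Tuple[float, float]]:
    """Return (min, max) delay in seconds for each backoff step.

    Assumption: the delay grows by `factor` per step and the jitter adds
    at most `jitter * delay` on top, as in Kubernetes' wait.Backoff.
    """
    bounds = []
    delay = duration_ns / NANOS_PER_SECOND
    for _ in range(steps):
        bounds.append((delay, delay * (1 + jitter)))
        delay *= factor
    return bounds


if __name__ == "__main__":
    # Values copied from source-kafka.yaml above.
    schedule = backoff_bounds(10_000_000_000, steps=5, factor=2, jitter=0.2)
    for step, (low, high) in enumerate(schedule, start=1):
        print(f"step {step}: between {low:.0f}s and {high:.0f}s")
```

Under that assumption the base delay doubles on every step, starting from 10 seconds.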

sensor-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: kafka-sensor
  labels:
    # sensor controller with instanceId "argo-events" will process this sensor
    sensors.argoproj.io/sensor-controller-instanceid: argo-events
spec:
  template:
    spec:
      containers:
        - name: sensor
          image: argoproj/sensor:v0.12
          imagePullPolicy: Always
      serviceAccountName: argo-events-sa
  subscription:
    http:
      port: 9300
  dependencies:
    - name: test-dep
      gatewayName: kafka-gateway
      eventName: example-with-retry
  triggers:
    - template:
        name: kafka-workflow-trigger
        k8s:
          group: argoproj.io
          version: v1alpha1
          resource: workflows
          operation: create
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: kafka-workflow-
              spec:
                entrypoint: whalesay
                arguments:
                  parameters:
                  - name: message
                    # this is the value that should be overridden
                    value: hello world
                templates:
                - name: whalesay
                  inputs:
                    parameters:
                    - name: message
                  container:
                    image: docker/whalesay:latest
                    command: [cowsay]
                    args: ["{{inputs.parameters.message}}"]
          parameters:
            - src:
                dependencyName: test-dep
              dest: spec.arguments.parameters.0.value

gateway-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: Gateway
metadata:
  name: kafka-gateway
  labels:
    # gateway controller with instanceId "argo-events" will process this gateway
    gateways.argoproj.io/gateway-controller-instanceid: argo-events
spec:
  replica: 1
  type: kafka
  eventSourceRef:
    name: kafka-event-source
  template:
    metadata:
      name: kafka-gateway
      labels:
        gateway-name: kafka-gateway
    spec:
      containers:
        - name: gateway-client
          image: argoproj/gateway-client:v0.12
          imagePullPolicy: Always
          command: ["/bin/gateway-client"]
        - name: kafka-events
          image: argoproj/kafka-gateway:v0.12
          imagePullPolicy: Always
          command: ["/bin/kafka-gateway"]
      serviceAccountName: argo-events-sa
  subscribers:
    http:
      - "http://kafka-sensor.argo.svc:9300/"

kafka-gateway pod logs
INFO[2020-02-13 16:45:30] detected a new event-source...                event-source=kafka-event-source
INFO[2020-02-13 16:45:30] state of the connection to gateway server     name=testingArgo state=READY value="{kafka.testing-argo:9092 1 ArgoTester 0xc000724c00}"
INFO[2020-02-13 16:45:30] deleted event sources                         event-source="[]"
INFO[2020-02-13 16:45:30] new event sources                             event-source="[991916993]"
INFO[2020-02-13 16:45:30] activating new event source...                event-source=testingArgo
INFO[2020-02-13 16:45:30] event source is valid                         event-source=testingArgo
INFO[2020-02-13 16:45:30] listening to events from gateway server...    event-source=testingArgo
INFO[2020-02-13 16:45:30] received a event source state update notification 
INFO[2020-02-13 16:45:30] initializing the node                         node-name=testingArgo
INFO[2020-02-13 16:45:30] node is running                               node-message="event source is running" node-name=testingArgo
INFO[2020-02-13 16:45:30] gateway state updated successfully            phase=Running

I send Kafka messages by exec’ing into the Kafka pods and running this command:

kafka-console-producer.sh --broker-list localhost:9092 --topic ArgoTester

I run a consumer the same way to receive the messages:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic ArgoTester

No messages appear on the gateway, though. Kafka itself has been working, so I’m sure the problem isn’t there. Does anyone have any idea? Any advice is welcome.
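
One thing that may be worth checking is whether the names in the three manifests line up. The sketch below is not a confirmed diagnosis. It assumes the v0.12 convention that a sensor dependency's eventName refers to a key under the event source's spec.kafka and that gatewayName refers to the Gateway's metadata.name. The dictionaries are hand-copied excerpts of the manifests above, and find_name_mismatches is a hypothetical helper, not an argo-events API.

```python
from typing import Dict, List


def find_name_mismatches(event_source: Dict, gateway: Dict,
                         sensor: Dict) -> List[str]:
    """List sensor dependencies whose names do not match the other manifests.

    Assumption: eventName must equal one of the keys defined under
    spec.<type> in the EventSource, and gatewayName must equal the
    Gateway's metadata.name.
    """
    problems = []
    source_type = event_source["spec"]["type"]
    defined_events = set(event_source["spec"].get(source_type, {}))
    gateway_name = gateway["metadata"]["name"]

    for dep in sensor["spec"]["dependencies"]:
        if dep["gatewayName"] != gateway_name:
            problems.append(
                f"{dep['name']}: gatewayName {dep['gatewayName']!r} "
                f"does not match gateway {gateway_name!r}")
        if dep["eventName"] not in defined_events:
            problems.append(
                f"{dep['name']}: eventName {dep['eventName']!r} "
                f"is not one of {sorted(defined_events)}")
    return problems


# Excerpts of the manifests posted above, reduced to the relevant fields.
event_source = {
    "spec": {
        "type": "kafka",
        "kafka": {"testingArgo": {"topic": "ArgoTester", "partition": "1"}},
    }
}
gateway = {"metadata": {"name": "kafka-gateway"}}
sensor = {
    "spec": {
        "dependencies": [
            {"name": "test-dep", "gatewayName": "kafka-gateway",
             "eventName": "example-with-retry"},
        ]
    }
}

if __name__ == "__main__":
    for problem in find_name_mismatches(event_source, gateway, sensor):
        print(problem)
```

With the excerpts above, the check flags the eventName of test-dep, because the only entry defined under spec.kafka is testingArgo. Whether that explains the missing messages depends on the assumption holding for this version.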

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 15 (7 by maintainers)

Most upvoted comments

Thanks for pointing that out. I’ll update the example. Closing the issue for now.