keda: Unable to connect to confluent kafka

I am trying to set up KEDA with Confluent Kafka. I have tested different scenarios, but the error I get is:

2020-10-12T14:39:02.788Z	ERROR	controllers.ScaledObject	Failed to ensure HPA is correctly created for ScaledObject	{"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
	/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128
github.com/kedacore/keda/controllers.(*ScaledObjectReconciler).Reconcile
	/workspace/controllers/scaledobject_controller.go:146
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:235
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:90
2020-10-12T14:39:02.796Z	ERROR	controller	Reconciler error	{"reconcilerGroup": "keda.sh", "reconcilerKind": "ScaledObject", "controller": "scaledobject", "name": "kafka-scaledobject", "namespace": "keda", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
	/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:237
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:90

I deployed a scaled object as follows.

Scenario 1 without TriggerAuthentication

triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'

Scenario 2 with TriggerAuthentication

triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'
  authenticationRef:
    name: keda-trigger-auth-kafka-credential

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
# stringData takes unencoded values; under data they would have to be base64-encoded
stringData:
  sasl: "plaintext"
  username: "My confluent kafka API key"
  password: "My confluent kafka API key secret"

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password

Expected Behavior

After the ScaledObject is deployed, KEDA should create the corresponding HPA.

Actual Behavior

ERROR controllers.ScaledObject Failed to ensure HPA is correctly created for ScaledObject {"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}

Steps to Reproduce the Problem

  1. Helm version is 3.3.x; installed KEDA from https://github.com/kedacore/keda/releases/tag/v2.0.0-beta
  2. Deployed ScaledObject as mentioned above
  3. Checked the KEDA operator logs, which show the error quoted above

Specifications

  • KEDA Version: 2.0.0-beta
  • Platform & Version: Azure Kubernetes Service
  • Kubernetes Version: 1.17.9
  • Scaler(s): Apache Kafka targeting Confluent Kafka

The API version on both the ScaledObject and the TriggerAuthentication is keda.sh/v1alpha1.


About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 25 (11 by maintainers)

Most upvoted comments

@zroubalik Thank you for the solution. Here’s what I changed in the ScaledObject:

  1. I added the “tls” parameter and set it to “enable”. This was not required in older versions (1.4).
  2. I set the “sasl” parameter to “plaintext”.

Apart from these changes and a few template changes, everything else remains the same.
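
The changes described above could look roughly like the following. This is a minimal sketch, not the commenter's actual manifests, which are not shown in the thread. It assumes that “tls” is supplied through the same Secret and TriggerAuthentication as “sasl”, “username” and “password”, and that the Secret uses stringData so the values can be written unencoded. Resource names are reused from the question; the credential values are placeholders.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
# Assumption: stringData is used so the values need no base64 encoding.
stringData:
  sasl: "plaintext"
  tls: "enable"                      # the newly added parameter
  username: "<Confluent API key>"    # placeholder
  password: "<Confluent API secret>" # placeholder
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: tls                   # assumed placement of the tls setting
    name: keda-kafka-secrets
    key: tls
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
```

The trigger in the ScaledObject would keep its authenticationRef pointing at keda-trigger-auth-kafka-credential, as in Scenario 2.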

Also, what do “ACTIVE” and “FALLBACK” mean? They appear when we list the ScaledObjects after deploying them.

Thanks again @zroubalik

I’m not a Confluent Kafka expert. Does it run inside your cluster, or do they manage it? From the looks of it, the address (kafka.svc:9092) points to a local service (it is also the one used in our docs). Is that correct?