strimzi-kafka-operator: Takes too long for KafkaUser to be ready

Describe the bug When I create a small number of KafkaUsers (say 1-5), it works fine and the KafkaUsers reach Ready status almost immediately. But when I create KafkaUsers in bulk, say 50 at a time, some can take almost 10 minutes to become “Ready”. Clients cannot publish or consume messages with those KafkaUsers until then, and the wait grows as the number of KafkaUsers on the Kafka cluster increases.

To Reproduce Steps to reproduce the behavior:

  1. Prerequisites: one Kafka cluster with one topic already created.

  2. Create 50 KafkaUsers on the Strimzi Kafka cluster at the same time via a script (I create them through the Kubernetes API).

  3. Right away, within a minute, try to publish to or read from the topic using all of these users. You will get Topic Authorization Failed and Group Authorization Failed errors because the users are not yet “Ready”.

  4. Create the ‘KafkaUser’ custom resource using the YAML below:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: cog-reader
  labels:
    strimzi.io/cluster: pod-kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: pod-notifications
          patternType: literal
        operation: Read
      - resource:
          type: topic
          name: pod-notifications
          patternType: literal
        operation: Describe
      - resource:
          type: group
          name: cog-group
          patternType: literal
        operation: Read

  5. Authorization is simple; authentication is SASL_SSL/SCRAM-SHA-512.
  6. Apply 50 such users in parallel to the Strimzi cluster.
  7. Check the user-operator logs: they are cluttered with TimeoutExceptions.

2021-10-09 06:55:40 DEBUG KafkaAdminClient:815 - [AdminClient clientId=adminclient-1] Call(callName=describeAcls, deadlineMs=1633762539991, tries=1, nextAllowedTryMs=1633762540099) timed out at 1633762539999 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting to send the call. Call: describeAcls
	at org.apache.kafka.clients.admin.KafkaAdminClient$Call.failWithTimeout(KafkaAdminClient.java:816) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:789) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:912) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutCallsToSend(KafkaAdminClient.java:993) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
2021-10-09 06:55:40 DEBUG KafkaAdminClient:815 - [AdminClient clientId=adminclient-1] Call(callName=alterUserScramCredentials, deadlineMs=1633762539992, tries=1, nextAllowedTryMs=1633762540099) timed out at 1633762539999 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting to send the call. Call: alterUserScramCredentials
	at org.apache.kafka.clients.admin.KafkaAdminClient$Call.failWithTimeout(KafkaAdminClient.java:816) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:789) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:912) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutCallsToSend(KafkaAdminClient.java:993) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
org.apache.kafka.common.errors.TimeoutException: Call(callName=describeClientQuotas, deadlineMs=1633758992264, tries=1, nextAllowedTryMs=1633758992365) timed out at 1633758992265 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: describeClientQuotas
2021-10-09 05:56:32 DEBUG StatusDiff:46 - Status differs: {"op":"add","path":"/conditions/0/reason","value":"TimeoutException"}
2021-10-09 05:56:32 DEBUG StatusDiff:48 - Desired Status path /conditions/0/reason has value "TimeoutException"
org.apache.kafka.common.errors.TimeoutException: Call(callName=describeClientQuotas, deadlineMs=1633758992264, tries=1, nextAllowedTryMs=1633758992365) timed out at 1633758992265 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: describeClientQuotas
java.lang.Exception: TimeoutException: Timed out waiting to send the call. Call: describeClientQuotas
java.lang.Exception: TimeoutException: Timed out waiting to send the call. Call: describeClientQuotas

java.lang.Exception: TimeoutException: Timed out waiting to send the call. Call: describeClientQuotas
        at org.apache.kafka.clients.admin.KafkaAdminClient$Call.failWithTimeout(KafkaAdminClient.java:816) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:789) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:912) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutCallsToSend(KafkaAdminClient.java:993) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1301) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264) [org.apache.kafka.kafka-clients-2.8.0.jar:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
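
For reference, the bulk-creation step can be scripted. The following is a minimal sketch, not the script used in this report: it builds 50 KafkaUser manifests modelled on the cog-reader YAML above and prints them as a single Kubernetes List in JSON, which kubectl accepts. The function names and the cog-reader-N naming scheme are assumptions made for illustration.

```python
import json


def kafka_user_manifest(name, cluster="pod-kafka-cluster",
                        topic="pod-notifications", group="cog-group"):
    """Build a reader KafkaUser manifest mirroring the YAML in this report."""
    def acl(resource_type, resource_name, operation):
        return {
            "resource": {
                "type": resource_type,
                "name": resource_name,
                "patternType": "literal",
            },
            "operation": operation,
        }

    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaUser",
        "metadata": {"name": name, "labels": {"strimzi.io/cluster": cluster}},
        "spec": {
            "authentication": {"type": "scram-sha-512"},
            "authorization": {
                "type": "simple",
                "acls": [
                    acl("topic", topic, "Read"),
                    acl("topic", topic, "Describe"),
                    acl("group", group, "Read"),
                ],
            },
        },
    }


def kafka_user_list(count, prefix="cog-reader"):
    """Wrap `count` uniquely named KafkaUsers in one Kubernetes List object."""
    items = [kafka_user_manifest(f"{prefix}-{i}") for i in range(count)]
    return {"apiVersion": "v1", "kind": "List", "items": items}


if __name__ == "__main__":
    # Print JSON on stdout so it can be piped to kubectl.
    print(json.dumps(kafka_user_list(50), indent=2))
```

Assuming the script is saved as make_users.py (a hypothetical name), the output can be piped to `kubectl apply -n kafka -f -`. This creates the users in quick succession rather than strictly in parallel, so it only approximates the behaviour described above.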

Expected behavior Creating a KafkaUser with ACLs should be a straightforward call, and it should behave predictably whether 5 users or 50 users are created.

Environment (please complete the following information):

  • Strimzi version: 0.25.0

  • Installation method:

    • kubectl get ns kafka || kubectl create ns kafka
    • helm repo add strimzi https://strimzi.io/charts/
    • helm upgrade --install strimzi-kafka strimzi/strimzi-kafka-operator --version="0.25.0" -n kafka
  • Kubernetes cluster: EKS 1.18

  • Infrastructure: Amazon EKS

YAML files and logs

Kafka cluster yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: pod-kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 2.8.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: external
        authentication:
          type: scram-sha-512
        port: 9094
        tls: true
        type: loadbalancer
    authorization:
      type: simple
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.8"
    storage:
      type: persistent-claim
      size: 50Gi
      deleteClaim: true
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 50Gi
      deleteClaim: true
  entityOperator:
    userOperator: 
      reconciliationIntervalSeconds: 157680000
      logging:
        type: inline
        loggers:
          rootLogger.level: DEBUG      
    topicOperator: 
      reconciliationIntervalSeconds: 157680000
      logging:
        type: inline
        loggers:
          rootLogger.level: DEBUG      

Topic yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: pod-notifications
  labels:
    strimzi.io/cluster: pod-kafka-cluster
spec:
  partitions: 1
  replicas: 3

Publisher user (create multiple - this is just an example)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: pod-agent-writer
  labels:
    strimzi.io/cluster: pod-kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: pod-notifications
          patternType: literal
        operation: Write
      - resource:
          type: topic
          name: pod-notifications
          patternType: literal
        operation: Describe

Consumer user (create multiple - this is just an example)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: cog-reader
  labels:
    strimzi.io/cluster: pod-kafka-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: pod-notifications
          patternType: literal
        operation: Read
      - resource:
          type: topic
          name: pod-notifications
          patternType: literal
        operation: Describe
      - resource:
          type: group
          name: cog-group
          patternType: literal
        operation: Read

Additional context Use case: I need to create Kafka users and topics on demand, so I have a microservice that invokes the Kubernetes API, which in turn invokes the Strimzi APIs to create the Kafka resources. The topic operator works fine and topics reach Ready status. KafkaUsers can take too long to become ready, and this causes some components that rely on this microservice to fail.
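
One way a caller can avoid using a KafkaUser before it is usable is to poll its status until it reports a Ready condition. The following is a minimal sketch under assumptions: it reads the resource with `kubectl get kafkauser ... -o json`, treats a condition with type Ready and status "True" as ready, and uses hypothetical function names and default timeouts. It does not make the user operator any faster; it only makes the wait explicit.

```python
import json
import subprocess
import time


def is_ready(resource):
    """True if status.conditions has an entry with type Ready and status "True"."""
    conditions = (resource.get("status") or {}).get("conditions") or []
    return any(
        c.get("type") == "Ready" and c.get("status") == "True" for c in conditions
    )


def fetch_kafka_user(name, namespace="kafka"):
    """Read one KafkaUser via kubectl; assumes kubectl is configured for the cluster."""
    result = subprocess.run(
        ["kubectl", "get", "kafkauser", name, "-n", namespace, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    return json.loads(result.stdout)


def wait_until_ready(name, fetch=fetch_kafka_user, timeout_s=600.0,
                     interval_s=5.0, sleep=time.sleep):
    """Poll until the KafkaUser is Ready; return False if the timeout elapses first."""
    deadline = time.monotonic() + timeout_s
    while True:
        if is_ready(fetch(name)):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)
```

A caller would run `wait_until_ready("cog-reader")` before handing credentials to a client. The `fetch` and `sleep` parameters exist only so the loop can be exercised without a cluster. `kubectl wait kafkauser/cog-reader --for=condition=Ready -n kafka` should give a similar result from the command line.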

Earlier, periodic reconciliation was causing too many issues, so I turned it off. Now that the number of users has increased, all those timeout errors are back. I don’t know whether the Kafka Admin client is already busy and ignoring all the calls.

Any help or pointers here are appreciated.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 44 (20 by maintainers)

Most upvoted comments

@chary1112004 You should probably start a new discussion for it (https://github.com/strimzi/strimzi-kafka-operator/discussions) where you share your full configurations and logs.

This should be fixed in Strimzi 0.33. The expected release date is currently around the second half of December or early January.

I don’t think we have any timeline. But it is on our TODO list.

@GarimaBathla I did further testing with 500 KafkaUsers (KUs). I found that each KafkaUser requires 5 AdminClient calls per reconciliation. That means a high number of KUs is basically DDoSing the Kafka brokers with AdminClient requests. I tried increasing the number of Kafka brokers (3 -> 7), which improved the situation a bit, but there are still KUs that are not ready after a long time. I think we should try to batch the requests to reduce the load on the brokers (@tombentley's idea). What is your use case for creating such a high number of KUs?
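
To put rough numbers on the comment above, the following sketch compares request counts with and without batching. Only the figure of 5 AdminClient calls per KafkaUser comes from the comment. The assumption that each call type could cover a whole batch of users in one request, and the batch size of 100, are hypothetical and are not taken from the Strimzi code.

```python
import math

CALLS_PER_USER = 5  # figure reported in the comment above


def unbatched_requests(users, calls_per_user=CALLS_PER_USER):
    """Requests per reconciliation pass when every user issues its own calls."""
    return users * calls_per_user


def batched_requests(users, batch_size, calls_per_user=CALLS_PER_USER):
    """Requests per pass if each call type covered `batch_size` users at once.

    Hypothetical model: assumes every one of the call types can be batched.
    """
    return math.ceil(users / batch_size) * calls_per_user


if __name__ == "__main__":
    print(unbatched_requests(500))     # 2500
    print(batched_requests(500, 100))  # 25
```

Under these assumptions, 500 KafkaUsers generate 2500 requests per pass unbatched and 25 when batched in groups of 100. The actual saving would depend on which calls can really be combined.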

It is described in the docs: you just add the resources block with the appropriate settings to the YAML. This is the resource I use for development, which should give you an idea:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  labels:
    app: my-cluster
spec:
  kafka:
    replicas: 3
    resources:
      requests:
        memory: 2Gi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: "1"
    jvmOptions:
      -Xms: 1024m
      -Xmx: 1024m
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
        authentication:
          type: scram-sha-512
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
    config:
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
  zookeeper:
    replicas: 3
    resources:
      requests:
        memory: 1Gi
        cpu: "0.3"
      limits:
        memory: 1Gi
        cpu: "0.5"
    jvmOptions:
      -Xms: 512m
      -Xmx: 512m
    storage:
      type: persistent-claim
      size: 100Gi
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: "0.2"
        limits:
          memory: 256Mi
          cpu: "0.5"
    userOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "0.2"
        limits:
          memory: 512Mi
          cpu: "0.5"
  kafkaExporter:
    resources:
      requests:
        memory: 256Mi
        cpu: "0.1"
      limits:
        memory: 256Mi
        cpu: "0.5"

You will need to adjust these for your use-cases.