kafka-docker: ERROR Error when sending message to topic XXX with key: null, value: X bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback

I run a local zookeeper-server and run the docker image as:

docker run -e "KAFKA_ADVERTISED_PORT=9092" -e "KAFKA_ZOOKEEPER_CONNECT=localhost:2181" -p 9092:9092 --net=host -d wurstmeister/kafka

Then I tried:

# bin/kafka-console-producer.sh --broker-list localhost:2181 --topic test
# ERROR Error when sending message to topic test with key: null, value: 6 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

kafka logs:

[2016-06-08 09:53:07,783] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:os.version=4.4.0-22-generic (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,783] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,784] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1c93084c (org.apache.zookeeper.ZooKeeper)
[2016-06-08 09:53:07,793] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2016-06-08 09:53:07,795] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-06-08 09:53:07,834] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-06-08 09:53:07,851] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1552f558b520012, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-06-08 09:53:07,852] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-06-08 09:53:07,978] INFO Log directory '/kafka/kafka-logs-pao-H110M-TS' not found, creating it. (kafka.log.LogManager)
[2016-06-08 09:53:08,009] INFO Loading logs. (kafka.log.LogManager)
[2016-06-08 09:53:08,030] INFO Logs loading complete. (kafka.log.LogManager)
[2016-06-08 09:53:08,109] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-06-08 09:53:08,110] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-06-08 09:53:08,113] WARN No meta.properties file under dir /kafka/kafka-logs-pao-H110M-TS/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-06-08 09:53:08,205] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-06-08 09:53:08,207] INFO [Socket Server on Broker 1003], Started 1 acceptor threads (kafka.network.SocketServer)
[2016-06-08 09:53:08,251] INFO [ExpirationReaper-1003], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-08 09:53:08,252] INFO [ExpirationReaper-1003], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-08 09:53:08,277] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-06-08 09:53:08,293] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-06-08 09:53:08,294] INFO 1003 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-06-08 09:53:08,388] INFO New leader is 1003 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-06-08 09:53:08,389] INFO [ExpirationReaper-1003], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-08 09:53:08,390] INFO [ExpirationReaper-1003], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-06-08 09:53:08,398] INFO [GroupCoordinator 1003]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-06-08 09:53:08,399] INFO [GroupCoordinator 1003]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-06-08 09:53:08,400] INFO [Group Metadata Manager on Broker 1003]: Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-06-08 09:53:08,410] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-06-08 09:53:08,410] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-06-08 09:53:08,414] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-06-08 09:53:08,427] INFO Creating /brokers/ids/1003 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-06-08 09:53:08,450] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-06-08 09:53:08,451] INFO Registered broker 1003 at path /brokers/ids/1003 with addresses: PLAINTEXT -> EndPoint(pao-H110M-TS,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-06-08 09:53:08,451] WARN No meta.properties file under dir /kafka/kafka-logs-pao-H110M-TS/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-06-08 09:53:08,545] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-08 09:53:08,545] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser)
[2016-06-08 09:53:08,545] INFO [Kafka Server 1003], started (kafka.server.KafkaServer)
[2016-06-08 10:03:08,395] INFO [Group Metadata Manager on Broker 1003]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

Describe topic “test”:

Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

About this issue

  • State: closed
  • Created 8 years ago
  • Reactions: 1
  • Comments: 52 (4 by maintainers)

Most upvoted comments

I was able to overcome the issue by modifying: listeners=PLAINTEXT://hostname:9092 property in the server.properties file to listeners=PLAINTEXT://0.0.0.0:9092

I don’t have any idea what it does but it works…
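For context on why that change helps (a sketch, not from this thread's actual config): listeners controls which interface the broker binds to, while advertised.listeners is the address the broker hands back to clients in metadata responses. If the advertised address isn't reachable from the client, the initial connection succeeds but produce requests time out. A hedged server.properties fragment, with kafka-host as a placeholder:

```
# Bind on all interfaces inside the container
listeners=PLAINTEXT://0.0.0.0:9092
# Address returned to clients in metadata -- must be resolvable/reachable from the client
advertised.listeners=PLAINTEXT://kafka-host:9092
```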

I’m having a similar problem here. I pretty much followed the tutorial steps for testing out the running Kafka container, but I get this error the next time I restart the docker container and try to run the producer:

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for <TOPIC_NAME>-3

When the docker container gets a new container id, kafka seems to create a new broker id, so the kafka topic from before is no longer accessible. If you create a brand new topic in the running container, that topic is accessible as long as the container lives, but no longer once the container is restarted.

Therefore, I think this has something to do with the --no-recreate option in docker-compose, which keeps the same container name/id pair so that kafka keeps the same broker_id in its kafka-log meta.properties.

@ssherwood

What’s the output of $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK, and does the leader match the ID of the broker (based on your log that’s 1010)? If this is not the case you will see this behaviour.

It might be worth starting with a clean environment (docker-compose rm).

I had the exact same problem as @n1207n mentioned, but the fix from @Shabirmeans did not help me.

I have kafka deployed in Kubernetes: one kafka broker and 3 zookeeper nodes. I created a topic and was able to produce/consume messages. After a restart of the kafka container, I could not post to existing topics anymore, but newly created topics worked fine. I found out that the old topic was bound to broker 1001, but the new broker had the id 1006 (after some restarts).

What I did was reset the broker.id back to 1001, and then I was able to produce messages on the old topic again. I set the broker.id back by setting the environment variable KAFKA_BROKER_ID to 1001.

I’m not sure if this is the correct way to fix the issue, but it worked.

Edit: You’ll have to set the environment variable KAFKA_RESERVED_BROKER_MAX_ID to 1001 to be allowed to set the broker id to 1001.
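The broker-id pinning described above might look like this in docker-compose (a sketch only; the service name and zookeeper address are illustrative, not from this thread):

```yaml
# docker-compose.yml fragment (sketch)
kafka:
  image: wurstmeister/kafka
  environment:
    KAFKA_BROKER_ID: 1001               # pin the id so restarts reuse the old log dirs
    KAFKA_RESERVED_BROKER_MAX_ID: 1001  # broker.id must not exceed reserved.broker.max.id
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
```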

+1

I still encounter this problem, but I think this is a kubernetes issue.

Hi,

Your kafka-console-producer.sh command line does not look correct. The --broker-list argument should point to a broker, not to zookeeper as in your example.

please see: http://kafka.apache.org/0100/quickstart.html#quickstart_send
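Per the quickstart linked above, --broker-list should target the broker's port (9092 in the docker run command from this thread), not ZooKeeper's 2181. A sketch of the corrected invocation:

```
# Point --broker-list at the Kafka broker, not ZooKeeper
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```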

Ah, stupid me: when compaction is turned on, a key is required, so the correct way to fill such a channel from bash is:

echo "hello:world" | kafka-console-producer \
  --broker-list "localhost:9092" \
  --topic "topicB" \
  --property "parse.key=true" \
  --property "key.separator=:"

Just found @irisrain comment about that in this thread, thank you!

You may be setting log.cleanup.policy=compact. A compacted topic must use both a key and a value, and kafka-console-producer.sh doesn’t send a key by default (key=null), so test via the producer API instead.

@wurstmeister Hi,

I am facing a similar issue:

[2016-12-01 20:27:44,849] WARN Error while fetching metadata with correlation id 0 : {topic02=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

When I describe the topic:

Topic:topic02	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: topic02	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001

And the Docker logs shows

Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-12-01 21:55:13,988] INFO 1001 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-12-01 21:55:14,154] INFO [ExpirationReaper-1001], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-01 21:55:14,160] INFO [ExpirationReaper-1001], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-01 21:55:14,162] INFO [ExpirationReaper-1001], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-12-01 21:55:14,220] INFO [GroupCoordinator 1001]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-12-01 21:55:14,270] INFO [GroupCoordinator 1001]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-12-01 21:55:14,275] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-12-01 21:55:14,351] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-12-01 21:55:14,476] INFO Creating /brokers/ids/1001 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-12-01 21:55:14,482] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-12-01 21:55:14,507] INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: PLAINTEXT -> EndPoint(52.53.216.107,32771,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-12-01 21:55:14,509] WARN No meta.properties file under dir /kafka/kafka-logs-ab0d02b4ca06/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-12-01 21:55:14,640] INFO New leader is 1001 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-12-01 21:55:14,735] INFO Kafka version : 0.10.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2016-12-01 21:55:14,736] INFO Kafka commitId : 3402a74efb23d1d4 (org.apache.kafka.common.utils.AppInfoParser)
[2016-12-01 21:55:14,738] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
bash-4.3# ./broker-list.sh 
:32771
bash-4.3# docker ps
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
ab0d02b4ca06        wurstmeister/kafka       "start-kafka.sh"         12 minutes ago      Up 12 minutes       0.0.0.0:32771->9092/tcp                              ec2user_kafka_1
e74c49b8003f        wurstmeister/zookeeper   "/bin/sh -c '/usr/sbi"   12 minutes ago      Up 12 minutes       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   ec2user_zookeeper_1

Debug steps followed:

  • Created a brand new topic after starting the docker container.
  • Cleaned up the environment and even tried reinstalling everything from scratch on a different EC2 instance.
  • The weird thing is that the same setup had been running just fine for the past 2 weeks.

@BlackRider97 the more information you provide the more likely I will be able to help you. e.g. steps to reproduce, logs, environment, full error message, …

Not working for me. Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

bash-4.3# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181/kafka --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: analytics-logs   Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

docker-compose rm is also not working.

I have tried @MoJo2600’s solution and it works! Just set KAFKA_BROKER_ID in docker-compose.yml.

+1

@wurstmeister The issue got fixed when I deleted the data from zookeeper. I had tried to upgrade from 0.9 to 0.10 by simply updating the docker image.

@wurstmeister I’m getting the same error but I’m pretty much following the guide verbatim.

docker-compose up

Then in another tab

./start-kafka-shell.sh 192.168.99.104 192.168.99.104:2181
bash-4.3# $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic --broker-list=`broker-list.sh`
abc
[2016-06-10 14:08:56,280] ERROR Error when sending message to topic topic with key: null, value:   3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for topic-3

My consumer looks like this:

./start-kafka-shell.sh 192.168.99.104 192.168.99.104:2181
bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK

My zookeeper logs look like this during the exchange:

zookeeper_1  | 2016-06-10 14:10:15,537 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@617] -     Established session 0x1553aa1d9950006 with negotiated timeout 6000 for client /172.18.0.1:51044
zookeeper_1  | 2016-06-10 14:10:15,598 [myid:] - INFO  [ProcessThread(sid:0   cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing   sessionid:0x1553aa1d9950006 type:create cxid:0x2 zxid:0x1c2 txntype:-1 reqpath:n/a Error   Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers
zookeeper_1  | 2016-06-10 14:10:15,877 [myid:] - INFO  [ProcessThread(sid:0   cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing   sessionid:0x1553aa1d9950006 type:create cxid:0x1c zxid:0x1c6 txntype:-1 reqpath:n/a Error   Path:/consumers/console-consumer-39205/owners/topic Error:KeeperErrorCode = NoNode for   /consumers/console-consumer-39205/owners/topic
zookeeper_1  | 2016-06-10 14:10:15,879 [myid:] - INFO  [ProcessThread(sid:0   cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing   sessionid:0x1553aa1d9950006 type:create cxid:0x1d zxid:0x1c7 txntype:-1 reqpath:n/a Error   Path:/consumers/console-consumer-39205/owners Error:KeeperErrorCode = NoNode for   /consumers/console-consumer-39205/owners

And my docker-compose ps:

docker-compose ps
         Name                        Command               State                         Ports
---------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32777->9092/tcp
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

Found the issue in my case: I’m using the cleanup.policy=compact config on one channel, and for some reason kafka-console-producer doesn’t work with such channels. Am I doing something wrong? Normal channels work just fine.

$ kafka-topics --create --zookeeper "zookeeper-0.zookeeper" --partitions 1 --replication-factor 1 --topic topicA
Created topic "topicA".
$ kafka-topics --describe --zookeeper "zookeeper-0.zookeeper" --topic topicA
Topic:topicA	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: topicA	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
$ echo hello | kafka-console-producer --broker-list "localhost:9092" --topic "topicA" 
$ kafka-topics --create --zookeeper "zookeeper-0.zookeeper" --partitions 1 --replication-factor 1 --topic topicB --config cleanup.policy=compact
Created topic "topicB".
$ kafka-topics --describe --zookeeper "zookeeper-0.zookeeper" --topic topicB
Topic:topicB	PartitionCount:1	ReplicationFactor:1	Configs:cleanup.policy=compact
	Topic: topicB	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
$ echo hello | kafka-console-producer --broker-list "localhost:9092" --topic "topicB" 
[2018-01-13 10:46:39,295] WARN [Producer clientId=console-producer] Got error produce response with correlation id 3 on topic-partition topicB-0, retrying (2 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender)
[2018-01-13 10:46:39,397] WARN [Producer clientId=console-producer] Got error produce response with correlation id 4 on topic-partition topicB-0, retrying (1 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender)
[2018-01-13 10:46:39,503] WARN [Producer clientId=console-producer] Got error produce response with correlation id 5 on topic-partition topicB-0, retrying (0 attempts left). Error: CORRUPT_MESSAGE (org.apache.kafka.clients.producer.internals.Sender)
[2018-01-13 10:46:39,611] ERROR Error when sending message to topic topicB with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.

It works fine for me when removing the compact policy.

I finally made it work in kubernetes v1.6 with this image, but with minor modifications. My lessons learnt are below:

  1. advertised.listeners should provide 2 listeners: one with the internal ip and port 9093 for the controller to connect to the broker in the same pod, and one with the external service ip for clients to connect to
  2. check the configuration and make sure zookeeper and kafka run successfully, then redeploy them to get a clean environment; a dirty environment might be the reason for the timeout exception during message producing
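Point 1 above could look roughly like this in server.properties. Note this is only a sketch: named listeners (INTERNAL/EXTERNAL) require Kafka 0.10.2+ (KIP-103), and the host names here are placeholders, not values from this thread:

```
listeners=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://kafka-0.kafka:9093,EXTERNAL://my-service-ip:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
```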
-e KAFKA_ADVERTISED_HOST_NAME=kafka

works for me.

@steverhoades You are able to set the maximum allowed broker id by setting the KAFKA_RESERVED_BROKER_MAX_ID environment variable. I updated my answer above, maybe it will help someone else 😃

[cloudera@quickstart kafka_2.11-0.10.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
hi
[2017-04-13 18:13:17,793] ERROR Error when sending message to topic test with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1504 ms has passed since batch creation plus linger time
^C[cloudera@quickstart kafka_2.11-0.10.2.0]$

It started working when I removed zookeeper’s data dir /var/lib/zookeeper/version-2.

+1

+1

It looks like it doesn’t match:

➜ kafka-docker git:(master) ✗ ./start-kafka-shell.sh 192.168.99.104 192.168.99.104:2181
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK
Topic:topic PartitionCount:4 ReplicationFactor:2 Configs:
    Topic: topic Partition: 0 Leader: 1003 Replicas: 1004,1003 Isr: 1003
    Topic: topic Partition: 1 Leader: 1003 Replicas: 1003,1004 Isr: 1003
    Topic: topic Partition: 2 Leader: 1003 Replicas: 1004,1003 Isr: 1003
    Topic: topic Partition: 3 Leader: 1003 Replicas: 1003,1004 Isr: 1003

I did the docker-compose rm and recreated and everything appears to be working now as expected. Thanks!