kafka-backup: Failed to deserialize value for header 'kafka_replyPartition' on topic

Got another error on our QA cluster (kafka 2.4.0, kafka-backup built from commit f30b9ad9).

[2020-06-11 08:39:55,585] WARN Failed to deserialize value for header 'kafka_replyPartition' on topic 'cosmos-cs-reads', so using byte array (org.apache.kafka.connect.storage.SimpleHeaderConverter:68)
java.lang.StringIndexOutOfBoundsException: String index out of range: 0
        at java.base/java.lang.StringLatin1.charAt(Unknown Source)
        at java.base/java.lang.String.charAt(Unknown Source)
        at org.apache.kafka.connect.data.Values.parse(Values.java:822)
        at org.apache.kafka.connect.data.Values.parseString(Values.java:378)
        at org.apache.kafka.connect.storage.SimpleHeaderConverter.toConnectHeader(SimpleHeaderConverter.java:64)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
[2020-06-11 08:39:56,295] ERROR WorkerSinkTask{id=chrono_qa-backup-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
org.apache.kafka.connect.errors.DataException: cosmos-cs-reads error: Not a byte array! cosmos-cs-cmds
        at de.azapps.kafkabackup.common.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:19)
        at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:121)
        at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:75)
        at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
        at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
[2020-06-11 08:39:56,353] ERROR WorkerSinkTask{id=chrono_qa-backup-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.connect.errors.DataException: cosmos-cs-reads error: Not a byte array! cosmos-cs-cmds
        at de.azapps.kafkabackup.common.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:19)
        at de.azapps.kafkabackup.common.record.RecordSerde.write(RecordSerde.java:121)
        at de.azapps.kafkabackup.common.segment.SegmentWriter.append(SegmentWriter.java:75)
        at de.azapps.kafkabackup.common.partition.PartitionWriter.append(PartitionWriter.java:57)
        at de.azapps.kafkabackup.sink.BackupSinkTask.put(BackupSinkTask.java:68)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more
[2020-06-11 08:39:56,354] ERROR WorkerSinkTask{id=chrono_qa-backup-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-06-11 08:39:56,382] INFO Stopped BackupSinkTask (de.azapps.kafkabackup.sink.BackupSinkTask:139)

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 15 (15 by maintainers)

Commits related to this issue

Most upvoted comments

Hmm… I have tried to reproduce the bug in #97. The issue did not appear (on my local machine – GitHub Pipeline is running)…

Can you try to add your problematic header message here? https://github.com/itadventurer/kafka-backup/pull/97/files#diff-28c62e6ea255f4a9955c7be8c5d8a1cfR95 (obviously as hex-encoded data)

Hope we will be able to reproduce it here 😉

JFYI, I’m keeping this in “broken” state to test your possible fix 😄

Ok thank you… I will have to think about how to fix that. I see that this is quite critical 😱