druid: Kafka indexing tasks occasionally fail with 'Stream closed'
Hi all,
I'm occasionally seeing failed Kafka indexing tasks. The task log looks quite empty (~180 lines) and ends with this:
2018-02-05T03:32:28,045 INFO [main] io.druid.guice.JsonConfigurator - Loaded class[class io.druid.query.search.search.SearchQueryConfig] from props[druid.query.search.] as [io.druid.query.search.search.SearchQueryConfig@7c3e4b1a]
2018-02-05T03:32:28,048 INFO [main] io.druid.guice.JsonConfigurator - Loaded class[class io.druid.query.metadata.SegmentMetadataQueryConfig] from props[druid.query.segmentMetadata.] as [io.druid.query.metadata.SegmentMetadataQueryConfig@41a374be]
2018-02-05T03:32:28,052 INFO [main] io.druid.guice.JsonConfigurator - Loaded class[class io.druid.query.groupby.GroupByQueryConfig] from props[druid.query.groupBy.] as [io.druid.query.groupby.GroupByQueryConfig@5f96f6a2]
2018-02-05T03:32:28,071 INFO [main] io.druid.offheap.OffheapBufferGenerator - Allocating new intermediate processing buffer[0] of size[536,870,912]
2018-02-05T03:32:41,994 INFO [main] io.druid.offheap.OffheapBufferGenerator - Allocating new intermediate processing buffer[1] of size[536,870,912]
2018-02-05T03:32:56,979 INFO [main] io.druid.offheap.OffheapBufferGenerator - Allocating new result merging buffer[0] of size[536,870,912]
2018-02-05T03:33:12,481 INFO [main] io.druid.offheap.OffheapBufferGenerator - Allocating new result merging buffer[1] of size[536,870,912]
Looking at the middle manager log, I see this exception:
2018-02-05T03:32:20,956 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Submitting runnable for task[index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa]
2018-02-05T03:32:20,960 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Affirmative. Running task [index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa]
2018-02-05T03:32:20,963 INFO [forking-task-runner-3] io.druid.indexing.overlord.ForkingTaskRunner - Running command: java -cp /opt/kava/conf/druid/_common:/opt/kava/conf/druid/middleManager:lib/jetty-continuation-9.3.16.v20170120.jar:lib/druid-aws-common-0.10.0.jar:lib/java-xmlbuilder-1.1.jar:lib/...
2018-02-05T03:32:20,965 INFO [forking-task-runner-3] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa] location changed to [TaskLocation{host='ip-x-y-z-w.node.us-west-2.consul', port=8102}].
2018-02-05T03:32:20,965 INFO [forking-task-runner-3] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa] status changed to [RUNNING].
2018-02-05T03:32:20,965 INFO [forking-task-runner-3] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa output to: var/druid/task/index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa/log
2018-02-05T03:32:20,965 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Updating task [index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa] announcement with location [TaskLocation{host='ip-x-y-z-w.node.us-west-2.consul', port=8102}]
2018-02-05T03:33:14,778 INFO [HttpPostEmitter-1-0] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://json-push.service.us-west-2.consul:7000
2018-02-05T03:33:27,265 INFO [qtp81907268-47] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa
2018-02-05T03:33:27,286 INFO [qtp81907268-44] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa
2018-02-05T03:33:27,332 INFO [qtp81907268-57] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa
2018-02-05T03:33:27,466 INFO [forking-task-runner-3] io.druid.storage.s3.S3TaskLogs - Pushing task log var/druid/task/index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa/log to: druid/indexing-logs/index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa/log
2018-02-05T03:33:27,567 INFO [forking-task-runner-3] io.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_131]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_131]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_131]
at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_131]
at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:438) [druid-indexing-service-0.10.0.jar:0.10.0]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:220) [druid-indexing-service-0.10.0.jar:0.10.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
2018-02-05T03:33:27,568 INFO [forking-task-runner-3] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa
2018-02-05T03:33:27,572 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa] with status [FAILED]
I saw this exception reported in https://github.com/druid-io/druid/issues/3054, but this case seems different, since here it's sporadic. My main concern is whether I may be losing events when it happens. If Druid simply spawns another task that resumes from the same Kafka offsets, then I guess it can be ignored…
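In case it's useful to anyone debugging the same symptom: a finished task's final status and archived log can also be pulled through the overlord's HTTP API, which is often more complete than the middle manager's copy. A minimal sketch, assuming the overlord is reachable at `localhost:8090` (substitute your own host and task id); the script only prints the curl commands you would run, so it is safe to execute without a cluster:

```shell
#!/bin/sh
# Assumptions: overlord at localhost:8090; TASK_ID is one of the failed
# task ids from the logs above. Substitute your own values.
OVERLORD="http://localhost:8090"
TASK_ID="index_kafka_player-events-realtime_f99c5d8777e2890_igkdbboa"

# Final status as recorded by the overlord:
echo "curl ${OVERLORD}/druid/indexer/v1/task/${TASK_ID}/status"
# Full task log (served from deep storage, e.g. S3, after the task exits):
echo "curl ${OVERLORD}/druid/indexer/v1/task/${TASK_ID}/log"
```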
Thanks!
Eran
About this issue
- State: closed
- Created 6 years ago
- Comments: 19 (7 by maintainers)
I am using Druid version 0.12.0 and I am getting the same error. Here is the error log from the middle manager:
2018-04-18T09:45:10,159 INFO [forking-task-runner-2] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_test1_4454ee7eece418b_ficeeefj] location changed to [TaskLocation{host='ubuntu16.04', port=8100, tlsPort=-1}].
2018-04-18T09:45:10,159 INFO [forking-task-runner-2] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_test1_4454ee7eece418b_ficeeefj] status changed to [RUNNING].
2018-04-18T09:45:10,159 INFO [forking-task-runner-2] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_kafka_test1_4454ee7eece418b_ficeeefj output to: var/druid/task/index_kafka_test1_4454ee7eece418b_ficeeefj/log
2018-04-18T09:45:10,159 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Updating task [index_kafka_test1_4454ee7eece418b_ficeeefj] announcement with location [TaskLocation{host='ubuntu16.04', port=8100, tlsPort=-1}]
2018-04-18T09:45:15,161 INFO [qtp1733903473-26] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_test1_4454ee7eece418b_ficeeefj
2018-04-18T09:45:15,162 INFO [forking-task-runner-2] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: var/druid/indexing-logs/index_kafka_test1_4454ee7eece418b_ficeeefj.log
2018-04-18T09:45:15,163 INFO [forking-task-runner-2] io.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_161]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_161]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_161]
at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_161]
at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:452) [druid-indexing-service-0.12.0.jar:0.12.0]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:224) [druid-indexing-service-0.12.0.jar:0.12.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
2018-04-18T09:45:15,165 INFO [forking-task-runner-2] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_test1_4454ee7eece418b_ficeeefj
2018-04-18T09:45:15,166 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_kafka_test1_4454ee7eece418b_ficeeefj] with status [FAILED]
I am also using Druid version 0.12.0 and I am getting this same error!
@suhassumukhv thanks for the logs! Looking at the overlord logs, I can see that in your case the supervisor killed the Kafka index task because its offset ([{0={0=48}}]) was far behind the one stored in your metadata store ([{0=1436430}]). Would you try resetting the supervisor (http://druid.io/docs/latest/development/extensions-core/kafka-ingestion.html#reset-supervisor)?
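For reference, the reset can also be issued directly against the overlord's supervisor API. A minimal sketch, assuming the overlord runs at `localhost:8090` and a supervisor id of `test1` (both placeholders, substitute your own); the script only prints the curl command you would run, so it is safe to execute anywhere:

```shell
#!/bin/sh
# Assumptions: overlord at localhost:8090, supervisor id "test1".
# Substitute your own values before actually running the printed command.
OVERLORD="http://localhost:8090"
SUPERVISOR_ID="test1"

# POST .../supervisor/{id}/reset clears the supervisor's saved offsets in
# the metadata store so ingestion restarts from offsets currently valid in Kafka.
echo "curl -X POST ${OVERLORD}/druid/indexer/v1/supervisor/${SUPERVISOR_ID}/reset"
```

Note that a reset can skip or re-read data (depending on where Kafka's retained offsets now stand), so it is best treated as a last resort.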