news-crawl: news-crawl 2.x Broken when using multiple workers (across multiple hosts)

Discussed in https://github.com/commoncrawl/news-crawl/discussions/62


Originally posted by alextechnology on December 8, 2023:

I spent today merging our local news-crawl codebase to the 2.x branch. We run only one nimbus and two supervisors. While the topology seems to work fine with a single worker, it does not work at all with two workers, despite the configuration being identical and working fine on news-crawl 1.2.4.

The first issue seems to be that the local worker cannot connect to the remote supervisor/worker. Once the topology is submitted to Storm, the worker log scrolls for over a minute with 140+ connection attempts like the following:

2023-12-08 21:14:17.456 o.a.s.m.n.Client client-worker-1 [ERROR] connection attempt 147 to Netty-Client-xxxxxxxxxx failed: org.apache.storm.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: xxxxxxxxxxxxxxxx

It will eventually start trying to crawl, but there are lots of the following kinds of log entries indicating failures:

2023-12-08 21:14:20.720 o.a.s.m.n.Client client-worker-1 [ERROR] failed to send 1 messages to Netty-Client-xxxxxxxxx java.nio.channels.ClosedChannelException

and:

2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 1 messages
2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 2 messages
2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 2 messages
2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 2 messages

The second issue (which I assume is related to the first) is that this inevitably leads to a java.util.ConcurrentModificationException that crashes the worker:

2023-12-08 21:14:23.316 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 3 messages
2023-12-08 21:14:23.316 o.a.s.u.Utils Thread-14-feed-executor[70, 71] [ERROR] Async loop died!
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-client-2.5.0.jar:2.5.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	... 6 more
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1511) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1544) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1542) ~[?:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	... 6 more
2023-12-08 21:14:23.320 o.a.s.e.e.ReportError Thread-14-feed-executor[70, 71] [ERROR] Error
java.lang.RuntimeException: java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at org.apache.storm.utils.Utils$1.run(Utils.java:413) ~[storm-client-2.5.0.jar:2.5.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.5.0.jar:2.5.0]
	... 1 more
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.5.0.jar:2.5.0]
	... 1 more
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1511) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1544) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1542) ~[?:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.5.0.jar:2.5.0]
	... 1 more

Thankfully, a single topology worker appears to run much faster in 2.x than two workers did in 1.2.4, so we are OK with that for the moment, but it would be nice to find out why this cannot run with two or more topology workers.

EDIT:

I forgot to mention that the remote worker/supervisor experiences similar errors: it does receive the topology and tries to crawl the pages, but it fails with dropped messages during Worker-Transfer.

About this issue

  • State: closed
  • Created 7 months ago
  • Comments: 17 (10 by maintainers)

Most upvoted comments

Thanks @alextechnology, this is very useful. I think this is what is happening.

If you look at the topology class (assuming that this is what you are using; the Flux setup should be the same anyway), you'll see that the output of the FeedParser goes to 3 places (a simplified wiring sketch follows the list):

  1. the dummy indexer on the default stream - its role is just to change the value of the status and pass it on to the status updater bolt. This is done only if a page is not a feed, i.e. it is an actual article.
  2. the status updater on the status stream - writes to Elastic. This path is taken if a page is a feed and is used for adding discovered links to the status index; it is also used to record the status in case of an error.
  3. the WARC bolt on the default stream - again only if a page is not a feed
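
For orientation, here is a heavily simplified sketch of that fan-out in Storm's Java DSL. It is illustrative only: the component names ("feed", "indexer", "status", "warc"), the parallelism values and the use of the Java DSL rather than Flux are assumptions, not the actual news-crawl code.

```java
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;

import com.digitalpebble.stormcrawler.Constants;

// Hypothetical wiring mirroring the three outputs described above.
public class NewsCrawlWiringSketch {

    public static void wire(TopologyBuilder builder,
                            IRichBolt feedParserBolt,
                            IRichBolt dummyIndexer,
                            IRichBolt statusUpdaterBolt,
                            IRichBolt warcBolt) {
        // spout, fetcher and upstream parsing omitted
        builder.setBolt("feed", feedParserBolt, 2);

        // 1. dummy indexer on the default stream (articles only)
        builder.setBolt("indexer", dummyIndexer, 2)
               .localOrShuffleGrouping("feed");

        // 2. status updater on the status stream (discovered links, errors)
        builder.setBolt("status", statusUpdaterBolt, 2)
               .shuffleGrouping("feed", Constants.StatusStreamName)
               .shuffleGrouping("indexer", Constants.StatusStreamName);

        // 3. WARC bolt on the default stream (articles only): a single
        //    instance in the current topology, hence the remote transfers
        builder.setBolt("warc", warcBolt, 1)
               .localOrShuffleGrouping("feed");
    }
}
```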

So the problem happens when a page is an article (and not a feed). The Metadata is sent simultaneously to both the DummyIndexer and the WARC bolt. The DummyIndexer does not do much and passes the content on to the StatusUpdaterBolt, which adds a key/value to the Metadata. The problem is that the WARC bolt is probably on the other worker, so the tuple needs serializing with Kryo; we get the exception because, sooner or later, the StatusUpdaterBolt will be changing the Metadata while the serialization is happening.
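
To make the failure mode concrete, here is a small, self-contained Java demo (not news-crawl code): one thread iterates a plain HashMap the way Kryo's MapSerializer does during worker transfer, while another thread keeps adding keys the way the StatusUpdaterBolt adds to the shared Metadata. HashMap's fail-fast iterator typically throws exactly the ConcurrentModificationException seen in the logs above (fail-fast is best effort, so the precise failure can vary).

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataRaceDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> md = new HashMap<>();
        md.put("url", "https://example.com/article");

        // Simulates the StatusUpdaterBolt adding key/values to the shared metadata.
        Thread mutator = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                md.put("key" + i, "value");
            }
        });
        mutator.start();

        try {
            // Simulates Kryo's MapSerializer iterating the map entries while
            // serializing the tuple for transfer to the other worker.
            for (int attempt = 0; attempt < 1_000 && mutator.isAlive(); attempt++) {
                for (Map.Entry<String, String> e : md.entrySet()) {
                    e.getValue();
                }
            }
            System.out.println("No failure observed this run (the race is timing dependent)");
        } catch (RuntimeException e) {
            System.out.println("Reproduced: " + e);  // usually ConcurrentModificationException
        }
        mutator.join();
    }
}
```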

One interesting thing: if you look at the Storm UI while the topology is running, you should see that the spouts have as many instances as there are shards in the status index. The number of instances for all the bolts is equal to the number of workers - apart from the WARC bolt, which has a single instance. This means that with two workers roughly 50% of the tuples going to the WARC bolt are bound to need serialization, as they have no chance of being processed locally.

What we should do is set the number of instances of the WARC bolt to the number of workers; this way the tuples would remain data-local as much as possible and we would avoid serializing the data and sending it across to another node - especially relevant here, since what we send is the entire binary content of a page, which can be pretty heavy. I will open an issue to that effect.
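
Against the sketch above, the change would look roughly like this (in the actual news-crawl Flux file it would be the WARC bolt's parallelism value; the names remain assumptions):

```java
// Continuing the hypothetical sketch: one WARC bolt executor per worker, so
// tuples can stay on the worker that parsed them instead of crossing the wire.
int numWorkers = 2;                              // same value as topology.workers
builder.setBolt("warc", warcBolt, numWorkers)    // was: a single instance
       .localOrShuffleGrouping("feed");          // prefer the local instance
```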

This is a partial solution - we still need a way of copying or cloning the metadata when the topology sends the same data to two different outputs, and I need to think about the right way to do this. However, having an instance of the WARC bolt locally should alleviate the problem.
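
One way to picture the copy-on-fan-out idea (purely illustrative; the real StormCrawler Metadata class wraps a map of string arrays and may end up with its own copy mechanism) is to deep-copy the metadata before one of the two emits, so each branch mutates and serializes its own instance:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: each fan-out branch gets its own metadata instance, so a
// downstream bolt can add keys to its copy while Kryo serializes the other.
final class MetadataCopy {
    static Map<String, String[]> deepCopy(Map<String, String[]> md) {
        Map<String, String[]> copy = new HashMap<>(md.size() * 2);
        for (Map.Entry<String, String[]> e : md.entrySet()) {
            copy.put(e.getKey(), e.getValue().clone());
        }
        return copy;
    }
}

// Hypothetical use in the bolt doing the fan-out (field names assumed):
//   collector.emit(Constants.StatusStreamName, tuple, new Values(url, deepCopy(md), status));
//   collector.emit(tuple, new Values(url, content, md));
```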

Looking at the code, I can also see that the topology is configured incorrectly for the status updater bolts: they should receive the tuples on the status stream hashed on the value of the URL field, as is done in the Elastic archetype. This is also incorrect in the Flux file. It went unnoticed because the number of workers is set to 1 by default; with more than one worker, however, the cache in the status bolts would be missed, resulting in more URLs being sent to Elastic than necessary.
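
In the wiring sketch above, the status updater declaration would then become something like the following; the "url" field and Constants.StatusStreamName are taken from StormCrawler's Elastic archetype, the rest is still an assumption:

```java
// Requires: import org.apache.storm.tuple.Fields;
// Hash tuples on the URL so a given URL always reaches the same
// StatusUpdaterBolt instance and its de-duplication cache stays effective
// when more than one worker is used.
builder.setBolt("status", statusUpdaterBolt, numWorkers)
       .fieldsGrouping("feed", Constants.StatusStreamName, new Fields("url"))
       .fieldsGrouping("indexer", Constants.StatusStreamName, new Fields("url"));
```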

I will open another issue to fix this.

Your report of this problem has been very useful - thank you very much @alextechnology for filing it, as well as for the detective work you put in.

I will fix the number of instances of the WARC bolt first so that you can give it a try.

Just to check - Storm 2.5.0 is installed on these nodes, right?

Correct - a pre-built binary archive downloaded from archives.apache.com. Nimbus and supervisor both appear to start up and connect to ZooKeeper correctly.