reactor-core: Apparent memory leak in pipeline combining flatMaps, ParallelFlux, and map to the same object type
Expected Behavior
I’m running a pipeline that potentially processes a large amount of data from different types of sources into a single type of object. The pipeline uses several flatMaps, then runs a computationally heavy part in parallel using ParallelFlux. In the end, the resulting items in the Flux will either be written to some OutputStream or processed further using doOnNext or map.
Actual Behavior
The pipeline works correctly. However, while it runs, the items in the Flux eventually stop being garbage collected in young gen and end up in old gen. This sometimes happens right after processing starts, and sometimes as much as a minute later. Eventually the heap is exhausted as old gen fills up.
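To confirm where the retained objects end up, the heap pools can be sampled from inside the test JVM with the standard java.lang.management API. This is a minimal sketch: the class name HeapPoolSnapshot is hypothetical and not part of the reproduction repository, and pool names depend on the garbage collector (for example, "G1 Eden Space", "G1 Survivor Space", and "G1 Old Gen" with G1, the JDK 11 default).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical diagnostic helper (not part of the original reproduction):
// snapshots the used bytes of every heap memory pool (Eden, Survivor, Old Gen, ...).
public class HeapPoolSnapshot {

    /** Returns pool name -> bytes currently used, for valid heap pools only. */
    static Map<String, Long> heapPoolUsage() {
        Map<String, Long> usage = new LinkedHashMap<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            // getUsage() returns null for an invalid pool, so skip those.
            if (pool.getType() == MemoryType.HEAP && pool.isValid()) {
                usage.put(pool.getName(), pool.getUsage().getUsed());
            }
        }
        return usage;
    }

    public static void main(String[] args) throws InterruptedException {
        // Print a few snapshots one second apart; pool names vary by collector.
        for (int i = 0; i < 3; i++) {
            heapPoolUsage().forEach((name, used) ->
                    System.out.printf("%-25s %,d bytes%n", name, used));
            System.out.println();
            Thread.sleep(1000);
        }
    }
}
```

Calling heapPoolUsage() periodically (for example, from a background thread while blockLast() runs) should show the old-gen value climbing if objects are being retained. Alternatively, JDK 11's unified GC logging (-Xlog:gc*) reports heap usage per collection without any code changes.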
I’ve played around with setting the parallelism and prefetch values on the parallel method. Setting them low slows the effect, but in my pipeline it eventually leads to the same result (it seems to help more in the simplified reproduction below).
Interestingly, if the items in the Flux are mapped to another type of object after calling sequential(), there seems to be no leak.
Steps to Reproduce
I’ve created a simplified version of my pipeline with which I can reproduce the behavior. A GitHub repository is available here.
The leak seems to be triggered by a map after sequential() that produces objects of the same type as those already in the Flux. Interestingly, if we map to a different type of object, the leak does not seem to occur.
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class MemLeakTest {

    @Test
    void memLeakTest() {
        Flux.just("a", "b", "c", "d", "e", "f", "g")
                // Each id expands into Integer.MAX_VALUE / 7 (~306 million) tuples.
                .flatMap(id -> generateTuplesFor(id, (Integer.MAX_VALUE / 7)))
                // Split into 6 rails, each processed on the parallel scheduler.
                .parallel(6)
                .runOn(Schedulers.parallel())
                .flatMap(this::generatePairsFor)
                .map(this::mapPair)
                // Merge the rails back into a single Flux.
                .sequential()
                .map(this::mapPair) // If you remove this call, it seems to run without leaking.
                // .map(this::toTriple) // Commenting out the above line and just mapping to a different object doesn't seem to leak.
                .blockLast();
    }

    // Generates `amount` tuples for the given id from a lazy Stream.
    private Flux<Tuple2<String, String>> generateTuplesFor(String id, int amount) {
        return Flux.fromStream(Stream.iterate(0, i -> i + 1)
                .limit(amount)
                .map(i -> generateTupleFor(id, i)));
    }

    private Tuple2<String, String> generateTupleFor(String id, int number) {
        return Tuples.of(String.format("left-%s-%s", id, number), String.format("right-%s-%s", id, number));
    }

    // Each inner publisher of the parallel flatMap emits exactly one Pair.
    private Flux<Pair> generatePairsFor(Tuple2<String, String> tuple2) {
        return Flux.just(new Pair(tuple2.getT1(), tuple2.getT2()));
    }

    // Maps a Pair to a new object of the same type.
    private Pair mapPair(Pair pair) {
        var left = transformValue(pair.getLeft());
        var right = transformValue(pair.getRight());
        return new Pair(left, right);
    }

    // Maps a Pair to a different type.
    private Triple toTriple(Pair pair) {
        return new Triple(pair.getLeft(), pair.getLeft() + pair.getRight(), pair.getRight());
    }

    private String transformValue(String value) {
        return String.format("transformed-%s", value);
    }

    static class Pair {
        private final String left;
        private final String right;

        Pair(String left, String right) {
            this.left = left;
            this.right = right;
        }

        String getLeft() {
            return left;
        }

        String getRight() {
            return right;
        }
    }

    static class Triple {
        private final String left;
        private final String middle;
        private final String right;

        Triple(String left, String middle, String right) {
            this.left = left;
            this.middle = middle;
            this.right = right;
        }

        String getLeft() {
            return left;
        }

        String getMiddle() {
            return middle;
        }

        String getRight() {
            return right;
        }
    }
}
Your Environment
- Reactor version(s) used: reactor-bom 2020.0.22
- Other relevant library versions (e.g. netty, …): none
- JVM version (java -version): OpenJDK 64-Bit Server VM (11.0.16.1+1, mixed mode)
- OS and version (e.g. uname -a): Microsoft Windows 10 Pro 10.0.19044 N/A Build 19044
About this issue
- State: open
- Created 2 years ago
- Comments: 16 (6 by maintainers)
@pmaria we are keeping this issue open so it will be investigated once there is more availability! However, I doubt there will be any success.
The challenging part here is that it can be correct for memory to grow:
- .parallel(6) in combination with runOn allocates 6 queues. (The parameter 6 specifies the number of independent downstreams; runOn is, under the hood, just a normal publishOn, so it allocates a queue by default.)
- flatMap has a queue, and every inner Publisher flattened in flatMap has a subscriber with a small queue of 32 elements. By default the main queue has a size of 256 and a concurrency of 256, which means up to 256 inner queues of at most 32 elements each can potentially be allocated.
- The .flatMap downstream: we need to add an extra 6 * 256 queues.
- sequential, which has a queue under the hood as well. So, one extra queue.

In total, that is roughly (6 + 6 + 6 * 256 + 1) * 256 queues (the outer multiplier 256 is there because everything is within the outer flatMap), where every element can be stored for long enough that it moves from Eden to the survivor space and then to the old gen.

Thus the memory leak you see may just be values stored in those queues, and that could be expected. The performance gain you see may just be a wrong first impression (an illusion during the first few minutes of the run), since over time GC will make your app as slow as possible…
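The queue-count estimate above can be restated as a small function, which makes it easier to see how the number of rails and the flatMap concurrency drive it. This is a minimal sketch: the class QueueEstimate and its parameter names are hypothetical, the mapping of each term to an operator is an interpretation of the comment, and it models only the comment's back-of-the-envelope arithmetic, not how Reactor actually allocates its queues.

```java
// Hypothetical helper restating the maintainer's estimate
// ~(6 + 6 + 6 * 256 + 1) * 256; it does not reflect Reactor's real allocation logic.
public class QueueEstimate {

    /**
     * @param rails            parallelism passed to parallel(n)
     * @param innerConcurrency flatMap concurrency on each rail (inner subscribers per rail)
     * @param outerConcurrency multiplier for the enclosing outer flatMap
     */
    static long estimate(int rails, int innerConcurrency, int outerConcurrency) {
        long runOnQueues = rails;                                  // one queue per rail (runOn is a publishOn)
        long flatMapMainQueues = rails;                            // one main queue per rail's flatMap
        long flatMapInnerQueues = (long) rails * innerConcurrency; // inner subscriber queues across all rails
        long sequentialQueues = 1;                                 // sequential() merges rails through a queue
        return (runOnQueues + flatMapMainQueues + flatMapInnerQueues + sequentialQueues)
                * outerConcurrency;
    }

    public static void main(String[] args) {
        // Values from the comment: 6 rails, concurrency 256, outer multiplier 256.
        System.out.println(estimate(6, 256, 256)); // 396544
        // Fewer rails and a lower inner concurrency shrink the estimate.
        System.out.println(estimate(2, 32, 256));  // 17664
    }
}
```

Because the inner term dominates, the estimate scales roughly with rails * innerConcurrency; the other terms barely matter at these sizes.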
@OlegDokuka yes, it is the Pair object that leaks. I can try to post more detail tomorrow, if necessary.