kotlinx.coroutines: Kotlin 1.3.72 - Race Condition when using FlowAsPublisher with a multi-threaded consumer
I’m facing a strange race condition when wrapping a Flow into a Publisher. Since I first suspected the issue to be on the consumer side (I'm using the S3 client of the AWS SDK 2), there is also a ticket in their repo:
https://github.com/aws/aws-sdk-java-v2/issues/953
My flow is created like this:
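```kotlin
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Path
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun readFromFile(file: Path): Flow<ByteBuffer> = flow {
    // No OpenOption given, so the file is opened for reading; use {} closes it at the end
    val channel = AsynchronousFileChannel.open(file)
    channel.use {
        var filePosition = 0L
        while (true) {
            val buf = ByteBuffer.allocate(4096)
            val bytesRead = it.aRead(buf, filePosition)
            if (bytesRead <= 0) // -1 signals end of file
                break
            filePosition += bytesRead
            buf.flip()
            // the following delay seems to suppress the race condition
            // (needs import kotlinx.coroutines.delay)
            // delay(10)
            emit(buf)
        }
    }
}

// Bridges the callback-based AsynchronousFileChannel.read into a suspending call
suspend fun AsynchronousFileChannel.aRead(buf: ByteBuffer, position: Long): Int =
    suspendCoroutine { cont ->
        read(buf, position, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(bytesRead: Int, attachment: Unit) {
                cont.resume(bytesRead)
            }
            override fun failed(exception: Throwable, attachment: Unit) {
                cont.resumeWithException(exception)
            }
        })
    }
```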
And consumed like this:
```kotlin
s3client.putObject(
    PutObjectRequest.builder()
        .bucket(s3config.bucket.get())
        .key("test")
        .contentLength(inputFile.length())
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .build(),
    AsyncRequestBody.fromPublisher(readFromFile(inputFile.toPath()).asPublisher())
)
```
However, on the consumer side I can observe that the items of the flow are emitted by multiple threads, even concurrently, which results in transmission failures because the ByteBuffers may be written in the wrong order.
Delaying the producing flow seems to suppress the race condition, and the ByteBuffers are then written in the correct order. It also works when I first collect the flow like this:
```kotlin
val chunks = ArrayList<ByteBuffer>()
val c = readFromFile(inputFile.toPath())
c.collect {
    chunks.add(it)
}
```
...and then creating a Publisher out of it:
```kotlin
AsyncRequestBody.fromPublisher(Flowable.fromIterable(chunks))
```
While debugging the S3 client implementation, I saw that the underlying (Netty) IO threads may invoke Subscription::request while another thread is executing Subscriber::onNext. Could this be the cause of the misbehavior? Can it happen, under these circumstances, that multiple threads run multiple continuations of the same coroutine in parallel?
I would highly appreciate any advice.
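One way to probe a publisher under the threading pattern described above is a subscriber that requests each next item from another thread while onNext may still be running, and counts violations of two Reactive Streams rules: rule 1.1 (never more onNext calls than requested) and rule 1.3 (signals are serial, never concurrent). The sketch below is hypothetical: `ContractCheckingSubscriber` and `runCheck` are illustrative names. To stay self-contained it uses the JDK 9+ `java.util.concurrent.Flow` interfaces and the JDK's `SubmissionPublisher` as a known-good baseline; checking `readFromFile(...).asPublisher()` would require adapting it to `org.reactivestreams` (for example via the reactive-streams library's `FlowAdapters`).

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Flow
import java.util.concurrent.SubmissionPublisher
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

// Hypothetical test subscriber (not part of any library). It requests one item at a
// time, hands each follow-up request() to a separate thread, and counts contract
// violations instead of throwing.
class ContractCheckingSubscriber<T>(
    private val requester: ExecutorService
) : Flow.Subscriber<T> {
    private lateinit var subscription: Flow.Subscription
    private val inOnNext = AtomicBoolean(false)
    private val requested = AtomicLong(0)

    val received = AtomicLong(0)
    val overlappingCalls = AtomicInteger(0) // rule 1.3: onNext entered while another onNext runs
    val beyondDemand = AtomicInteger(0)     // rule 1.1: more onNext calls than requested
    val done = CountDownLatch(1)
    @Volatile var error: Throwable? = null

    override fun onSubscribe(s: Flow.Subscription) {
        subscription = s
        requested.incrementAndGet() // record demand before signalling it
        s.request(1)
    }

    override fun onNext(item: T) {
        if (!inOnNext.compareAndSet(false, true)) overlappingCalls.incrementAndGet()
        if (received.incrementAndGet() > requested.get()) beyondDemand.incrementAndGet()
        // Request the next item on another thread; it may run while this onNext is
        // still executing, similar to the Netty IO threads described above.
        requested.incrementAndGet()
        val s = subscription
        requester.execute { s.request(1) }
        inOnNext.set(false)
    }

    override fun onError(t: Throwable) {
        error = t
        done.countDown()
    }

    override fun onComplete() {
        done.countDown()
    }
}

// Runs the check against the JDK's SubmissionPublisher as a known-good baseline.
fun runCheck(items: Int): ContractCheckingSubscriber<Int> {
    // A single thread keeps the subscriber's own request() calls serial (rule 2.7).
    val requester = Executors.newSingleThreadExecutor()
    try {
        val subscriber = ContractCheckingSubscriber<Int>(requester)
        val publisher = SubmissionPublisher<Int>()
        publisher.subscribe(subscriber)
        repeat(items) { publisher.submit(it) } // blocks while the buffer is full
        publisher.close() // onComplete follows once buffered items are delivered
        check(subscriber.done.await(10, TimeUnit.SECONDS)) { "no onComplete within 10 s" }
        return subscriber
    } finally {
        requester.shutdown()
    }
}
```

With the compliant JDK publisher, `runCheck(1_000)` should report 1,000 received items and zero in both violation counters. A publisher affected by a race like the one reported here would be expected to show non-zero counts, although how often depends on timing.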
About this issue
- State: closed
- Created 4 years ago
- Comments: 16 (9 by maintainers)
Commits related to this issue
- Fix race in Flow.asPublisher. The race was leading to emitting more items via onNext than requested; the corresponding stress test was added, too. Fixes #2109 (committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago)
- Fix race in Flow.asPublisher (#2124). The race was leading to emitting more items via onNext than requested; the corresponding stress test was added, too. Fixes #2109 (committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago)
- Upgraded to kotlinx 1.3.8 to validate https://github.com/Kotlin/kotlinx.coroutines/issues/2109 (committed to marc-christian-schulze/kotlinx-reproducer-2109 by marc-christian-schulze 4 years ago)
- Fix race in Flow.asPublisher (#2124). The race was leading to emitting more items via onNext than requested; the corresponding stress test was added, too. Fixes #2109 (committed to recheej/kotlinx.coroutines by elizarov 4 years ago)
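The commit messages describe the bug as emitting more items via onNext than requested; the actual fix lives in the linked commits and is not reproduced here. As a generic, hypothetical illustration of why demand accounting must be atomic when request() can arrive on one thread while items are emitted on another: a check-then-act such as `if (demand.get() > 0) demand.decrementAndGet()` lets two competing emission paths both pass the check on the last unit of demand, whereas a compare-and-set loop consumes each unit exactly once. `Demand` and `stress` are illustrative names, not kotlinx.coroutines API.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread

// Hypothetical demand counter for illustration only (not kotlinx.coroutines code).
class Demand {
    private val outstanding = AtomicLong(0)

    // Subscription.request(n) side: may be called from any thread.
    fun add(n: Long) {
        // A real Subscription must signal onError for n <= 0 (rule 3.9) rather than throw.
        require(n > 0) { "n must be positive" }
        while (true) {
            val cur = outstanding.get()
            // Saturate at Long.MAX_VALUE, which rule 3.17 allows to mean "unbounded".
            val next = if (cur > Long.MAX_VALUE - n) Long.MAX_VALUE else cur + n
            if (outstanding.compareAndSet(cur, next)) return
        }
    }

    // Producer side: consume one unit before each onNext; false means "wait for request()".
    fun tryTake(): Boolean {
        while (true) {
            val cur = outstanding.get()
            if (cur == 0L) return false
            if (cur == Long.MAX_VALUE) return true // unbounded demand is never decremented
            if (outstanding.compareAndSet(cur, cur - 1)) return true
        }
    }
}

// Hammers Demand from several requesting and several consuming threads and
// returns how many units were consumed in total.
fun stress(requesters: Int, perRequester: Int, takers: Int): Long {
    val demand = Demand()
    val taken = AtomicLong(0)
    val stop = AtomicBoolean(false)
    val takerThreads = List(takers) {
        thread { while (!stop.get()) if (demand.tryTake()) taken.incrementAndGet() }
    }
    val requesterThreads = List(requesters) {
        thread { repeat(perRequester) { demand.add(1) } }
    }
    requesterThreads.forEach { it.join() }
    // No more request() calls: drain what is left, then stop the consumers.
    while (demand.tryTake()) taken.incrementAndGet()
    stop.set(true)
    takerThreads.forEach { it.join() }
    return taken.get()
}
```

Because every unit of demand is consumed by exactly one successful `compareAndSet`, `stress(4, 10_000, 4)` should consume exactly 40,000 units however the threads interleave; a check-then-act version could overshoot under the same load.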
Update: I’ve found how to fix this problem. Now figuring out what exactly is wrong, which part of the `Publisher` contract is violated (if any), and how to test for it.

@mhernand40 Using `Thread.yield()` and `kotlinx.coroutines.yield()` instead of `delay()` does not mitigate the issue; the same race condition is still observable.