kotlinx.coroutines: Kotlin 1.3.72 - Race Condition when using FlowAsPublisher with a multi-threaded consumer

I’m facing a strange race condition when wrapping a Flow into a Publisher. Because I first suspected the issue was on the consumer side (I’m using the S3 client of the AWS SDK 2), there is also a ticket in their repo: https://github.com/aws/aws-sdk-java-v2/issues/953

My flow is created like this:

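import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Path
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun readFromFile(file: Path): Flow<ByteBuffer> = flow {
    val channel = AsynchronousFileChannel.open(file)
    channel.use {
        var filePosition = 0L
        while (true) {
            val buf = ByteBuffer.allocate(4096)
            val bytesRead = it.aRead(buf, filePosition)
            if (bytesRead <= 0)
                break
            filePosition += bytesRead
            buf.flip()
            // the following delay seems to suppress the race condition
            // delay(10)
            emit(buf)
        }
    }
}

// Suspending wrapper around the callback-based AsynchronousFileChannel.read
suspend fun AsynchronousFileChannel.aRead(buf: ByteBuffer, position: Long): Int =
    suspendCoroutine { cont ->
        read(buf, position, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(bytesRead: Int, attachment: Unit) {
                cont.resume(bytesRead)
            }

            override fun failed(exception: Throwable, attachment: Unit) {
                cont.resumeWithException(exception)
            }
        })
    }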

And consumed like this:

s3client.putObject(PutObjectRequest.builder()
    .bucket(s3config.bucket.get())
    .key("test")
    .contentLength(inputFile.length())
    .contentType(MediaType.APPLICATION_OCTET_STREAM)
    .build(),
    AsyncRequestBody.fromPublisher(readFromFile(inputFile.toPath()).asPublisher()))

However, on the consumer side I can observe that the items of the flow are emitted by multiple threads, and even concurrently, which causes transmission failures because the ByteBuffers may be written in the wrong order.
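To tell "emitted by multiple threads" apart from "emitted concurrently", a diagnostic wrapper around the subscriber can help: Reactive Streams rule 1.3 allows signals to arrive on different threads, but requires them to be signalled serially. The sketch below is hypothetical (`SerialCheckingSubscriber` is an illustrative name, not part of kotlinx.coroutines or the AWS SDK) and uses the JDK 9+ `java.util.concurrent.Flow` interfaces as a stand-in for `org.reactivestreams`, which declare the same four `Subscriber` methods.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Flow
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical diagnostic wrapper: forwards every signal to `delegate` and counts
// how often onNext is entered while a previous onNext call has not yet returned,
// which would violate Reactive Streams rule 1.3 (signals must be serial).
class SerialCheckingSubscriber<T>(private val delegate: Flow.Subscriber<in T>) : Flow.Subscriber<T> {
    private val inOnNext = AtomicBoolean(false)

    /** Number of onNext calls that started while another onNext was still running. */
    val overlaps = AtomicInteger(0)

    /** Threads that delivered onNext; several different names alone are allowed by the spec. */
    val threadNames: MutableSet<String> = ConcurrentHashMap.newKeySet<String>()

    override fun onSubscribe(subscription: Flow.Subscription) = delegate.onSubscribe(subscription)

    override fun onNext(item: T) {
        threadNames.add(Thread.currentThread().name)
        if (!inOnNext.compareAndSet(false, true)) {
            // Another thread is inside onNext right now: record the overlap, still forward.
            overlaps.incrementAndGet()
            delegate.onNext(item)
            return
        }
        try {
            delegate.onNext(item)
        } finally {
            inOnNext.set(false)
        }
    }

    override fun onError(throwable: Throwable) = delegate.onError(throwable)

    override fun onComplete() = delegate.onComplete()
}
```

In the real setup one would port this to `org.reactivestreams.Subscriber` and wrap the publisher before passing it to `AsyncRequestBody.fromPublisher`, so that each subscriber the SDK supplies is wrapped. A non-zero `overlaps` count would confirm genuinely concurrent delivery rather than merely thread hopping.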

Delaying the producing flow seems to suppress the race condition, and the ByteBuffers are then written in the correct order. It also works when I first collect the flow into a list, as follows:

val chunks = ArrayList<ByteBuffer>()
val c = readFromFile(inputFile.toPath())
c.collect {
    chunks.add(it)
}
// ... and then create a Publisher from it
AsyncRequestBody.fromPublisher(Flowable.fromIterable(chunks))

While debugging the S3 client implementation, I saw that the underlying Netty I/O threads may invoke Subscription::request while another thread is executing Subscriber::onNext. Could this be the cause of the misbehavior? Is it possible that, under these circumstances, multiple threads run continuations of the same coroutine in parallel?
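For context on the request/onNext interleaving: the Reactive Streams specification requires the Subscriber's own request and cancel calls to be serial with each other (rule 2.7), but it does not forbid request from arriving on one thread while onNext runs on another. The Publisher must still signal onNext serially (rule 1.3). A common way to satisfy both is a "drain loop" guarded by a work-in-progress counter. The sketch below is a hypothetical, simplified publisher over a fixed list (`SerialListPublisher` is an illustrative name; it does not claim to be how `asPublisher` is implemented), again using `java.util.concurrent.Flow` as a stand-in for `org.reactivestreams`.

```kotlin
import java.util.concurrent.Flow
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

// Hypothetical publisher over a fixed list. request(n) may be called from any thread,
// including from inside onNext; a work-in-progress (wip) counter ensures only one
// thread at a time runs the emit loop, so signals are always serial (RS rule 1.3).
class SerialListPublisher<T>(private val items: List<T>) : Flow.Publisher<T> {
    override fun subscribe(subscriber: Flow.Subscriber<in T>) {
        subscriber.onSubscribe(SerialSubscription(subscriber, items))
    }

    private class SerialSubscription<T>(
        private val subscriber: Flow.Subscriber<in T>,
        private val items: List<T>
    ) : Flow.Subscription {
        private val requested = AtomicLong(0)  // outstanding demand, capped at Long.MAX_VALUE
        private val wip = AtomicInteger(0)     // > 0 while some thread owns the emit loop
        @Volatile private var done = false     // set on cancel, completion or error
        @Volatile private var badRequest = false
        private var index = 0                  // only touched by the current loop owner

        override fun request(n: Long) {
            if (n <= 0) {
                badRequest = true              // reported from drain(), see rule 3.9
            } else {
                requested.getAndUpdate { r -> val sum = r + n; if (sum < 0) Long.MAX_VALUE else sum }
            }
            drain()
        }

        override fun cancel() {
            done = true
        }

        private fun drain() {
            // Whoever moves wip from 0 to 1 becomes the owner; any other caller only
            // bumps wip so that the owner runs one more pass and sees the new demand.
            if (wip.getAndIncrement() != 0) return
            var missed = 1
            while (true) {
                if (done) return
                if (badRequest) {
                    done = true
                    subscriber.onError(IllegalArgumentException("request(n) requires n > 0"))
                    return
                }
                val r = requested.get()
                var emitted = 0L
                while (emitted != r && index < items.size && !done) {
                    subscriber.onNext(items[index++])
                    emitted++
                }
                if (done) return
                if (index == items.size) {
                    done = true
                    subscriber.onComplete()
                    return
                }
                requested.addAndGet(-emitted)
                missed = wip.addAndGet(-missed)
                if (missed == 0) return
            }
        }
    }
}
```

A thread that calls request while another thread is inside onNext never emits itself; it only records demand and increments `wip`, and the owning thread picks that demand up on its next pass. A reentrant request from inside onNext on the same thread is handled the same way, which also bounds recursion.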

I would highly appreciate any advice.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 16 (9 by maintainers)

Most upvoted comments

Update: I’ve found how to fix this problem. I’m now figuring out what exactly is wrong, which part of the Publisher contract is violated (if any), and how to test for it.

@mhernand40 Using Thread.yield() or kotlinx.coroutines.yield() instead of delay() does not mitigate the issue; the same race condition is still observable.