pulsar: [Bug] Exception thrown even though the transaction was committed successfully.

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.9

Minimal reproduce step

Produce 100 million messages transactionally, and restart the brokers every 120 seconds, 50 times in total.

What did you expect to see?

We expect no message duplication.

What did you see instead?

Some messages are duplicated.

Anything else?

I found several causes of this problem.

Committing the transaction failed with the following exception:
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$InvalidTxnStatusException: Expect Txn `(7,24726)` to be in COMMITTING status but it is in COMMITTED status

When the producer commits a transaction, the broker can throw an exception even though the transaction has already been committed successfully. The producer receives the exception, concludes that the commit failed, and resends the messages in the transaction, which causes the message duplication.


The full broker log is as follows:

2023-02-28T11:14:48,237+0800 [pulsar-transaction-timer-46-1] ERROR org.apache.pulsar.broker.TransactionMetadataStoreService - End transaction fail! TxnId : (1,50425), TxnAction : 0
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$CoordinatorNotFoundException: Transaction coordinator with id 1 not found!
    at org.apache.pulsar.broker.TransactionMetadataStoreService.getTxnMeta(TransactionMetadataStoreService.java:343) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.endTransaction(TransactionMetadataStoreService.java:391) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$16(TransactionMetadataStoreService.java:416) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_92]

2023-02-28T11:15:14,441+0800 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore - TxnID : (1,50425) add update txn status error with TxnStatus : COMMITTED
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$InvalidTxnStatusException: Expect Txn `(1,50425)` to be in COMMITTING status but it is in COMMITTED status
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.checkTxnStatus(TxnMetaImpl.java:96) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.updateTxnStatus(TxnMetaImpl.java:145) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.updateTxnStatus(TxnMetaImpl.java:37) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore.lambda$null$12(MLTransactionMetadataStore.java:392) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_92]
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_92]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_92]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_92]
    at org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl$3.addComplete(MLTransactionLogImpl.java:186) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter$DisabledBatchCallback.addComplete(TxnLogBufferedWriter.java:642) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:228) ~[org.apache.pulsar-managed-ledger-2.9.4.jar:2.9.4]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[org.apache.bookkeeper-bookkeeper-common-4.14.5.jar:4.14.5]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_92]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_92]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_92]

2023-02-28T11:15:14,442+0800 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.TransactionMetadataStoreService - End transaction fail! TxnId : (1,50425), TxnAction : 0
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$InvalidTxnStatusException: Expect Txn `(1,50425)` to be in COMMITTING status but it is in COMMITTED status
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.checkTxnStatus(TxnMetaImpl.java:96) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.updateTxnStatus(TxnMetaImpl.java:145) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.updateTxnStatus(TxnMetaImpl.java:37) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore.lambda$null$12(MLTransactionMetadataStore.java:392) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]


2023-02-28T11:15:14,442+0800 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.ServerCnx - Send response error for END_TXN request 4236144793649783193.
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$InvalidTxnStatusException: Expect Txn `(1,50425)` to be in COMMITTING status but it is in COMMITTED status
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.checkTxnStatus(TxnMetaImpl.java:96) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.updateTxnStatus(TxnMetaImpl.java:145) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl.updateTxnStatus(TxnMetaImpl.java:37) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]

The exception is thrown at line 392 of MLTransactionMetadataStore.java. Although line 374 checks whether the transaction is already COMMITTED, two commit requests can both pass that check. The first request then changes the transaction state to COMMITTED successfully, so the second request throws the exception above when it executes:

txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);

The exception above can be fixed by #19662.
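The check-then-act race can be reproduced with a small model. The sketch below is hypothetical: `TxnStatus`, `TxnMetaSketch`, `updateStatusIdempotent` and `CommitRaceDemo` are illustrative names, not Pulsar classes, and the idempotent variant is only one possible way to handle a duplicate commit, not necessarily what #19662 does. It replays the interleaving in which two commit requests have both passed the "already committed?" check and then each apply the COMMITTING -> COMMITTED transition.

```java
// Hypothetical, simplified model of a transaction's status on the coordinator.
// These types are illustrative; they are not Pulsar's TxnMetaImpl or TxnStatus.
enum TxnStatus { OPEN, COMMITTING, COMMITTED }

class TxnMetaSketch {
    private TxnStatus status = TxnStatus.COMMITTING;

    // Strict compare-and-set transition, mirroring the check that produced
    // "Expect Txn ... to be in COMMITTING status but it is in COMMITTED status".
    synchronized void updateStatus(TxnStatus newStatus, TxnStatus expected) {
        if (status != expected) {
            throw new IllegalStateException("Expect txn to be in " + expected
                    + " status but it is in " + status + " status");
        }
        status = newStatus;
    }

    // Idempotent variant (an assumption, not necessarily the fix in #19662):
    // a request for a state the transaction has already reached succeeds.
    synchronized void updateStatusIdempotent(TxnStatus newStatus, TxnStatus expected) {
        if (status == newStatus) {
            return; // an earlier commit request already completed the transition
        }
        updateStatus(newStatus, expected);
    }
}

public class CommitRaceDemo {
    // Two END_TXN(COMMIT) requests have both passed the earlier check; each now
    // applies COMMITTING -> COMMITTED in turn. Returns how many requests fail.
    static int failedRequests(boolean idempotent) {
        TxnMetaSketch meta = new TxnMetaSketch();
        int failures = 0;
        for (int request = 0; request < 2; request++) {
            try {
                if (idempotent) {
                    meta.updateStatusIdempotent(TxnStatus.COMMITTED, TxnStatus.COMMITTING);
                } else {
                    meta.updateStatus(TxnStatus.COMMITTED, TxnStatus.COMMITTING);
                }
            } catch (IllegalStateException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("strict: " + failedRequests(false) + " failed request(s)");
        System.out.println("idempotent: " + failedRequests(true) + " failed request(s)");
    }
}
```

With the strict transition the second request fails even though the transaction is committed (this prints `strict: 1 failed request(s)`); the idempotent variant reports success for both (`idempotent: 0 failed request(s)`).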

However, there are other situations, in which the following exceptions are thrown:

2023-02-28T13:13:09,100+0800 [pulsar-transaction-timer-46-1] ERROR org.apache.pulsar.broker.TransactionMetadataStoreService - End transaction fail! TxnId : (4,57640), TxnAction : 0
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$CoordinatorNotFoundException: Transaction coordinator with id 4 not found!
    at org.apache.pulsar.broker.TransactionMetadataStoreService.getTxnMeta(TransactionMetadataStoreService.java:343) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.endTransaction(TransactionMetadataStoreService.java:391) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$16(TransactionMetadataStoreService.java:416) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_92]

2023-02-28T13:13:34,111+0800 [pulsar-io-8-3] ERROR org.apache.pulsar.broker.TransactionMetadataStoreService - End transaction fail! TxnId : (4,57640), TxnAction : 0
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$TransactionNotFoundException: The transaction with this txdID `(4,57640)`not found
    at org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore.getTxnMeta(MLTransactionMetadataStore.java:219) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.getTxnMeta(TransactionMetadataStoreService.java:345) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.endTransaction(TransactionMetadataStoreService.java:391) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.endTransaction(TransactionMetadataStoreService.java:370) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.service.ServerCnx.handleEndTxn(ServerCnx.java:2187) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:408) ~[org.apache.pulsar-pulsar-common-2.9.4.jar:2.9.4]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]


2023-02-28T13:13:34,113+0800 [pulsar-io-8-3] ERROR org.apache.pulsar.broker.service.ServerCnx - Send response error for END_TXN request 2533659891027037444.
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException$TransactionNotFoundException: The transaction with this txdID `(4,57640)`not found
    at org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore.getTxnMeta(MLTransactionMetadataStore.java:219) ~[org.apache.pulsar-pulsar-transaction-coordinator-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.getTxnMeta(TransactionMetadataStoreService.java:345) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.endTransaction(TransactionMetadataStoreService.java:391) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.TransactionMetadataStoreService.endTransaction(TransactionMetadataStoreService.java:370) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.broker.service.ServerCnx.handleEndTxn(ServerCnx.java:2187) ~[org.apache.pulsar-pulsar-broker-2.9.4.jar:2.9.4]
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:408) ~[org.apache.pulsar-pulsar-common-2.9.4.jar:2.9.4]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) ~[io.netty-netty-handler-4.1.77.Final.jar:4.1.77.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.77.Final.jar:4.1.77.Final]

In this situation, the first commit request may already have completed and removed the txnMeta from txnMetaMap, so the second commit request throws TransactionNotFoundException. To fix this, I think we should keep the txnMeta for the duration of the transaction timeout; otherwise we cannot know the state of terminated transactions. I think this needs a PIP.
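One way to keep the state of terminated transactions available is to record each ended transaction's outcome for a retention period. The sketch below is hypothetical: `EndedTxnCache`, `recordEnded`, `lookup` and `purgeExpired` are illustrative names, not Pulsar APIs, and using the transaction timeout as the retention period is an assumption taken from the proposal above. Time is passed in explicitly to keep the example deterministic.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: remember the outcome of ended transactions for a retention
// period (assumed here to be the transaction timeout) so that a late or duplicate
// END_TXN request can be answered instead of failing with "not found".
// This is not Pulsar's MLTransactionMetadataStore API.
public class EndedTxnCache {
    enum Outcome { COMMITTED, ABORTED }

    private static final class Entry {
        final Outcome outcome;
        final long expiresAtMillis;

        Entry(Outcome outcome, long expiresAtMillis) {
            this.outcome = outcome;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> ended = new ConcurrentHashMap<>();
    private final long retentionMillis;

    EndedTxnCache(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Call when a transaction is removed from the live metadata map (txnMetaMap).
    void recordEnded(String txnId, Outcome outcome, long nowMillis) {
        ended.put(txnId, new Entry(outcome, nowMillis + retentionMillis));
    }

    // Returns the final outcome while the entry is still retained, else empty.
    Optional<Outcome> lookup(String txnId, long nowMillis) {
        Entry entry = ended.get(txnId);
        if (entry == null || nowMillis >= entry.expiresAtMillis) {
            return Optional.empty();
        }
        return Optional.of(entry.outcome);
    }

    // Periodic cleanup so the retained entries do not grow without bound.
    void purgeExpired(long nowMillis) {
        ended.values().removeIf(entry -> nowMillis >= entry.expiresAtMillis);
    }

    public static void main(String[] args) {
        EndedTxnCache cache = new EndedTxnCache(60_000); // e.g. a 60 s txn timeout
        cache.recordEnded("(4,57640)", Outcome.COMMITTED, 0);
        System.out.println(cache.lookup("(4,57640)", 25_000)); // Optional[COMMITTED]
        System.out.println(cache.lookup("(4,57640)", 61_000)); // Optional.empty
    }
}
```

A duplicate commit that arrives within the retention window can then be answered with the recorded outcome; one arriving later would still get "not found", which is why the retention period needs to cover the client's retry window.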

Finally, the analysis above does not explain why two commit requests reach the broker. Note that the first log entry is emitted on the pulsar-transaction-timer-46-1 thread, which means it comes from a timer task scheduled by an earlier call to TransactionMetadataStoreService#endTransaction. The whole sequence is:

  • The first commit request reaches the broker, which changes the transaction state to COMMITTING.
  • While the broker is committing the transaction, it restarts and closes many resources. As a result, an exception that TransactionMetadataStoreService#isRetryableException judges retryable is thrown, and a timer task that retries the commit is scheduled on the HashedWheelTimer.
  • When the timer task runs, the transaction coordinator (TC) has already been closed, which produces the first error log above, CoordinatorException$CoordinatorNotFoundException.
  • When the client receives this exception, TransactionMetaStoreHandler#checkIfNeedRetryByError decides to resend the commit request internally, so a second commit request is sent to the broker.
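The sequence can be simulated to show how two independent retry layers (a broker-side timer retry and a client-side resend) turn one commit into two END_TXN requests. This is a hypothetical sketch: `DoubleCommitSimulation`, `attemptEnd`, `clientCommit` and the local `CoordinatorNotFoundException` are illustrative, not Pulsar classes; a `ScheduledExecutorService` stands in for Netty's HashedWheelTimer; and it is assumed that the TC has been reloaded before the client resends. It only counts requests and does not model the transaction state.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of the sequence above; names are illustrative, not Pulsar
// classes. A ScheduledExecutorService stands in for Netty's HashedWheelTimer.
public class DoubleCommitSimulation {
    static class CoordinatorNotFoundException extends RuntimeException {
        CoordinatorNotFoundException(String message) { super(message); }
    }

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean coordinatorLoaded = new AtomicBoolean(true);
    private final AtomicBoolean restartDuringCommit = new AtomicBoolean(true);
    private final AtomicInteger endTxnRequests = new AtomicInteger();

    // Broker side: handle one END_TXN(COMMIT) request.
    CompletableFuture<Void> brokerEndTransaction() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attemptEnd(result);
        return result;
    }

    private void attemptEnd(CompletableFuture<Void> result) {
        if (!coordinatorLoaded.get()) {
            // The retry task runs after the TC was closed.
            result.completeExceptionally(
                    new CoordinatorNotFoundException("Transaction coordinator not found"));
            return;
        }
        if (restartDuringCommit.compareAndSet(true, false)) {
            // The txn is already COMMITTING when the broker restarts and closes the TC.
            coordinatorLoaded.set(false);
            // The resulting error is treated as retryable, so a retry is scheduled.
            timer.schedule(() -> attemptEnd(result), 10, TimeUnit.MILLISECONDS);
            return;
        }
        result.complete(null);
    }

    // Client side: resend END_TXN when the error is considered retryable
    // (modelled loosely on checkIfNeedRetryByError). Returns requests sent.
    int clientCommit(int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            endTxnRequests.incrementAndGet();
            try {
                brokerEndTransaction().get();
                break;
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof CoordinatorNotFoundException)) {
                    throw new IllegalStateException(e.getCause());
                }
                // Assumption: the TC is reloaded (e.g. on another broker) before the resend.
                coordinatorLoaded.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return endTxnRequests.get();
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        DoubleCommitSimulation sim = new DoubleCommitSimulation();
        try {
            System.out.println("END_TXN requests sent: " + sim.clientCommit(3));
        } finally {
            sim.shutdown();
        }
    }
}
```

Running it prints `END_TXN requests sent: 2`. In the real system the first commit may still complete on the reloaded TC, so the second request then runs into one of the two failures described above.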

Are you willing to submit a PR?

  • I’m willing to submit a PR!

About this issue

  • State: open
  • Created a year ago
  • Comments: 16 (15 by maintainers)

Most upvoted comments

I don't quite understand: does the client need to configure two timeouts?

I have implemented this in PR #19662.

Can you show me your usage example? My guess is that your example doesn't contain any consumers. If you want exactly-once semantics when producing to multiple topics without a consumer, Pulsar transactions currently cannot guarantee that.

Yes, there are no consumers in our experiment. We want to use Pulsar transactions to ensure producer-side exactly-once semantics, and use the Flink connector to provide consumer-side exactly-once semantics. Later I will open a PR that supports checking whether a transactional producer guarantees exactly-once semantics.

I understand what you mean, but this may be a new feature. #19662 fixes some situations, but it cannot resolve all of the problems. We may need to retain ended transactions in the TC and provide a lookup API to check the txn status. It is a new feature; are you interested in working on it?