npgsql: Unexpected timeouts in logical replication
Hi!
I am trying out the logical replication feature (https://www.npgsql.org/doc/replication.html) and I have a few questions. Hoping for some help.
- I have created a table, a publication and a replication slot. Then I copied the code from the documentation:
```csharp
await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
{
    Console.WriteLine(message);
}
```
But every time I run the application, I get all messages from the beginning. Is there some way to confirm the processing of the message? I’ve tried using SendStatusUpdate but it doesn’t work:
```csharp
await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
{
    Console.WriteLine(message);
    await connection.SendStatusUpdate(cancellationToken);
}
```
- When the application does not receive messages for a long time, I get an exception:
```
Npgsql.NpgsqlException (0x80004005): Exception while reading from stream
 ---> System.TimeoutException: Timeout during reading attempt
   at Npgsql.NpgsqlConnector.<ReadMessage>g__ReadMessageLong|194_0(NpgsqlConnector connector, Boolean async, DataRowLoadingMode dataRowLoadingMode, Boolean readingNotifications, Boolean isReadingPrependedMessage)
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
```
I’ve tried using an infinite loop like this:
```csharp
while (true)
{
    try
    {
        await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
        {
            Console.WriteLine(message);
        }
    }
    catch (NpgsqlException ex)
    {
        Console.WriteLine(ex);
        continue;
    }
}
```
But this again reads all the messages from the beginning. How do I handle this situation correctly?
- Is there some way to get the old values of updated and deleted rows? Otherwise there is no way to tell which row was deleted and process it:
```csharp
if (message is DeleteMessage deleteMessage)
{
    // How to process this message?
}
```
About this issue
- State: closed
- Created 3 years ago
- Comments: 16 (13 by maintainers)
Commits related to this issue
- Fix ReplicationConnection.SendFeedback reset of _requestFeedbackTimer Unconditionally resetting _requestFeedbackTimer led to cancellation of idle replication connections due to the expiring timeout b... — committed to Brar/npgsql by Brar 3 years ago
- Fix ReplicationConnection.SendFeedback reset of _requestFeedbackTimer (#3557) Unconditionally resetting _requestFeedbackTimer led to cancellation of idle replication connections due to the expiring ... — committed to npgsql/npgsql by Brar 3 years ago
- Fix ReplicationConnection.SendFeedback reset of _requestFeedbackTimer (#3557) Unconditionally resetting _requestFeedbackTimer led to cancellation of idle replication connections due to the expiring t... — committed to npgsql/npgsql by Brar 3 years ago
- Depreciate LastAppliedLsn and LastFlushedLsn setters After discussion in #3534 it became clear that the current API is less than ideal for two reasons: 1. Setting those two properties has (intended)... — committed to Brar/npgsql by Brar 3 years ago
- Depreciate LastAppliedLsn and LastFlushedLsn setters (#3755) After discussion in #3534 it became clear that the current API is less than ideal for two reasons: 1. Setting those two properties has... — committed to npgsql/npgsql by Brar 3 years ago
- Add ReplicationConnection.LastAppliedLsn and LastFlushedLsn wrapper After discussion in #3534 it became clear that the current API is less than ideal for two reasons: 1. Setting the LastAppliedLsn a... — committed to npgsql/npgsql by Brar 3 years ago
- Add ReplicationConnection.LastAppliedLsn and LastFlushedLsn wrapper After discussion in #3534 it became clear that the current API is less than ideal for two reasons: 1. Setting the LastAppliedLsn a... — committed to npgsql/npgsql by Brar 3 years ago
@Chakrygin I just want to let you know that we’ve released 5.0.4 which contains the fix for the problem described above.
It’s essentially two different levels of persistence that you can report back to the server.
Above I wrote "I'd advise you to keep track of their log sequence number (LSN) in your consuming application", but since I have no idea what your application will do and what consistency guarantees it needs, I didn't go any further. You might somehow process the transactions you received from the server in memory and report back that you've successfully applied the transaction in your system (e.g. that it's visible to users) via `LastAppliedLsn`. On the other hand, you may not want to persist the transaction to disk storage immediately (e.g. for performance reasons) using fsync (or `FileStream.Flush()`), but once you do so, you can report this back to the server via `LastFlushedLsn`. In synchronous replication you can use the `synchronous_commit` server configuration option to configure the guarantees the server will await from the replication standby (your application) for transaction commits. You can have a look at our `SynchronousReplication` test if you want to look at the details.

I'd say yes, for asynchronous replication scenarios, but if you look at the documentation around `synchronous_commit` you'll probably see that it's pretty confusing. Personally I'd always assign both of them, either at the same time or independently, depending on whether the client has applied the transaction or has flushed it to the storage system.

Hey @Chakrygin, thanks for providing feedback on the new replication feature. I'll try to address your questions one by one.
The default for `LogicalReplicationConnection.StartReplication()` is to start with the first transaction that is still available in the replication slot. You can query it via `SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = '<your slot name>';` (also see: https://www.postgresql.org/docs/current/view-pg-replication-slots.html). If you don't want to reprocess transactions you already received, I'd advise you to keep track of their log sequence number (LSN) in your consuming application (use `ReplicationMessage.WalEnd` to get the LSN) and request the next transaction by using the `walLocation` argument of `LogicalReplicationConnection.StartReplication()`, providing the appropriate LSN.

Yes, there is. Just assign `ReplicationConnection.LastFlushedLsn`. There's also `LastAppliedLsn`, which you can also assign in a more fine-grained approach, and `LastReceivedLsn`, which is read-only and automatically updated by Npgsql upon receiving a message. Npgsql will automatically send feedback to the server if the server requests it, or at the latest after the configured `ReplicationConnection.WalReceiverStatusInterval` has passed. The feedback will always contain the values of `LastReceivedLsn`, `LastAppliedLsn` and `LastFlushedLsn`, which the server will use to update the replication slot. You can monitor the progress via `SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '<your slot name>';`.

In common scenarios you probably don't have to use `ReplicationConnection.SendStatusUpdate()`. You can use it to force sending feedback to the server immediately, which is helpful in synchronous replication scenarios, but otherwise you can and probably should rely on the automated feedback.

This is interesting because it should not happen. The integrated feedback mechanism should keep the connection running forever. The only problem should be that, if you don't update `LastFlushedLsn`, the server can't recycle the WAL, and at some point the size of the replication slot might outgrow the size of your hard disk, but I wouldn't expect this in common testing scenarios. I've tried to reproduce your problem and have been running my test application for about an hour now, so my first question is: "How long is 'a long time'?" After that I might have a few questions regarding your server configuration (assuming that you kept the default timeouts in Npgsql).
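Putting the LSN-tracking advice above together, a minimal consuming loop might look like the sketch below. This is not an official sample: the connection string, slot and publication names, and the `Process`/`LoadLastLsn`/`SaveLastLsn` helpers are hypothetical placeholders for your own application logic.

```csharp
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;
using NpgsqlTypes;

await using var connection = new LogicalReplicationConnection(
    "Host=localhost;Database=mydb;Username=me"); // placeholder connection string
await connection.Open();

var slot = new PgOutputReplicationSlot("my_slot");                // placeholder slot name
var options = new PgOutputReplicationOptions("my_publication", 1); // placeholder publication

// Resume from the last LSN we persisted ourselves (null on the first run,
// in which case the server starts at the slot's restart_lsn).
NpgsqlLogSequenceNumber? startLsn = LoadLastLsn(); // hypothetical persistence helper

await foreach (var message in connection.StartReplication(
                   slot, options, CancellationToken.None, startLsn))
{
    Process(message); // hypothetical application-specific handling

    // Report progress so the server can advance the slot and recycle WAL.
    connection.LastAppliedLsn = message.WalEnd;
    connection.LastFlushedLsn = message.WalEnd;
    SaveLastLsn(message.WalEnd); // hypothetical persistence helper
}
```

Persisting the LSN yourself and passing it back as `walLocation` is what prevents the "all messages from the beginning" behavior on restart.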
Yes, there is. You may want to use the appropriate subclasses of `UpdateMessage` and `DeleteMessage`. Which one you'll get depends on the `REPLICA IDENTITY` setting you used when creating the table. You may query the value via `SELECT relreplident FROM pg_class WHERE oid = '"<your table name>"'::regclass;` (meanings of the values: 'd': default, 'n': nothing, 'f': full, 'i': index; see also https://www.postgresql.org/docs/current/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY). There are `KeyDeleteMessage`, `FullDeleteMessage`, `IndexUpdateMessage` and `FullUpdateMessage`. We currently only have the API docs for those classes, but their use shouldn't be too hard since they're mere wrappers of the protocol messages (https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html).
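For instance, a delete handler could pattern-match on those subclasses. This is only a sketch (the `Handle...` methods are hypothetical, and you should check the API docs for the exact members exposed by each message type in your Npgsql version):

```csharp
switch (message)
{
    case FullDeleteMessage fullDelete:
        // Sent when the table has REPLICA IDENTITY FULL:
        // the message carries the old column values of the deleted row.
        HandleFullDelete(fullDelete); // hypothetical handler
        break;
    case KeyDeleteMessage keyDelete:
        // Sent with the default replica identity:
        // only the old key columns are available.
        HandleKeyDelete(keyDelete); // hypothetical handler
        break;
    case DeleteMessage otherDelete:
        // Fallback for deletes without a usable identity.
        HandleUnknownDelete(otherDelete); // hypothetical handler
        break;
}
```

Note that the subclass cases must come before the `DeleteMessage` base-class case, or they would never match.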
I hope this will help you to get started and understand some of the (frankly quite complicated) replication internals.
If the exception you mentioned above is reproducible, please let me know how to do so. Also if you run into other exceptions or weird behavior (the feature is new) I’m really interested in your feedback.
Hi @Chakrygin, I’ve located and fixed the bug that led to the problem you described above. Thank you again for reporting!
Since the fix is a one-liner I don't think that anyone will disagree with backpatching it to 5.x, but unfortunately you'll have to wait for the next minor release until you can get hold of a corrected package. Until then you should be able to work around the problem by setting `ReplicationConnection.WalReceiverTimeout` to a value that is less than `ReplicationConnection.WalReceiverStatusInterval`.

@Chakrygin I think I can already pretty much confirm that this is a bug.
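Sketched, the suggested workaround is just two property assignments before starting replication (the concrete durations here are illustrative placeholders, not recommendations):

```csharp
// Workaround until the fixed package is released: keep the receiver timeout
// below the status interval, per the advice above.
connection.WalReceiverStatusInterval = TimeSpan.FromSeconds(60);
connection.WalReceiverTimeout = TimeSpan.FromSeconds(30);
```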
I managed to reproduce your problem on my local system and in the server logs (at DEBUG5 log level) I can see a few lines that show that we’re cancelling the replication connection because of a timeout.
Explanation:
The bug here is that for some reason we don’t seem to request a keepalive from the server before we kill the connection because of a timeout. IIRC we should be requesting feedback after half of the timeout period has passed but for some reason this doesn’t seem to happen. I’ll have to investigate a bit further but for now you can already pretty safely assume that it’s not your fault and that we’ll have to fix this on our part.
Pretty much, yes. Ideally your client application should always know about its state and tell the server specifically at which LSN to start when it (re-)starts replication. If it doesn't, the server will always start at the earliest LSN it has around. This may even be one that you have already reported as applied and flushed. Essentially, if your application doesn't tell the server where to start, it should be prepared to handle messages it has already received.
What you report back helps the server to decide when a WAL file is no longer needed and can be removed/recycled.
In that case I'd assign both `LastAppliedLsn` and `LastFlushedLsn` at the same time, or only `LastFlushedLsn`. Mind you, Postgres will not consider the replication as "completely done", and will keep the WAL around, until you report a transaction as applied and flushed.

Great minds think alike.
When I wrote the above, I already made a note for myself that says: "Update the getting started sample to increment `LastFlushedLsn`."
Will fix this tonight.