npgsql: Unexpected timeouts in logical replication

Hi!

I am trying out the logical replication feature (https://www.npgsql.org/doc/replication.html) and I have a few questions. I hope you can help.

  1. I have created a table, a publication and a replication slot. Then I copied the code from the documentation:
await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
{
    Console.WriteLine(message);
}

But every time I run the application, I get all messages from the beginning. Is there some way to confirm the processing of the message? I’ve tried using SendStatusUpdate but it doesn’t work:

await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
{
    Console.WriteLine(message);

    await connection.SendStatusUpdate(cancellationToken);
}
  2. When the application does not receive messages for a long time, I get an exception:
Npgsql.NpgsqlException (0x80004005): Exception while reading from stream
 ---> System.TimeoutException: Timeout during reading attempt
   at Npgsql.NpgsqlConnector.<ReadMessage>g__ReadMessageLong|194_0(NpgsqlConnector connector, Boolean async, DataRowLoadingMode dataRowLoadingMode, Boolean readingNotifications, Boolean isReadingPrependedMessage)
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()

I’ve tried using an infinite loop like this:

while (true)
{
    try
    {
        await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
        {
            Console.WriteLine(message);
        }
    }
    catch (NpgsqlException ex)
    {
        Console.WriteLine(ex);
        continue;
    }
}

But this again reads all the messages from the beginning. How do I handle this situation correctly?

  3. Is there some way to get old values in updated and deleted rows? In this case, there is no way to understand which row was deleted and process it:
if (message is DeleteMessage deleteMessage)
{
    // How to process this message?
}

Most upvoted comments

@Chakrygin I just want to let you know that we’ve released 5.0.4 which contains the fix for the problem described above.

What is the difference between LastAppliedLsn and LastFlushedLsn? In what scenarios will the LastAppliedLsn update be useful?

They’re essentially two different levels of persistence that you can report back to the server.

Above I wrote that “I’d advise you to keep track of their log sequence number (LSN) in your consuming application”, but since I have no idea what your application will do and what consistency guarantees it needs, I didn’t go any further. You might process the transactions you received from the server in memory and report back that you’ve successfully applied them in your system (e.g. that they’re visible to users) via LastAppliedLsn. On the other hand, you may not want to persist the transactions to disk storage immediately (e.g. for performance reasons) using fsync (or FileStream.Flush()), but once you do so, you can report this back to the server via LastFlushedLsn.

In synchronous replication you can use the synchronous_commit server configuration option to configure the guarantees the server will await from the replication standby (your application) for transaction commits.

You can have a look at our SynchronousReplication test if you want to see the details.

Am I correct in understanding that updating LastAppliedLsn is optional?

I’d say yes, for asynchronous replication scenarios, but if you look at the documentation around synchronous_commit you’ll probably see that it’s pretty confusing. Personally I’d always assign both of them, either at the same time or independently, depending on whether the client has applied the transaction or has flushed it to the storage system.
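
To illustrate the difference, here is a minimal sketch that reports the two levels separately (ApplyInMemory and FlushToDurableStorage are hypothetical methods in your application, not Npgsql APIs):

await foreach (var message in connection.StartReplication(slot, options, cancellationToken))
{
    // Transaction is applied in your system (e.g. visible to users), but not yet durable.
    ApplyInMemory(message);
    connection.LastAppliedLsn = message.WalEnd;

    // Later, once your own storage has been fsynced/flushed, report the stronger guarantee.
    if (FlushToDurableStorage())
        connection.LastFlushedLsn = message.WalEnd;
}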

Hey @Chakrygin, thanks for providing feedback on the new replication feature. I’ll try to address your questions one by one.

But every time I run the application, I get all messages from the beginning.

The default for LogicalReplicationConnection.StartReplication() is to start with the first transaction that is still available in the replication slot. You can query it via SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = '<your slot name>'; (also see: https://www.postgresql.org/docs/current/view-pg-replication-slots.html).

If you don’t want to reprocess transactions you already received, I’d advise you to keep track of their log sequence number (LSN) in your consuming application (use ReplicationMessage.WalEnd to get the LSN) and request the next transaction by using the walLocation argument for LogicalReplicationConnection.StartReplication(), providing the appropriate LSN.

Is there some way to confirm the processing of the message?

Yes, there is. Just assign ReplicationConnection.LastFlushedLsn. There’s also LastAppliedLsn, which you can also assign for a more fine-grained approach, and LastReceivedLsn, which is read-only and automatically updated by Npgsql upon receiving a message.

Npgsql will automatically send feedback to the server when the server requests it, or at the latest after the configured ReplicationConnection.WalReceiverStatusInterval has passed. The feedback will always contain the values of LastReceivedLsn, LastAppliedLsn and LastFlushedLsn, which the server will use to update the replication slot. You can monitor the progress via SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '<your slot name>';
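
Putting this together with the LSN tracking mentioned above, a consuming loop could look roughly like this sketch (LoadLastProcessedLsn and SaveLastProcessedLsn are hypothetical persistence helpers in your application, and the exact position of the walLocation parameter may differ between Npgsql versions, so check the API docs):

// Sketch only: restart from the last position we processed and confirm progress as we go.
// NpgsqlLogSequenceNumber lives in the NpgsqlTypes namespace.
NpgsqlLogSequenceNumber? startFrom = LoadLastProcessedLsn();

await foreach (var message in connection.StartReplication(slot, options, cancellationToken, startFrom))
{
    Console.WriteLine(message);

    // Confirm progress so the automatic feedback can advance the replication slot.
    connection.LastAppliedLsn = message.WalEnd;
    connection.LastFlushedLsn = message.WalEnd;

    SaveLastProcessedLsn(message.WalEnd); // hypothetical: persist for the next restart
}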

I’ve tried using SendStatusUpdate but it doesn’t work

In common scenarios you probably don’t have to use ReplicationConnection.SendStatusUpdate(). You can use it to force sending feedback to the server immediately, which is helpful in synchronous replication scenarios but otherwise you can and probably should rely on the automated feedback.

When the application does not receive messages for a long time, I get an exception

This is interesting because it should not happen. The integrated feedback mechanism should keep the connection running forever. The only problem should be that, if you don’t update LastFlushedLsn, the server can’t recycle the WAL, and at some point the size of the replication slot might outgrow the size of your hard disk, but I wouldn’t expect this in common testing scenarios.

I’ve tried to reproduce your problem and have been running my test application for about an hour now, so my first question is: “How long is ‘a long time’?” and after that I might have a few questions regarding your server configuration (assuming that you kept the default timeouts in Npgsql).

Is there some way to get old values in updated and deleted rows? In this case, there is no way to understand which row was deleted and process it

Yes, there is. You may want to use the appropriate subclasses of UpdateMessage and DeleteMessage. Which one you’ll get depends on the REPLICA IDENTITY setting you used when creating the table. You may query the value via SELECT relreplident FROM pg_class WHERE oid = '"<your table name>"'::regclass; (the values mean ‘d’: default, ‘n’: nothing, ‘f’: full, ‘i’: index; see also https://www.postgresql.org/docs/current/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY). There are:

  • KeyDeleteMessage
  • FullDeleteMessage
  • IndexUpdateMessage
  • FullUpdateMessage

We currently only have the API docs for those classes but their use shouldn’t be too hard since they’re mere wrappers of the protocol messages (https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html).
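
As a rough sketch (the property names below are assumptions from memory and may differ between Npgsql versions, so please verify them against the API docs):

if (message is FullDeleteMessage fullDelete)
{
    // REPLICA IDENTITY FULL: the complete old row is carried in the message.
    Console.WriteLine(fullDelete.OldRow); // property name assumed
}
else if (message is KeyDeleteMessage keyDelete)
{
    // Default/index replica identity: only the key columns of the deleted row are available.
    Console.WriteLine(keyDelete.Key); // property name assumed
}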

I hope this will help you to get started and understand some of the (frankly quite complicated) replication internals.

If the exception you mentioned above is reproducible, please let me know how to reproduce it. Also, if you run into other exceptions or weird behavior (the feature is new), I’m really interested in your feedback.

Hi @Chakrygin, I’ve located and fixed the bug that led to the problem you described above. Thank you again for reporting!

Since the fix is a one-liner I don’t think that anyone will disagree with backpatching it to 5.x, but unfortunately you’ll have to wait for the next minor release before you can get hold of a corrected package. Until then you should be able to work around the problem by setting ReplicationConnection.WalReceiverTimeout to a value that is less than ReplicationConnection.WalReceiverStatusInterval.
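
For illustration, the workaround is just two property assignments made right after opening the replication connection; the concrete values below are only an example:

// Workaround sketch: keep WalReceiverTimeout below WalReceiverStatusInterval
// until the fixed package is released (values are examples only).
connection.WalReceiverStatusInterval = TimeSpan.FromSeconds(10);
connection.WalReceiverTimeout = TimeSpan.FromSeconds(5);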

I created a test application where the exception is reliably reproducible. Errors occur at intervals of 1-2 minutes.

@Chakrygin I think I can already pretty much confirm that this is a bug.

I managed to reproduce your problem on my local system and in the server logs (at DEBUG5 log level) I can see a few lines that show that we’re cancelling the replication connection because of a timeout.

2021-02-20 19:02:19.527 CET [25496] DEBUG:  forked new backend, pid=32536 socket=5652
2021-02-20 19:02:19.545 CET [32536] DEBUG:  processing cancel request: sending SIGINT to process 16408
2021-02-20 19:02:19.549 CET [32536] DEBUG:  shmem_exit(0): 0 before_shmem_exit callbacks to make
2021-02-20 19:02:19.549 CET [32536] DEBUG:  shmem_exit(0): 0 on_shmem_exit callbacks to make
2021-02-20 19:02:19.549 CET [32536] DEBUG:  proc_exit(0): 1 callbacks to make
2021-02-20 19:02:19.549 CET [32536] DEBUG:  exit(0)
2021-02-20 19:02:19.549 CET [32536] DEBUG:  shmem_exit(-1): 0 before_shmem_exit callbacks to make
2021-02-20 19:02:19.549 CET [32536] DEBUG:  shmem_exit(-1): 0 on_shmem_exit callbacks to make
2021-02-20 19:02:19.549 CET [32536] DEBUG:  proc_exit(-1): 0 callbacks to make
2021-02-20 19:02:19.550 CET [16408] ERROR:  canceling statement due to user request
2021-02-20 19:02:19.552 CET [25496] DEBUG:  reaping dead processes
2021-02-20 19:02:19.552 CET [25496] DEBUG:  server process (PID 32536) exited with exit code 0
2021-02-20 19:02:19.601 CET [16408] DEBUG:  shmem_exit(0): 1 before_shmem_exit callbacks to make
2021-02-20 19:02:19.601 CET [16408] DEBUG:  shmem_exit(0): 7 on_shmem_exit callbacks to make
2021-02-20 19:02:19.601 CET [16408] DEBUG:  proc_exit(0): 3 callbacks to make
2021-02-20 19:02:19.601 CET [16408] DEBUG:  exit(0)
2021-02-20 19:02:19.601 CET [16408] DEBUG:  shmem_exit(-1): 0 before_shmem_exit callbacks to make
2021-02-20 19:02:19.601 CET [16408] DEBUG:  shmem_exit(-1): 0 on_shmem_exit callbacks to make
2021-02-20 19:02:19.601 CET [16408] DEBUG:  proc_exit(-1): 0 callbacks to make
2021-02-20 19:02:19.604 CET [25496] DEBUG:  reaping dead processes
2021-02-20 19:02:19.604 CET [25496] DEBUG:  server process (PID 16408) exited with exit code 0

Explanation:

  • 25496 is my postmaster process
  • 16408 is my replication connection

The bug here is that for some reason we don’t seem to request a keepalive from the server before we kill the connection because of a timeout. IIRC we should be requesting feedback after half of the timeout period has passed, but this doesn’t seem to happen. I’ll have to investigate a bit further, but for now you can pretty safely assume that it’s not your fault and that we’ll have to fix this on our part.

Am I getting this right @Brar?

Pretty much, yes. Ideally your client application should always know its own state and explicitly tell the server at which LSN to start when it (re-)starts replication. If it doesn’t, the server will always start at the earliest LSN it has around. This may even be one that you have already reported as applied and flushed. Essentially, if your application doesn’t tell the server where to start, it should be prepared to handle messages it has already received.

What you report back helps the server to decide when a WAL file is no longer needed and can be removed/recycled.

Basically I am looking at logical replication to push changes to Kafka for example. I think that using LastAppliedLsn will be enough in this case.

In that case I’d assign both LastAppliedLsn and LastFlushedLsn at the same time, or only LastFlushedLsn. Mind you, Postgres will not consider the replication as “completely done” until you report a transaction as applied and flushed, and will keep the WAL around until then.

@Brar should we update the replication docs to at least show people that they need to update LastFlushedLsn? That seems like a necessary part of a “getting started” code sample.

Great minds think alike.

When I wrote the above, I already made a note for myself that says: “Update the getting started sample to increment LastFlushedLsn”.

Will fix this tonight.