diesel: R2d2 connection pool may hang on droped packages

Setup

Versions

  • Rust: rustc 1.56.1 (likely irrelevant)
  • Diesel: 1.4.8
  • Database: postgresql
  • Operating System Ubuntu 21.10 (likely irrelevant)

Feature Flags

  • diesel: postgres r2d2

Problem Description

Cargo.toml

[package]
name = "crates-io-hang-reproduction"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
diesel = { version = "1.4.8", features = ["r2d2", "postgres"] }
dotenv = "0.15.0"
parking_lot = "0.11.2"
thiserror = "1.0.30"
url = "2.2.2"

main.rs:

use std::time::{Instant, Duration};

use diesel::RunQueryDsl;

#[macro_use]
extern crate diesel;

pub mod db {
    use diesel::prelude::*;
    use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
    use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
    use std::sync::Arc;
    use std::{ops::Deref, time::Duration};
    use thiserror::Error;
    use url::Url;

    #[derive(Clone)]
    pub enum DieselPool {
        Pool {
            pool: r2d2::Pool<ConnectionManager<PgConnection>>,
        },
        Test(Arc<ReentrantMutex<PgConnection>>),
    }

    impl DieselPool {
        pub(crate) fn new(
            url: &str,
            config: r2d2::Builder<ConnectionManager<PgConnection>>,
        ) -> Result<DieselPool, PoolError> {
            let manager = ConnectionManager::new(connection_url(url));

            // For crates.io we want the behavior of creating a database pool to be slightly different
            // than the defaults of R2D2: the library's build() method assumes its consumers always
            // need a database connection to operate, so it blocks creating a pool until a minimum
            // number of connections is available.
            //
            // crates.io can actually operate in a limited capacity without a database connections,
            // especially by serving download requests to our users. Because of that we don't want to
            // block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid
            // serving errors for the first connections until the pool is initialized) and if we can't
            // establish any connection continue booting up the application. The database pool will
            // automatically be marked as unhealthy and the rest of the application will adapt.
            let pool = DieselPool::Pool {
                pool: config.build_unchecked(manager),
            };
            match pool.wait_until_healthy(Duration::from_secs(5)) {
                Ok(()) => {}
                Err(PoolError::UnhealthyPool) => {}
                Err(err) => return Err(err),
            }

            Ok(pool)
        }

        pub(crate) fn new_test(url: &str) -> DieselPool {
            let conn = PgConnection::establish(&connection_url(url))
                .expect("failed to establish connection");
            conn.begin_test_transaction()
                .expect("failed to begin test transaction");
            DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
        }

        pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
            match self {
                DieselPool::Pool { pool } => {
                    if let Some(conn) = pool.try_get() {
                        Ok(DieselPooledConn::Pool(conn))
                    } else if !self.is_healthy() {
                        Err(PoolError::UnhealthyPool)
                    } else {
                        Ok(DieselPooledConn::Pool(pool.get()?))
                    }
                }
                DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
            }
        }

        pub fn state(&self) -> PoolState {
            match self {
                DieselPool::Pool { pool, .. } => {
                    let state = pool.state();
                    PoolState {
                        connections: state.connections,
                        idle_connections: state.idle_connections,
                    }
                }
                DieselPool::Test(_) => PoolState {
                    connections: 0,
                    idle_connections: 0,
                },
            }
        }

        pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> {
            match self {
                DieselPool::Pool { pool, .. } => match pool.get_timeout(timeout) {
                    Ok(_) => Ok(()),
                    Err(_) if !self.is_healthy() => Err(PoolError::UnhealthyPool),
                    Err(err) => Err(PoolError::R2D2(err)),
                },
                DieselPool::Test(_) => Ok(()),
            }
        }

        fn is_healthy(&self) -> bool {
            self.state().connections > 0
        }
    }

    #[derive(Debug, Copy, Clone)]
    pub struct PoolState {
        pub connections: u32,
        pub idle_connections: u32,
    }

    pub enum DieselPooledConn<'a> {
        Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
        Test(ReentrantMutexGuard<'a, PgConnection>),
    }

    impl Deref for DieselPooledConn<'_> {
        type Target = PgConnection;

        fn deref(&self) -> &Self::Target {
            match self {
                DieselPooledConn::Pool(conn) => conn.deref(),
                DieselPooledConn::Test(conn) => conn.deref(),
            }
        }
    }

    pub fn connect_now() -> ConnectionResult<PgConnection> {
        let url = connection_url(&std::env::var("DATABASE_URL").unwrap());
        PgConnection::establish(&url)
    }

    pub fn connection_url(url: &str) -> String {
        let mut url = Url::parse(url).expect("Invalid database URL");
        if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") {
            url.query_pairs_mut().append_pair("sslmode", "require");
        }
        url.into()
    }

    #[derive(Debug, Clone, Copy)]
    pub struct ConnectionConfig {
        pub statement_timeout: u64,
        pub read_only: bool,
    }

    impl CustomizeConnection<PgConnection, r2d2::Error> for ConnectionConfig {
        fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> {
            use diesel::sql_query;

            sql_query(format!(
                "SET statement_timeout = {}",
                self.statement_timeout * 1000
            ))
            .execute(conn)
            .map_err(r2d2::Error::QueryError)?;
            if self.read_only {
                sql_query("SET default_transaction_read_only = 't'")
                    .execute(conn)
                    .map_err(r2d2::Error::QueryError)?;
            }
            Ok(())
        }
    }

    #[cfg(test)]
    pub(crate) fn test_conn() -> PgConnection {
        let conn = PgConnection::establish(&crate::env("TEST_DATABASE_URL")).unwrap();
        conn.begin_test_transaction().unwrap();
        conn
    }

    #[derive(Debug, Error)]
    pub enum PoolError {
        #[error(transparent)]
        R2D2(#[from] r2d2::PoolError),
        #[error("unhealthy database pool")]
        UnhealthyPool,
    }
}

table! {
    users {
        id -> Integer,
        name -> Text,
    }
}

fn main() {
    let db_url = std::env::var("DATABASE_URL").unwrap();
    let pool = self::db::DieselPool::new(&db_url, Default::default()).unwrap();
    loop {
        println!("Start loop");
        let loop_start = Instant::now();
        let conn = pool.get().unwrap();
        println!("Took {} ms to get connection", loop_start.elapsed().as_millis());

        let _: (i32, String) = users::table.first(&*conn).unwrap();
        println!("Sleep 1s");
        std::thread::sleep(Duration::from_secs(1));
        println!("Finish sleep");
    }
}

Using the following reproduction steps produces an application freeze:

  1. DATABASE_URL=… cargo run
  2. sudo iptables -A OUTPUT -p tcp --dport 5432 -j DROP

What is the expected output?

I expect a panic due to the unwrap in line 199.

What is the actual output?

The output stops + the application seems to hang

Steps to reproduce

Using the following reproduction steps produces an application freeze:

  1. DATABASE_URL=… cargo run
  2. sudo iptables -A OUTPUT -p tcp --dport 5432 -j ACCEPT

Checklist

  • This issue can be reproduced on Rust’s stable channel. (Your issue will be closed if this is not the case)
  • This issue can be reproduced without requiring a third party crate

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 15 (9 by maintainers)

Commits related to this issue

Most upvoted comments

So long story short setting DATABASE_URL="postgres://localhost/diesel_test?tcp_user_timeout=200" works for me, so not sure what went wrong with that earlier on.

I played around with tcp_user_timeout, and indeed it allows to configure the overall timeout to whatever value we find acceptable in production. That sounds great, if you want to close the issue please do!

By the way, I also remember trying tcp_user_timeout back then in my investigation, and I also apparently did something wrong when I tested it. Somehow reassuring I wasn’t alone in trying it wrong 😅

Thanks for your comment here.

We were surprised that our apps were blocking on that timeout before diesel/r2d2 timeouts were taking affect.

Diesel and r2d2 are two different crates. r2d2 already has an issue for this: https://github.com/sfackler/r2d2/issues/116. The short answer is: They cannot do much there, as there is no way to stop in the middle of executing stuff.

Setting TCP_USER_TIMEOUT to 200ms seems really aggressive, but that might be an example vs a recommendation. The TCP_USER_TIMEOUT will take precedence over other TCP timeouts like keepalive and TCP retry settings. TCP retries start a just over 200ms on a low latency network, so if you’ve just lost a packet or two the system wouldn’t have time to recover. If you set this to be really aggressive, and your db connection went idle, you would probably end up killing the connection before a reasonable keepalive period would pass. If you’re pooling I think that would cause the connections to flap and might exhaust TCP connection limits on the server side. At very least you would end up paying the overhead of establishing the TCP/TLS session a lot more often.

If the workaround is to “fail fast” at the TCP layer, we are accomplishing that by reducing the number of TCP retries from the default of 15 to 3 or 4 by setting net.ipv4.tcp_retries2=4 at the system level. This isn’t ideal since its a system wide setting that requires elevated privileges. More annoying if you’re using containers since it need to be set in each container.

Here’s a good article on linux tcp retries and how the timeout works.

To be clear: tcp_user_timeout=200 is just an example. In the end you need to adjust that value to your needs, not just because I wrote that value works in my tests. I’ve chosen 200, just because I did not want to wait that long 🤷 . I’ve also verified now that this works just fine with larger value, even without changing something at the operating system level.

Our wish is that the diesel/r2d2 wouldn’t be blocked by the network IO and would move on to retry on a healthy connection after a timeout period so we wouldn’t have to do system level changes to reduce the impact of a less than ideal network connection.

To be clear here: There is really not much we can do here. You cannot just abort operations at operating system level, so if something is blocking it just blocks till it’s done. That’s how these things are designed. If you don’t want that you need to use an async approach. For diesel that would be diesel-async. That allow you to control such things in detail. And to be clear here: There is exactly one reason I haven’t closed that issue as “won’t fix”, works as designed yet. That’s because the crates.io folks do require a solution with sync diesel here for technical reasons.

Just want to comment, that we are also users of diesel r2d2 postgres, and we are also affected by this issue! would love to have a fix upstream

I’ll check with the rest of the team and get back to you in the coming days!