diesel: R2d2 connection pool may hang on dropped packets
Setup
Versions
- Rust: rustc 1.56.1 (likely irrelevant)
- Diesel: 1.4.8
- Database: postgresql
- Operating System: Ubuntu 21.10 (likely irrelevant)
Feature Flags
- diesel: postgres r2d2
Problem Description
Cargo.toml
[package]
name = "crates-io-hang-reproduction"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
diesel = { version = "1.4.8", features = ["r2d2", "postgres"] }
dotenv = "0.15.0"
parking_lot = "0.11.2"
thiserror = "1.0.30"
url = "2.2.2"
main.rs:
use std::time::{Instant, Duration};
use diesel::RunQueryDsl;
#[macro_use]
extern crate diesel;
pub mod db {
    use diesel::prelude::*;
    use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
    use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
    use std::sync::Arc;
    use std::{ops::Deref, time::Duration};
    use thiserror::Error;
    use url::Url;
    #[derive(Clone)]
    pub enum DieselPool {
        Pool {
            pool: r2d2::Pool<ConnectionManager<PgConnection>>,
        },
        Test(Arc<ReentrantMutex<PgConnection>>),
    }
    impl DieselPool {
        pub(crate) fn new(
            url: &str,
            config: r2d2::Builder<ConnectionManager<PgConnection>>,
        ) -> Result<DieselPool, PoolError> {
            let manager = ConnectionManager::new(connection_url(url));
            // For crates.io we want the behavior of creating a database pool to be slightly different
            // than the defaults of R2D2: the library's build() method assumes its consumers always
            // need a database connection to operate, so it blocks creating a pool until a minimum
            // number of connections is available.
            //
            // crates.io can actually operate in a limited capacity without a database connections,
            // especially by serving download requests to our users. Because of that we don't want to
            // block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid
            // serving errors for the first connections until the pool is initialized) and if we can't
            // establish any connection continue booting up the application. The database pool will
            // automatically be marked as unhealthy and the rest of the application will adapt.
            let pool = DieselPool::Pool {
                pool: config.build_unchecked(manager),
            };
            match pool.wait_until_healthy(Duration::from_secs(5)) {
                Ok(()) => {}
                Err(PoolError::UnhealthyPool) => {}
                Err(err) => return Err(err),
            }
            Ok(pool)
        }
        pub(crate) fn new_test(url: &str) -> DieselPool {
            let conn = PgConnection::establish(&connection_url(url))
                .expect("failed to establish connection");
            conn.begin_test_transaction()
                .expect("failed to begin test transaction");
            DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
        }
        pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
            match self {
                DieselPool::Pool { pool } => {
                    if let Some(conn) = pool.try_get() {
                        Ok(DieselPooledConn::Pool(conn))
                    } else if !self.is_healthy() {
                        Err(PoolError::UnhealthyPool)
                    } else {
                        Ok(DieselPooledConn::Pool(pool.get()?))
                    }
                }
                DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
            }
        }
        pub fn state(&self) -> PoolState {
            match self {
                DieselPool::Pool { pool, .. } => {
                    let state = pool.state();
                    PoolState {
                        connections: state.connections,
                        idle_connections: state.idle_connections,
                    }
                }
                DieselPool::Test(_) => PoolState {
                    connections: 0,
                    idle_connections: 0,
                },
            }
        }
        pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> {
            match self {
                DieselPool::Pool { pool, .. } => match pool.get_timeout(timeout) {
                    Ok(_) => Ok(()),
                    Err(_) if !self.is_healthy() => Err(PoolError::UnhealthyPool),
                    Err(err) => Err(PoolError::R2D2(err)),
                },
                DieselPool::Test(_) => Ok(()),
            }
        }
        fn is_healthy(&self) -> bool {
            self.state().connections > 0
        }
    }
    #[derive(Debug, Copy, Clone)]
    pub struct PoolState {
        pub connections: u32,
        pub idle_connections: u32,
    }
    pub enum DieselPooledConn<'a> {
        Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
        Test(ReentrantMutexGuard<'a, PgConnection>),
    }
    impl Deref for DieselPooledConn<'_> {
        type Target = PgConnection;
        fn deref(&self) -> &Self::Target {
            match self {
                DieselPooledConn::Pool(conn) => conn.deref(),
                DieselPooledConn::Test(conn) => conn.deref(),
            }
        }
    }
    pub fn connect_now() -> ConnectionResult<PgConnection> {
        let url = connection_url(&std::env::var("DATABASE_URL").unwrap());
        PgConnection::establish(&url)
    }
    pub fn connection_url(url: &str) -> String {
        let mut url = Url::parse(url).expect("Invalid database URL");
        if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") {
            url.query_pairs_mut().append_pair("sslmode", "require");
        }
        url.into()
    }
    #[derive(Debug, Clone, Copy)]
    pub struct ConnectionConfig {
        pub statement_timeout: u64,
        pub read_only: bool,
    }
    impl CustomizeConnection<PgConnection, r2d2::Error> for ConnectionConfig {
        fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> {
            use diesel::sql_query;
            sql_query(format!(
                "SET statement_timeout = {}",
                self.statement_timeout * 1000
            ))
            .execute(conn)
            .map_err(r2d2::Error::QueryError)?;
            if self.read_only {
                sql_query("SET default_transaction_read_only = 't'")
                    .execute(conn)
                    .map_err(r2d2::Error::QueryError)?;
            }
            Ok(())
        }
    }
    #[cfg(test)]
    pub(crate) fn test_conn() -> PgConnection {
        let conn = PgConnection::establish(&crate::env("TEST_DATABASE_URL")).unwrap();
        conn.begin_test_transaction().unwrap();
        conn
    }
    #[derive(Debug, Error)]
    pub enum PoolError {
        #[error(transparent)]
        R2D2(#[from] r2d2::PoolError),
        #[error("unhealthy database pool")]
        UnhealthyPool,
    }
}
table! {
    users {
        id -> Integer,
        name -> Text,
    }
}
fn main() {
    let db_url = std::env::var("DATABASE_URL").unwrap();
    let pool = self::db::DieselPool::new(&db_url, Default::default()).unwrap();
    loop {
        println!("Start loop");
        let loop_start = Instant::now();
        let conn = pool.get().unwrap();
        println!("Took {} ms to get connection", loop_start.elapsed().as_millis());
        let _: (i32, String) = users::table.first(&*conn).unwrap();
        println!("Sleep 1s");
        std::thread::sleep(Duration::from_secs(1));
        println!("Finish sleep");
    }
}
Using the following reproduction steps produces an application freeze:
DATABASE_URL=… cargo run
sudo iptables -A OUTPUT -p tcp --dport 5432 -j DROP
What is the expected output?
I expect a panic due to the unwrap in line 199.
What is the actual output?
The output stops and the application appears to hang.
Steps to reproduce
Using the following reproduction steps produces an application freeze:
DATABASE_URL=… cargo run
sudo iptables -A OUTPUT -p tcp --dport 5432 -j ACCEPT
Checklist
- I have already looked over the issue tracker and the discussion forum for similar possible closed issues.
- This issue can be reproduced on Rust’s stable channel. (Your issue will be closed if this is not the case)
- This issue can be reproduced without requiring a third party crate
About this issue
- State: closed
- Created 2 years ago
- Reactions: 1
- Comments: 15 (9 by maintainers)
Commits related to this issue
- Fix #3016 This demonstrate a potential fix for #3016 by using the async libpq interface + busy polling. This allows us to abort polling as soon as we reach the timeout. I see multiple issues with thi... — committed to diesel-rs/diesel by weiznich 2 years ago
- Auto merge of #4645 - ferrous-systems:pa-tcp-user-timeout, r=Turbo87 Set `TCP_USER_TIMEOUT` to 15 seconds for database connections In the December 22, 2021 crates.io outage, we experienced full pack... — committed to rust-lang/crates.io by bors 2 years ago
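For illustration, libpq accepts `tcp_user_timeout` (in milliseconds) as a connection parameter, so one way to apply a limit like the 15 seconds mentioned in the commit above is to append it to the connection URL. The following is a minimal sketch using only the standard library; `with_tcp_user_timeout` and the example URLs are hypothetical and are not taken from the crates.io change itself.

```rust
/// Appends `tcp_user_timeout=<millis>` to a PostgreSQL connection URL unless
/// the URL already sets that parameter.
/// NOTE: hypothetical helper for illustration, not part of diesel or libpq.
fn with_tcp_user_timeout(url: &str, millis: u64) -> String {
    // Inspect only the query string so a host or database name cannot match.
    let already_set = url
        .split_once('?')
        .map(|(_, query)| {
            query
                .split('&')
                .any(|pair| pair.starts_with("tcp_user_timeout="))
        })
        .unwrap_or(false);
    if already_set {
        // Respect a value the operator configured explicitly.
        return url.to_string();
    }
    // Use `&` when the URL already has a query string, `?` otherwise.
    let separator = if url.contains('?') { '&' } else { '?' };
    format!("{}{}tcp_user_timeout={}", url, separator, millis)
}
```

Calling `with_tcp_user_timeout("postgres://localhost/example", 15_000)` would yield a URL ending in `?tcp_user_timeout=15000`. The right value depends on the deployment, and the parameter only has an effect on systems where the `TCP_USER_TIMEOUT` socket option is available.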
I played around with `tcp_user_timeout`, and indeed it allows us to configure the overall timeout to whatever value we find acceptable in production. That sounds great; if you want to close the issue, please do!
By the way, I also remember trying `tcp_user_timeout` back then in my investigation, and I also apparently did something wrong when I tested it. Somehow reassuring I wasn’t alone in trying it wrong 😅
Thanks for your comment here.
Diesel and r2d2 are two different crates. r2d2 already has an issue for this: https://github.com/sfackler/r2d2/issues/116. The short answer is: They cannot do much there, as there is no way to stop in the middle of executing stuff.
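The limitation described above can be sketched in plain Rust. A caller can stop waiting for a blocking call by running it on a helper thread, but the call itself keeps blocking in the background. `run_with_deadline` is a hypothetical helper for illustration, not an r2d2 or diesel API, and the closure stands in for a blocking database operation.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `op` on a helper thread and waits at most `timeout` for its result.
/// Returns `None` if no result arrived in time. The helper thread is NOT
/// cancelled: it keeps running (or blocking) until `op` returns on its own.
/// NOTE: hypothetical helper, shown only to illustrate the limitation.
fn run_with_deadline<T, F>(op: F, timeout: Duration) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // If the caller already gave up, the send fails and the result is dropped.
        let _ = sender.send(op());
    });
    // Give up waiting after `timeout`; the spawned thread is left untouched.
    receiver.recv_timeout(timeout).ok()
}
```

With a closure that blocks forever, the caller would get `None` after the timeout, while the helper thread and whatever resources it holds would stay stuck. That is why a transport-level timeout or an async driver is suggested in this thread rather than a wrapper of this kind.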
To be clear: `tcp_user_timeout=200` is just an example. In the end you need to adjust that value to your needs, not just because I wrote that value works in my tests. I’ve chosen 200 just because I did not want to wait that long 🤷. I’ve also verified now that this works just fine with larger values, even without changing something at the operating system level.
To be clear here: There is really not much we can do here. You cannot just abort operations at the operating system level, so if something is blocking it just blocks till it’s done. That’s how these things are designed. If you don’t want that you need to use an `async` approach. For diesel that would be `diesel-async`. That allows you to control such things in detail. And to be clear here: There is exactly one reason I haven’t closed that issue as “won’t fix, works as designed” yet. That’s because the crates.io folks do require a solution with sync diesel here for technical reasons.
Just want to comment that we are also users of diesel r2d2 postgres, and we are also affected by this issue! Would love to have a fix upstream.
I’ll check with the rest of the team and get back to you in the coming days!