diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 2809dc0..beb7eb0 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -54,6 +54,38 @@ impl Recycler { eof: false, } } + + fn conn_return(&mut self, conn: Conn, pool_is_closed: bool) { + let mut exchange = self.inner.exchange.lock().unwrap(); + if pool_is_closed || exchange.available.len() >= self.pool_opts.active_bound() { + drop(exchange); + self.inner + .metrics + .discarded_superfluous_connection + .fetch_add(1, Ordering::Relaxed); + self.discard.push(conn.close_conn().boxed()); + } else { + self.inner + .metrics + .connection_returned_to_pool + .fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "hdrhistogram")] + self.inner + .metrics + .connection_active_duration + .lock() + .unwrap() + .saturating_record(conn.inner.active_since.elapsed().as_micros() as u64); + exchange.available.push_back(conn.into()); + self.inner + .metrics + .connections_in_pool + .store(exchange.available.len(), Ordering::Relaxed); + if let Some(w) = exchange.waiting.pop() { + w.wake(); + } + } + } } impl Future for Recycler { @@ -62,44 +94,6 @@ impl Future for Recycler { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut close = self.inner.close.load(Ordering::Acquire); - macro_rules! conn_return { - ($self:ident, $conn:ident, $pool_is_closed: expr) => {{ - let mut exchange = $self.inner.exchange.lock().unwrap(); - if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() { - drop(exchange); - $self - .inner - .metrics - .discarded_superfluous_connection - .fetch_add(1, Ordering::Relaxed); - $self.discard.push($conn.close_conn().boxed()); - } else { - $self - .inner - .metrics - .connection_returned_to_pool - .fetch_add(1, Ordering::Relaxed); - #[cfg(feature = "hdrhistogram")] - $self - .inner - .metrics - .connection_active_duration - .lock() - .unwrap() - .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64); - exchange.available.push_back($conn.into()); - $self - .inner - .metrics - .connections_in_pool - .store(exchange.available.len(), Ordering::Relaxed); - if let Some(w) = exchange.waiting.pop() { - w.wake(); - } - } - }}; - } - macro_rules! conn_decision { ($self:ident, $conn:ident) => { if $conn.inner.stream.is_none() || $conn.inner.disconnected { @@ -132,7 +126,7 @@ impl Future for Recycler { .fetch_add(1, Ordering::Relaxed); $self.reset.push($conn.reset_for_pool().boxed()); } else { - conn_return!($self, $conn, false); + $self.conn_return($conn, false); } }; } @@ -165,9 +159,12 @@ impl Future for Recycler { // if we've been asked to close, reclaim any idle connections if close || self.eof { - while let Some(IdlingConn { conn, .. }) = - self.inner.exchange.lock().unwrap().available.pop_front() - { + loop { + let Some(IdlingConn { conn, .. }) = + self.inner.exchange.lock().unwrap().available.pop_front() + else { + break; + }; assert!(conn.inner.pool.is_none()); conn_decision!(self, conn); } @@ -199,7 +196,7 @@ impl Future for Recycler { loop { match Pin::new(&mut self.reset).poll_next(cx) { Poll::Pending | Poll::Ready(None) => break, - Poll::Ready(Some(Ok(conn))) => conn_return!(self, conn, close), + Poll::Ready(Some(Ok(conn))) => self.conn_return(conn, close), Poll::Ready(Some(Err(e))) => { // an error during reset. // replace with a new connection