diff --git a/sqlx-core/src/common/statement_cache.rs b/sqlx-core/src/common/statement_cache.rs index eb800ca623..0aad15354a 100644 --- a/sqlx-core/src/common/statement_cache.rs +++ b/sqlx-core/src/common/statement_cache.rs @@ -72,4 +72,8 @@ impl StatementCache { pub fn is_enabled(&self) -> bool { self.capacity() > 0 } + + pub fn iter(&self) -> impl Iterator { + self.inner.iter().map(|(_, v)| v) + } } diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index 2b660b94b3..071ca8b95e 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -183,7 +183,11 @@ impl MySqlConnection { loop { // query response is a meta-packet which may be one of: // Ok, Err, ResultSet, or (unhandled) LocalInfileRequest - let mut packet = self.inner.stream.recv_packet().await?; + let mut packet = self.inner.stream.recv_packet().await.inspect_err(|_| { + // if a prepared statement vanished on the server side, we get an error here + // clear the statement cache in case the connection got reset to cause re-preparing + self.inner.cache_statement.clear(); + })?; if packet[0] == 0x00 || packet[0] == 0xff { // first packet in a query response is OK or ERR diff --git a/sqlx-mysql/src/connection/mod.rs b/sqlx-mysql/src/connection/mod.rs index 569ad32722..d5a23603e3 100644 --- a/sqlx-mysql/src/connection/mod.rs +++ b/sqlx-mysql/src/connection/mod.rs @@ -57,6 +57,19 @@ impl MySqlConnection { .status_flags .intersects(Status::SERVER_STATUS_IN_TRANS) } + + pub async fn nuke_cached_statements(&mut self) -> Result<(), Error> { + for (statement_id, _) in self.inner.cache_statement.iter() { + self.inner + .stream + .send_packet(StmtClose { + statement: *statement_id, + }) + .await?; + } + + Ok(()) + } } impl Debug for MySqlConnection { diff --git a/tests/mysql/mysql.rs b/tests/mysql/mysql.rs index 5d6a5ef233..fb07657bdc 100644 --- a/tests/mysql/mysql.rs +++ b/tests/mysql/mysql.rs @@ -54,6 +54,30 @@ async fn it_maths() -> anyhow::Result<()> { Ok(()) } +#[sqlx_macros::test] +async fn it_clears_statement_cache_on_error() -> anyhow::Result<()> { + setup_if_needed(); + + let query = "SELECT 1"; + + let mut conn = new::().await?; + let _ = sqlx::query(query).fetch_one(&mut conn).await?; + assert_eq!(1, conn.cached_statements_size()); + + // clear cached statements only on the server side + conn.nuke_cached_statements().await?; + assert_eq!(1, conn.cached_statements_size()); + + // one query fails as the statement is not cached server-side any more, client-side cache is cleared + assert!(sqlx::query(query).fetch_one(&mut conn).await.is_err()); + assert_eq!(0, conn.cached_statements_size()); + + // next query succeeds again + let _ = sqlx::query(query).fetch_one(&mut conn).await?; + + Ok(()) +} + #[sqlx_macros::test] async fn it_can_fail_at_querying() -> anyhow::Result<()> { let mut conn = new::().await?;