From aa512d04bd8149484a0b8295b315baa60f939b72 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 27 May 2025 20:01:31 +0800 Subject: [PATCH 1/5] transaction: Handle commit ts expired error Signed-off-by: Ping Yu --- src/transaction/transaction.rs | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e153889a..53834b47 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -10,8 +10,8 @@ use std::time::Instant; use derive_new::new; use fail::fail_point; use futures::prelude::*; -use log::debug; use log::warn; +use log::{debug, info}; use tokio::time::Duration; use crate::backoff::Backoff; @@ -1260,7 +1260,7 @@ impl Committer { // FIXME: min_commit_ts == 0 => fallback to normal 2PC min_commit_ts.unwrap() } else { - match self.commit_primary().await { + match self.commit_primary_with_retry().await { Ok(commit_ts) => commit_ts, Err(e) => { return if self.undetermined { @@ -1377,6 +1377,30 @@ impl Committer { Ok(commit_version) } + async fn commit_primary_with_retry(&mut self) -> Result { + loop { + match self.commit_primary().await { + Ok(commit_version) => return Ok(commit_version), + Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { + Some(Error::KeyError(key_err)) => { + if let Some(commit_ts_expired) = key_err.commit_ts_expired { + info!( + "commit primary meet commit_ts_expired error: {:?}", + commit_ts_expired + ); + continue; + } else { + return Err(Error::KeyError(key_err)); + } + } + Some(err) => return Err(err), + None => unreachable!(), + }, + Err(err) => return Err(err), + } + } + } + async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> { debug!("committing secondary"); let mutations_len = self.mutations.len(); From c2c927cb49711efee4b6e5d215bb30b3b3a6eb7c Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 27 May 2025 21:36:02 +0800 Subject: [PATCH 2/5] debug log Signed-off-by: Ping Yu 
--- src/transaction/transaction.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 53834b47..275788b8 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1379,7 +1379,11 @@ impl Committer { async fn commit_primary_with_retry(&mut self) -> Result { loop { - match self.commit_primary().await { + let res = self.commit_primary().await; + if let Err(e) = &res { + info!("commit primary error: {:?}", e); + } + match res { Ok(commit_version) => return Ok(commit_version), Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { Some(Error::KeyError(key_err)) => { From cbf4052ae1a874571ac90f042b21c6f9639501bc Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 28 May 2025 11:13:49 +0800 Subject: [PATCH 3/5] Fix string error Signed-off-by: Ping Yu --- src/common/errors.rs | 8 ++++++-- src/kv/mod.rs | 2 +- src/region_cache.rs | 6 +++--- src/store/errors.rs | 7 +++---- src/transaction/transaction.rs | 33 +++++++++++++++++++++++++-------- tests/common/ctl.rs | 6 +++--- 6 files changed, 41 insertions(+), 21 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index 1798be70..163b784f 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -9,9 +9,13 @@ use crate::region::RegionVerId; use crate::BoundRange; /// An error originating from the TiKV client or dependencies. -#[derive(Debug, Error)] +#[derive(Debug, Error, Default)] #[allow(clippy::large_enum_variant)] pub enum Error { + /// Placeholder for no error (taken away). + #[default] + #[error("No error")] + NoError, /// Feature is not implemented. 
#[error("Unimplemented feature")] Unimplemented, @@ -103,7 +107,7 @@ pub enum Error { #[error("{}", message)] InternalError { message: String }, #[error("{0}")] - StringError(String), + OtherError(String), #[error("PessimisticLock error: {:?}", inner)] PessimisticLockError { inner: Box, diff --git a/src/kv/mod.rs b/src/kv/mod.rs index a10b6fed..60c2a064 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -13,7 +13,7 @@ pub use key::Key; pub use kvpair::KvPair; pub use value::Value; -struct HexRepr<'a>(pub &'a [u8]); +pub struct HexRepr<'a>(pub &'a [u8]); impl fmt::Display for HexRepr<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/src/region_cache.rs b/src/region_cache.rs index cdf30c99..a76864ba 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -117,7 +117,7 @@ impl RegionCache { return self.read_through_region_by_id(id).await; } } - Err(Error::StringError(format!( + Err(Error::OtherError(format!( "Concurrent PD requests failed for {MAX_RETRY_WAITING_CONCURRENT_REQUEST} times" ))) } @@ -315,7 +315,7 @@ mod test { .filter(|(_, r)| r.contains(&key.clone().into())) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) } async fn get_region_by_id( @@ -330,7 +330,7 @@ mod test { .filter(|(id, _)| id == &&region_id) .map(|(_, r)| r.clone()) .next() - .ok_or_else(|| Error::StringError("MockRetryClient: region not found".to_owned())) + .ok_or_else(|| Error::OtherError("MockRetryClient: region not found".to_owned())) } async fn get_store( diff --git a/src/store/errors.rs b/src/store/errors.rs index c9d6c774..e15d464d 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -1,7 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 
-use std::fmt::Display; - use crate::proto::kvrpcpb; use crate::Error; @@ -162,11 +160,12 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse { } } -impl HasKeyErrors for Result { +impl HasKeyErrors for Result { fn key_errors(&mut self) -> Option> { match self { Ok(x) => x.key_errors(), - Err(e) => Some(vec![Error::StringError(e.to_string())]), + Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)), + Err(e) => Some(vec![std::mem::take(e)]), } } } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 275788b8..ccf76b20 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -10,13 +10,16 @@ use std::time::Instant; use derive_new::new; use fail::fail_point; use futures::prelude::*; +use log::debug; +use log::error; +use log::info; use log::warn; -use log::{debug, info}; use tokio::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::codec::ApiV1TxnCodec; +use crate::kv::HexRepr; use crate::pd::PdClient; use crate::pd::PdRpcClient; use crate::proto::kvrpcpb; @@ -1246,7 +1249,7 @@ impl Committer { let min_commit_ts = self.prewrite().await?; fail_point!("after-prewrite", |_| { - Err(Error::StringError( + Err(Error::OtherError( "failpoint: after-prewrite return error".to_owned(), )) }); @@ -1387,11 +1390,25 @@ impl Committer { Ok(commit_version) => return Ok(commit_version), Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { Some(Error::KeyError(key_err)) => { - if let Some(commit_ts_expired) = key_err.commit_ts_expired { - info!( - "commit primary meet commit_ts_expired error: {:?}", - commit_ts_expired - ); + if let Some(expired) = key_err.commit_ts_expired { + // Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go + info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}", + self.start_version.version()); + + let primary_key = self.primary_key.as_ref().unwrap(); + if 
primary_key != expired.key.as_ref() { + error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}", + self.start_version.version(), HexRepr(&expired.key), primary_key); + return Err(Error::OtherError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string())); + } + + // Do not retry for a txn which has a too large min_commit_ts. + // 3600000 << 18 = 943718400000 + if expired.min_commit_ts - expired.attempted_commit_ts > 943718400000 { + let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}", + expired.min_commit_ts, expired.attempted_commit_ts); + return Err(Error::OtherError(msg)); + } continue; } else { return Err(Error::KeyError(key_err)); @@ -1422,7 +1439,7 @@ impl Committer { let percent = percent.unwrap().parse::().unwrap(); new_len = mutations_len * percent / 100; if new_len == 0 { - Err(Error::StringError( + Err(Error::OtherError( "failpoint: before-commit-secondary return error".to_owned(), )) } else { diff --git a/tests/common/ctl.rs b/tests/common/ctl.rs index 092c32bb..32781405 100644 --- a/tests/common/ctl.rs +++ b/tests/common/ctl.rs @@ -10,16 +10,16 @@ use crate::common::Result; pub async fn get_region_count() -> Result { let res = reqwest::get(format!("http://{}/pd/api/v1/regions", pd_addrs()[0])) .await - .map_err(|e| Error::StringError(e.to_string()))?; + .map_err(|e| Error::OtherError(e.to_string()))?; let body = res .text() .await - .map_err(|e| Error::StringError(e.to_string()))?; + .map_err(|e| Error::OtherError(e.to_string()))?; let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| { panic!("invalid body: {:?}, error: {:?}", body, err); }); value["count"] .as_u64() - .ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned())) + .ok_or_else(|| Error::OtherError("pd region count does not return an integer".to_owned())) } From 
7b32254ae73648b25e2e42fa0581d080592959a7 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 28 May 2025 16:26:44 +0800 Subject: [PATCH 4/5] polish Signed-off-by: Ping Yu --- src/transaction/transaction.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index ccf76b20..ddc32bf9 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1368,6 +1368,11 @@ impl Committer { .plan(); plan.execute() .inspect_err(|e| { + debug!( + "commit primary error: {:?}, start_ts: {}", + e, + self.start_version.version() + ); // We don't know whether the transaction is committed or not if we fail to receive // the response. Then, we mark the transaction as undetermined and propagate the // error to the user. @@ -1382,11 +1387,7 @@ impl Committer { async fn commit_primary_with_retry(&mut self) -> Result { loop { - let res = self.commit_primary().await; - if let Err(e) = &res { - info!("commit primary error: {:?}", e); - } - match res { + match self.commit_primary().await { Ok(commit_version) => return Ok(commit_version), Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { Some(Error::KeyError(key_err)) => { @@ -1404,7 +1405,11 @@ impl Committer { // Do not retry for a txn which has a too large min_commit_ts. 
// 3600000 << 18 = 943718400000 - if expired.min_commit_ts - expired.attempted_commit_ts > 943718400000 { + if expired + .min_commit_ts + .saturating_sub(expired.attempted_commit_ts) + > 943718400000 + { let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}", expired.min_commit_ts, expired.attempted_commit_ts); return Err(Error::OtherError(msg)); From 62044faed92a54d4c3eee77959e5fc69a36ff7d3 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 28 May 2025 17:08:32 +0800 Subject: [PATCH 5/5] remove no error Signed-off-by: Ping Yu --- src/common/errors.rs | 6 +----- src/store/errors.rs | 5 ++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index 163b784f..c1ed78bb 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -9,13 +9,9 @@ use crate::region::RegionVerId; use crate::BoundRange; /// An error originating from the TiKV client or dependencies. -#[derive(Debug, Error, Default)] +#[derive(Debug, Error)] #[allow(clippy::large_enum_variant)] pub enum Error { - /// Placeholder for no error (taken away). - #[default] - #[error("No error")] - NoError, /// Feature is not implemented. #[error("Unimplemented feature")] Unimplemented, diff --git a/src/store/errors.rs b/src/store/errors.rs index e15d464d..e9fc56a9 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -165,7 +165,10 @@ impl HasKeyErrors for Result { match self { Ok(x) => x.key_errors(), Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)), - Err(e) => Some(vec![std::mem::take(e)]), + Err(e) => Some(vec![std::mem::replace( + e, + Error::OtherError("".to_string()), // placeholder, no use. + )]), } } }