From c60b75667ffcbd128d6c99c201c63d58a1194432 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Fri, 6 Jun 2025 02:21:59 -0700 Subject: [PATCH 1/5] chore(cubestore): Set metastore deletion batch size using CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE --- rust/cubestore/cubestore/src/config/mod.rs | 13 +++++++++++++ .../cubestore/src/metastore/rocks_fs.rs | 17 +++++++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 4a7172d3546f7..a07e9aa443c6a 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -518,6 +518,8 @@ pub trait ConfigObj: DIService { fn dump_dir(&self) -> &Option; + fn snapshots_deletion_batch_size(&self) -> u64; + fn minimum_metastore_snapshots_count(&self) -> u64; fn metastore_snapshots_lifetime(&self) -> u64; @@ -630,6 +632,7 @@ pub struct ConfigObjImpl { pub drop_ws_processing_messages_after_secs: u64, pub drop_ws_complete_messages_after_secs: u64, pub skip_kafka_parsing_errors: bool, + pub snapshots_deletion_batch_size: u64, pub minimum_metastore_snapshots_count: u64, pub metastore_snapshots_lifetime: u64, pub minimum_cachestore_snapshots_count: u64, @@ -953,6 +956,10 @@ impl ConfigObj for ConfigObjImpl { &self.dump_dir } + fn snapshots_deletion_batch_size(&self) -> u64 { + self.snapshots_deletion_batch_size + } + fn minimum_metastore_snapshots_count(&self) -> u64 { self.minimum_metastore_snapshots_count } @@ -1486,6 +1493,11 @@ impl Config { 10 * 60, ), skip_kafka_parsing_errors: env_parse("CUBESTORE_SKIP_KAFKA_PARSING_ERRORS", false), + // Presently, not useful to make more than upload_concurrency times constant + snapshots_deletion_batch_size: env_parse( + "CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE", + 80, + ), minimum_metastore_snapshots_count: env_parse( "CUBESTORE_MINIMUM_METASTORE_SNAPSHOTS_COUNT", 5, @@ -1652,6 +1664,7 @@ impl Config { drop_ws_processing_messages_after_secs: 60, drop_ws_complete_messages_after_secs: 10, skip_kafka_parsing_errors: false, + snapshots_deletion_batch_size: 80, minimum_metastore_snapshots_count: 3, metastore_snapshots_lifetime: 24 * 3600, minimum_cachestore_snapshots_count: 3, diff --git a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs index 109585ca85668..fbd62e09c14f0 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs @@ -55,7 +55,8 @@ pub struct BaseRocksStoreFs { name: &'static str, minimum_snapshots_count: u64, snapshots_lifetime: u64, - remote_files_cleanup_batch_size: u64, + // A copy of the upload-concurrency config -- we multiply this for our deletes. 
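// --- Illustrative sketch (not part of the patch) -------------------------------------------
// The new CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE env var (default 80) bounds how many
// snapshot files get deleted concurrently per round. A minimal, std-only approximation of that
// pattern; `batch_size_from_env` and `delete_in_batches` are hypothetical names, and the real
// code reads the value through Config's env_parse helper.
fn batch_size_from_env() -> u64 {
    std::env::var("CUBESTORE_SNAPSHOTS_DELETION_BATCH_SIZE")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(80)
}

fn delete_in_batches(files: &[String], batch_size: u64) {
    // The patch issues each chunk's deletes concurrently with futures::future::join_all;
    // here we only report chunk sizes. Guard against a zero batch size, which would panic.
    let chunk = usize::try_from(batch_size).unwrap_or(usize::MAX).max(1);
    for batch in files.chunks(chunk) {
        println!("would delete {} files concurrently", batch.len());
    }
}
// --------------------------------------------------------------------------------------------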
+ snapshots_deletion_batch_size: u64, } impl BaseRocksStoreFs { @@ -65,13 +66,13 @@ impl BaseRocksStoreFs { ) -> Arc { let minimum_snapshots_count = config.minimum_metastore_snapshots_count(); let snapshots_lifetime = config.metastore_snapshots_lifetime(); - let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size(); + let snapshots_deletion_batch_size = config.snapshots_deletion_batch_size(); Arc::new(Self { remote_fs, name: "metastore", minimum_snapshots_count, snapshots_lifetime, - remote_files_cleanup_batch_size, + snapshots_deletion_batch_size, }) } pub fn new_for_cachestore( @@ -80,13 +81,13 @@ impl BaseRocksStoreFs { ) -> Arc { let minimum_snapshots_count = config.minimum_cachestore_snapshots_count(); let snapshots_lifetime = config.cachestore_snapshots_lifetime(); - let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size(); + let snapshots_deletion_batch_size = config.snapshots_deletion_batch_size(); Arc::new(Self { remote_fs, name: "cachestore", minimum_snapshots_count, snapshots_lifetime, - remote_files_cleanup_batch_size, + snapshots_deletion_batch_size, }) } @@ -145,8 +146,8 @@ impl BaseRocksStoreFs { name: &str, ) -> Result>, CubeError> { let existing_metastore_files = remote_fs.list(format!("{}-", name)).await?; - // Log a debug statement so that we can rule out the filename list itself being too large for memory. - log::debug!( + // Log an info statement so that we can rule out the filename list itself being too large for memory. + log::info!( "Listed existing {} files, count = {}", name, existing_metastore_files.len() @@ -215,7 +216,7 @@ impl BaseRocksStoreFs { } for batch in to_delete.chunks( - self.remote_files_cleanup_batch_size + self.snapshots_deletion_batch_size .try_into() .unwrap_or(usize::MAX), ) { From 93b2ba521e54f1d4977b235c3e927563e9965757 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Sat, 7 Jun 2025 00:29:09 -0700 Subject: [PATCH 2/5] perf(cubestore): Make S3RemoteFs::list use less intermediate memory --- rust/cubestore/cubestore/src/remotefs/s3.rs | 81 ++++++++++++--------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index a95297e34535f..477298048d189 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -306,46 +306,26 @@ impl RemoteFs for S3RemoteFs { } async fn list(&self, remote_prefix: String) -> Result, CubeError> { - Ok(self - .list_with_metadata(remote_prefix) - .await? 
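// --- Illustrative sketch (not part of the patch) -------------------------------------------
// The idea behind this hunk: instead of `list` building a full Vec of RemoteFile values and
// then discarding everything but the paths, both `list` and `list_with_metadata` call one
// generic helper that applies a caller-supplied mapping to each listed object, so only the
// requested shape is materialized. A std-only sketch; `StoredObject` and `list_and_map` are
// hypothetical names standing in for the S3 object type and the new private helper.
#[allow(dead_code)]
struct StoredObject {
    key: String,
    size: u64,
}

fn list_and_map<T, E, F>(objects: Vec<StoredObject>, f: F) -> Result<Vec<T>, E>
where
    F: FnMut(StoredObject) -> Result<T, E>,
{
    // One pass, one output allocation: the closure decides whether T is a bare path,
    // a metadata record, or anything else the caller needs.
    objects.into_iter().map(f).collect()
}

fn paths_only(objects: Vec<StoredObject>) -> Result<Vec<String>, std::convert::Infallible> {
    list_and_map(objects, |o| Ok(o.key))
}
// --------------------------------------------------------------------------------------------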
- .into_iter() - .map(|f| f.remote_path) - .collect::>()) + let leading_subpath = self.leading_subpath_regex(); + self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| { + Ok(Self::object_key_to_remote_path(&leading_subpath, &o.key)) + }) + .await } async fn list_with_metadata( &self, remote_prefix: String, ) -> Result, CubeError> { - let path = self.s3_path(&remote_prefix); - let bucket = self.bucket.load(); - let list = bucket.list(path, None).await?; - let pages_count = list.len(); - app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags( - pages_count as i64, - Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]), - ); - if pages_count > 100 { - log::warn!("S3 list returned more than 100 pages: {}", pages_count); - } - let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap(); - let result = list - .iter() - .flat_map(|res| { - res.contents - .iter() - .map(|o| -> Result { - Ok(RemoteFile { - remote_path: leading_slash.replace(&o.key, NoExpand("")).to_string(), - updated: DateTime::parse_from_rfc3339(&o.last_modified)? - .with_timezone(&Utc), - file_size: o.size, - }) - }) + let leading_subpath = self.leading_subpath_regex(); + self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| { + Ok(RemoteFile { + remote_path: Self::object_key_to_remote_path(&leading_subpath, &o.key), + updated: DateTime::parse_from_rfc3339(&o.last_modified)?.with_timezone(&Utc), + file_size: o.size, }) - .collect::, _>>()?; - Ok(result) + }) + .await } async fn local_path(&self) -> Result { @@ -359,7 +339,42 @@ impl RemoteFs for S3RemoteFs { } } +struct LeadingSubpath(Regex); + impl S3RemoteFs { + fn leading_subpath_regex(&self) -> LeadingSubpath { + LeadingSubpath(Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap()) + } + + fn object_key_to_remote_path(leading_subpath: &LeadingSubpath, o_key: &String) -> String { + leading_subpath.0.replace(o_key, NoExpand("")).to_string() + } + + async fn list_with_metadata_and_map( + &self, + remote_prefix: String, + f: F, + ) -> Result, CubeError> + where + F: FnMut(s3::serde_types::Object) -> Result + Copy, + { + let path = self.s3_path(&remote_prefix); + let bucket = self.bucket.load(); + let list = bucket.list(path, None).await?; + let pages_count = list.len(); + app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags( + pages_count as i64, + Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]), + ); + if pages_count > 100 { + log::warn!("S3 list returned more than 100 pages: {}", pages_count); + } + let result = list + .into_iter() + .flat_map(|res| res.contents.into_iter().map(f)) + .collect::, _>>()?; + Ok(result) + } fn s3_path(&self, remote_path: &str) -> String { format!( "{}{}", From 858e5ebf4f45268660d7f4f8492a97f50ecd4488 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Sat, 7 Jun 2025 01:05:04 -0700 Subject: [PATCH 3/5] perf(cubestore): Make GCSRemoteFs::list use less intermediate memory --- rust/cubestore/cubestore/src/remotefs/gcs.rs | 77 +++++++++++++------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/rust/cubestore/cubestore/src/remotefs/gcs.rs b/rust/cubestore/cubestore/src/remotefs/gcs.rs index aea7ac9a7285b..dbfaa2ce5f54d 100644 --- a/rust/cubestore/cubestore/src/remotefs/gcs.rs +++ b/rust/cubestore/cubestore/src/remotefs/gcs.rs @@ -261,31 +261,64 @@ impl RemoteFs for GCSRemoteFs { } async fn list(&self, remote_prefix: String) -> Result, CubeError> { - Ok(self - .list_with_metadata(remote_prefix) - .await? 
- .into_iter() - .map(|f| f.remote_path) - .collect::>()) + let leading_subpath = self.leading_subpath_regex(); + self.list_with_metadata_and_map(remote_prefix, |obj: Object| { + Self::object_key_to_remote_path(&leading_subpath, &obj.name) + }) + .await } async fn list_with_metadata( &self, remote_prefix: String, ) -> Result, CubeError> { + let leading_subpath = self.leading_subpath_regex(); + self.list_with_metadata_and_map(remote_prefix, |obj: Object| RemoteFile { + remote_path: Self::object_key_to_remote_path(&leading_subpath, &obj.name), + updated: obj.updated, + file_size: obj.size, + }) + .await + } + + async fn local_path(&self) -> Result { + Ok(self.dir.to_str().unwrap().to_owned()) + } + + async fn local_file(&self, remote_path: String) -> Result { + let buf = self.dir.join(remote_path); + fs::create_dir_all(buf.parent().unwrap()).await?; + Ok(buf.to_str().unwrap().to_string()) + } +} + +struct LeadingSubpath(Regex); + +impl GCSRemoteFs { + fn leading_subpath_regex(&self) -> LeadingSubpath { + LeadingSubpath(Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap()) + } + + fn object_key_to_remote_path(leading_subpath: &LeadingSubpath, obj_name: &String) -> String { + leading_subpath + .0 + .replace(&obj_name, NoExpand("")) + .to_string() + } + + async fn list_with_metadata_and_map( + &self, + remote_prefix: String, + f: F, + ) -> Result, CubeError> + where + F: FnMut(Object) -> T + Copy, + { let prefix = self.gcs_path(&remote_prefix); let list = Object::list_prefix(self.bucket.as_str(), prefix.as_str()).await?; - let leading_slash = Regex::new(format!("^{}", self.gcs_path("")).as_str()).unwrap(); let result = list - .map(|objects| -> Result, CubeError> { - Ok(objects? - .into_iter() - .map(|obj| RemoteFile { - remote_path: leading_slash.replace(&obj.name, NoExpand("")).to_string(), - updated: obj.updated.clone(), - file_size: obj.size, - }) - .collect()) + .map(|objects| -> Result, CubeError> { + Ok(objects?.into_iter().map(f).collect()) }) .collect::>() .await @@ -310,18 +343,6 @@ impl RemoteFs for GCSRemoteFs { Ok(result) } - async fn local_path(&self) -> Result { - Ok(self.dir.to_str().unwrap().to_owned()) - } - - async fn local_file(&self, remote_path: String) -> Result { - let buf = self.dir.join(remote_path); - fs::create_dir_all(buf.parent().unwrap()).await?; - Ok(buf.to_str().unwrap().to_string()) - } -} - -impl GCSRemoteFs { fn gcs_path(&self, remote_path: &str) -> String { format!( "{}/{}", From cf4b6b34c0f1e2b1471f3c45b7140dadab48a5fa Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Sun, 8 Jun 2025 04:30:47 -0700 Subject: [PATCH 4/5] perf(cubestore): Make S3RemoteFs consume less memory when making listings --- rust/cubestore/cubestore/src/remotefs/s3.rs | 38 +++++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index 477298048d189..4dac23ea2d6cd 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -307,7 +307,7 @@ impl RemoteFs for S3RemoteFs { async fn list(&self, remote_prefix: String) -> Result, CubeError> { let leading_subpath = self.leading_subpath_regex(); - self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| { + self.list_objects_and_map(remote_prefix, |o: s3::serde_types::Object| { Ok(Self::object_key_to_remote_path(&leading_subpath, &o.key)) }) .await @@ -318,7 +318,7 @@ impl RemoteFs for S3RemoteFs { remote_prefix: String, ) -> Result, CubeError> { let 
leading_subpath = self.leading_subpath_regex(); - self.list_with_metadata_and_map(remote_prefix, |o: s3::serde_types::Object| { + self.list_objects_and_map(remote_prefix, |o: s3::serde_types::Object| { Ok(RemoteFile { remote_path: Self::object_key_to_remote_path(&leading_subpath, &o.key), updated: DateTime::parse_from_rfc3339(&o.last_modified)?.with_timezone(&Utc), @@ -350,18 +350,37 @@ impl S3RemoteFs { leading_subpath.0.replace(o_key, NoExpand("")).to_string() } - async fn list_with_metadata_and_map( + async fn list_objects_and_map( &self, remote_prefix: String, - f: F, + mut f: F, ) -> Result, CubeError> where F: FnMut(s3::serde_types::Object) -> Result + Copy, { let path = self.s3_path(&remote_prefix); let bucket = self.bucket.load(); - let list = bucket.list(path, None).await?; - let pages_count = list.len(); + let mut mapped_results = Vec::new(); + let mut continuation_token = None; + let mut pages_count: i64 = 0; + + loop { + let (result, _) = bucket + .list_page(path.clone(), None, continuation_token, None, None) + .await?; + + pages_count += 1; + + for obj in result.contents.into_iter() { + mapped_results.push(f(obj)?); + } + + continuation_token = result.next_continuation_token; + if continuation_token.is_none() { + break; + } + } + app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags( pages_count as i64, Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]), @@ -369,12 +388,9 @@ impl S3RemoteFs { if pages_count > 100 { log::warn!("S3 list returned more than 100 pages: {}", pages_count); } - let result = list - .into_iter() - .flat_map(|res| res.contents.into_iter().map(f)) - .collect::, _>>()?; - Ok(result) + Ok(mapped_results) } + fn s3_path(&self, remote_path: &str) -> String { format!( "{}{}", From 9b4f05ff5b356c940d3d6819bcbecdf53fd37d82 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Sun, 8 Jun 2025 05:41:37 -0700 Subject: [PATCH 5/5] feat(cubestore): Add ExtendedRemoteFs interface and make S3 use priority queue based implementation --- rust/cubestore/Cargo.lock | 25 ++- rust/cubestore/cubestore/Cargo.toml | 1 + rust/cubestore/cubestore/src/config/mod.rs | 8 +- rust/cubestore/cubestore/src/metastore/mod.rs | 15 +- .../cubestore/src/metastore/rocks_fs.rs | 200 +++++++++++++----- rust/cubestore/cubestore/src/remotefs/gcs.rs | 7 +- .../cubestore/cubestore/src/remotefs/minio.rs | 10 +- rust/cubestore/cubestore/src/remotefs/mod.rs | 27 ++- .../cubestore/cubestore/src/remotefs/queue.rs | 24 ++- rust/cubestore/cubestore/src/remotefs/s3.rs | 56 ++++- rust/cubestore/cubestore/src/sql/mod.rs | 5 +- 11 files changed, 305 insertions(+), 73 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 1df7d0ec3f1e5..6733a9de3c1a6 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -257,6 +257,28 @@ dependencies = [ "slab", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite 0.2.14", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "async-task" version = "1.3.1" @@ -1163,6 +1185,7 @@ dependencies = [ "arc-swap", "async-compression", "async-std", + "async-stream", 
"async-trait", "base64 0.13.0", "bigdecimal 0.2.0", @@ -5509,7 +5532,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand 0.8.4", "static_assertions", ] diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index cf20f802539bd..33f824ba84315 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -52,6 +52,7 @@ chrono-tz = "0.8.2" lazy_static = "1.4.0" mockall = "0.8.1" async-std = "0.99" +async-stream = "0.3.6" itertools = "0.11.0" bigdecimal = { version = "0.2.0", features = ["serde"] } # Right now, it's not possible to use the 0.33 release because it has bugs diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index a07e9aa443c6a..5bc27d27a5021 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -28,7 +28,7 @@ use crate::remotefs::gcs::GCSRemoteFs; use crate::remotefs::minio::MINIORemoteFs; use crate::remotefs::queue::QueueRemoteFs; use crate::remotefs::s3::S3RemoteFs; -use crate::remotefs::{LocalDirRemoteFs, RemoteFs}; +use crate::remotefs::{ExtendedRemoteFs, LocalDirRemoteFs, RemoteFs}; use crate::scheduler::SchedulerImpl; use crate::sql::cache::SqlResultCache; use crate::sql::{SqlService, SqlServiceImpl}; @@ -1907,7 +1907,8 @@ impl Config { self.injector .register("cachestore_fs", async move |i| { // TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call - let original_remote_fs = i.get_service("original_remote_fs").await; + let original_remote_fs: Arc = + i.get_service("original_remote_fs").await; let arc: Arc = BaseRocksStoreFs::new_for_cachestore( original_remote_fs, i.get_service_typed().await, @@ -1982,7 +1983,8 @@ impl Config { self.injector .register("metastore_fs", async move |i| { // TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call - let original_remote_fs = i.get_service("original_remote_fs").await; + let original_remote_fs: Arc = + i.get_service("original_remote_fs").await; let arc: Arc = BaseRocksStoreFs::new_for_metastore( original_remote_fs, i.get_service_typed().await, diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 5da496af1fcbd..f96848ce16ab6 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -6546,8 +6546,9 @@ mod tests { #[tokio::test] async fn delete_old_snapshots() { + let metastore_snapshots_lifetime_secs = 1; let config = Config::test("delete_old_snapshots").update_config(|mut obj| { - obj.metastore_snapshots_lifetime = 1; + obj.metastore_snapshots_lifetime = metastore_snapshots_lifetime_secs; obj.minimum_metastore_snapshots_count = 2; obj }); @@ -6616,14 +6617,22 @@ mod tests { .await .unwrap(); - assert_eq!(uploaded3.len(), 3); + assert_eq!( + uploaded3.len(), + 3, + "uploaded3 keys: {}", + uploaded3.keys().join(", ") + ); meta_store .create_schema("foo4".to_string(), false) .await .unwrap(); - tokio::time::sleep(Duration::from_millis(1100)).await; + tokio::time::sleep(Duration::from_millis( + metastore_snapshots_lifetime_secs * 1000 + 100, + )) + .await; meta_store.upload_check_point().await.unwrap(); let uploaded4 = diff --git 
a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs index fbd62e09c14f0..489061f828e28 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_fs.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_fs.rs @@ -1,14 +1,15 @@ use crate::config::ConfigObj; use crate::metastore::snapshot_info::SnapshotInfo; use crate::metastore::{RocksStore, RocksStoreDetails, WriteBatchContainer}; -use crate::remotefs::RemoteFs; +use crate::remotefs::ExtendedRemoteFs; use crate::CubeError; use async_trait::async_trait; use datafusion::cube_ext; use futures::future::join_all; +use futures::StreamExt; use log::{error, info}; use regex::Regex; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, BinaryHeap, HashSet}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; @@ -51,7 +52,7 @@ pub trait MetaStoreFs: Send + Sync { #[derive(Clone)] pub struct BaseRocksStoreFs { - remote_fs: Arc, + remote_fs: Arc, name: &'static str, minimum_snapshots_count: u64, snapshots_lifetime: u64, @@ -61,7 +62,7 @@ pub struct BaseRocksStoreFs { impl BaseRocksStoreFs { pub fn new_for_metastore( - remote_fs: Arc, + remote_fs: Arc, config: Arc, ) -> Arc { let minimum_snapshots_count = config.minimum_metastore_snapshots_count(); @@ -76,7 +77,7 @@ impl BaseRocksStoreFs { }) } pub fn new_for_cachestore( - remote_fs: Arc, + remote_fs: Arc, config: Arc, ) -> Arc { let minimum_snapshots_count = config.minimum_cachestore_snapshots_count(); @@ -101,7 +102,7 @@ impl BaseRocksStoreFs { Ok(meta_store_path) } - pub fn remote_fs(&self) -> Arc { + pub fn remote_fs(&self) -> Arc { self.remote_fs.clone() } @@ -140,11 +141,12 @@ impl BaseRocksStoreFs { Ok(upload_results) } - // Exposed for tests + // Currently, no longer used except by tests. + #[cfg(test)] pub async fn list_files_by_snapshot( - remote_fs: &dyn RemoteFs, + remote_fs: &dyn ExtendedRemoteFs, name: &str, - ) -> Result>, CubeError> { + ) -> Result>, CubeError> { let existing_metastore_files = remote_fs.list(format!("{}-", name)).await?; // Log an info statement so that we can rule out the filename list itself being too large for memory. log::info!( @@ -152,7 +154,7 @@ impl BaseRocksStoreFs { name, existing_metastore_files.len() ); - let mut snapshot_map = HashMap::>::new(); + let mut snapshot_map = std::collections::HashMap::>::new(); for existing in existing_metastore_files.into_iter() { let path = existing.split("/").nth(0).map(|p| { u128::from_str( @@ -171,21 +173,29 @@ impl BaseRocksStoreFs { Ok(snapshot_map) } - pub async fn delete_old_snapshots(&self) -> Result, CubeError> { - let candidates_map = - Self::list_files_by_snapshot(self.remote_fs.as_ref(), &self.name).await?; + fn metastore_file_snapshot_number( + remote_prefix: &String, + path_to_file_in_metastore: &String, + ) -> Option { + let p = path_to_file_in_metastore.split("/").nth(0)?; + u128::from_str( + &p.replace(remote_prefix, "") + .replace("-index-logs", "") + .replace("-logs", ""), + ) + .ok() + } - let lifetime_ms = (self.snapshots_lifetime as u128) * 1000; - let min_snapshots_count = self.minimum_snapshots_count as usize; + pub async fn delete_old_snapshots(&self) -> Result<(), CubeError> { + // We do two passes, to avoid building a giant list of metastore files that might cause + // memory exhaustion. The first pass figures out what snapshots we want to delete: all but + // the `min_snapshots_count` most recent, but only those before `cutoff_time_ms`. - // snapshots_list sorted by oldest first. 
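// --- Illustrative sketch (not part of the patch) -------------------------------------------
// The rewritten delete_old_snapshots in the hunk that follows keeps only the newest
// `min_snapshots_count` snapshot ids while streaming file names, using a bounded min-heap
// (BinaryHeap of Reverse values) plus a HashSet for uniqueness, so memory no longer grows with
// the total number of snapshot files. A std-only sketch of that "keep the K largest" pattern;
// `keep_newest` is a hypothetical name.
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

fn keep_newest(ids: impl IntoIterator<Item = u128>, k: usize) -> Vec<u128> {
    let k = k.max(1);
    let mut heap = BinaryHeap::<Reverse<u128>>::new(); // smallest retained id sits on top
    let mut seen = HashSet::<u128>::new();
    for id in ids {
        if !seen.insert(id) {
            continue; // already tracked: many files share one snapshot id
        }
        heap.push(Reverse(id));
        if heap.len() > k {
            let Reverse(evicted) = heap.pop().expect("len > k >= 1");
            seen.remove(&evicted);
        }
    }
    // into_sorted_vec over Reverse values sorts ascending in Reverse order, i.e. newest id first.
    heap.into_sorted_vec().into_iter().map(|Reverse(id)| id).collect()
}
// --------------------------------------------------------------------------------------------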
- let mut snapshots_list: Vec = candidates_map.keys().cloned().collect::>(); - snapshots_list.sort_unstable(); + // We assume `list_by_page` does not stream file names in order. - if snapshots_list.len() <= min_snapshots_count { - return Ok(vec![]); - } - snapshots_list.truncate(snapshots_list.len() - min_snapshots_count); + let lifetime_ms = (self.snapshots_lifetime as u128) * 1000; + // Force min_snapshots_count to be nonzero. + let min_snapshots_count: usize = Ord::max(1, self.minimum_snapshots_count as usize); let cutoff_time_ms: u128 = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -193,47 +203,131 @@ impl BaseRocksStoreFs { .as_millis() - lifetime_ms; - while !snapshots_list.is_empty() && *snapshots_list.last().unwrap() >= cutoff_time_ms { - snapshots_list.pop(); - } + let remote_prefix = format!("{}-", &self.name); + + // A priority queue with element uniqueness maintained by `snapshots_hash`, which has the + // same set of values. Contains the top `min_snapshots_count` values. + let mut snapshots_priority_queue = BinaryHeap::>::new(); + let mut snapshots_hash = HashSet::::new(); + + let mut deletable_snapshot_present = false; + let mut files_count: i64 = 0; + { + let mut page_stream = self.remote_fs.list_by_page(remote_prefix.clone()).await?; - let snapshots_list = snapshots_list; + while let Some(names) = StreamExt::next(&mut page_stream).await { + let existing_metastore_files = names?; + files_count += existing_metastore_files.len() as i64; - if snapshots_list.is_empty() { - // Avoid empty join_all, iteration, etc. - return Ok(vec![]); + for existing in existing_metastore_files { + let Some(millis) = + Self::metastore_file_snapshot_number(&remote_prefix, &existing) else { continue; }; + + if snapshots_hash.contains(&millis) { + // Maintains uniqueness in snapshots_priority_queue. + continue; + } + + if snapshots_priority_queue.len() < min_snapshots_count { + snapshots_priority_queue.push(std::cmp::Reverse(millis)); + snapshots_hash.insert(millis); + continue; + } + + match snapshots_priority_queue.peek() { + None => { + unreachable!("snapshots_priority_queue.peek() returned None with queue length {}, min_snapshots_count {} (should be positive)", snapshots_priority_queue.len(), min_snapshots_count); + } + Some(std::cmp::Reverse(min_val)) => { + let min_val = *min_val; + if millis > min_val { + snapshots_priority_queue.pop(); + snapshots_hash.remove(&min_val); + snapshots_hash.insert(millis); + snapshots_priority_queue.push(std::cmp::Reverse(millis)); + deletable_snapshot_present |= min_val < cutoff_time_ms; + } else { + deletable_snapshot_present |= millis < cutoff_time_ms; + } + } + } + } + } } - let mut to_delete: Vec = Vec::new(); + log::info!( + "Listed {} files across all {} snapshots", + files_count, + self.name, + ); + + // We should delete everything less than the lesser of: cutoff_time_ms, or snapshots_priority_queue.peek(). + let deletion_cutoff_time_ms: u128; + { + let earliest_snapshot_in_queue: u128; + if let Some(earliest_snapshot) = snapshots_priority_queue.peek() { + earliest_snapshot_in_queue = earliest_snapshot.0; + deletion_cutoff_time_ms = Ord::min(cutoff_time_ms, earliest_snapshot.0); + } else { + log::warn!("No {} snapshot files found. 
Skipping deletion pass.", self.name); + return Ok(()); + }; - let mut candidates_map = candidates_map; - for ms in snapshots_list { - to_delete.append( - candidates_map - .get_mut(&ms) - .expect("delete_old_snapshots candidates_map lookup should succeed"), - ); + if !deletable_snapshot_present { + log::info!("Deleting no {} snapshots. cutoff_time_ms = {}, earliest_snapshot_in_queue = {}, queue length = {}, min_snapshots_count = {}", self.name, cutoff_time_ms, earliest_snapshot_in_queue, snapshots_priority_queue.len(), min_snapshots_count); + return Ok(()); + } } - for batch in to_delete.chunks( - self.snapshots_deletion_batch_size - .try_into() - .unwrap_or(usize::MAX), - ) { - for v in join_all( - batch - .iter() - .map(|f| self.remote_fs.delete_file(f.to_string())) - .collect::>(), - ) - .await - .into_iter() - { - v?; + std::mem::drop(snapshots_priority_queue); + std::mem::drop(snapshots_hash); + + log::info!( + "Deleting {} snapshots earlier than {}...", + self.name, + deletion_cutoff_time_ms, + ); + + { + let mut page_stream = self.remote_fs.list_by_page(remote_prefix.clone()).await?; + + while let Some(names) = StreamExt::next(&mut page_stream).await { + let existing_metastore_files = names?; + + let mut to_delete = Vec::::new(); + for existing in existing_metastore_files { + if let Some(millis) = + Self::metastore_file_snapshot_number(&remote_prefix, &existing) + { + if millis < deletion_cutoff_time_ms { + to_delete.push(existing); + } + } + } + + // This batching seems not necessary because we paginate reads, but some + // list_by_page implementations do not actually paginate. + for batch in to_delete.chunks( + self.snapshots_deletion_batch_size + .try_into() + .unwrap_or(usize::MAX), + ) { + for v in join_all( + batch + .iter() + .map(|f| self.remote_fs.delete_file(f.to_string())) + .collect::>(), + ) + .await + .into_iter() + { + v?; + } + } } } - Ok(to_delete) + Ok(()) } pub async fn is_remote_metadata_exists(&self) -> Result { diff --git a/rust/cubestore/cubestore/src/remotefs/gcs.rs b/rust/cubestore/cubestore/src/remotefs/gcs.rs index dbfaa2ce5f54d..933b40d57dea8 100644 --- a/rust/cubestore/cubestore/src/remotefs/gcs.rs +++ b/rust/cubestore/cubestore/src/remotefs/gcs.rs @@ -1,5 +1,6 @@ use crate::app_metrics; use crate::di_service; +use crate::remotefs::ExtendedRemoteFs; use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; @@ -115,7 +116,7 @@ impl GCSRemoteFs { } } -di_service!(GCSRemoteFs, [RemoteFs]); +di_service!(GCSRemoteFs, [RemoteFs, ExtendedRemoteFs]); #[async_trait] impl RemoteFs for GCSRemoteFs { @@ -292,6 +293,10 @@ impl RemoteFs for GCSRemoteFs { } } +// TODO: Make a faster implementation +#[async_trait] +impl ExtendedRemoteFs for GCSRemoteFs {} + struct LeadingSubpath(Regex); impl GCSRemoteFs { diff --git a/rust/cubestore/cubestore/src/remotefs/minio.rs b/rust/cubestore/cubestore/src/remotefs/minio.rs index 5f6dc7911067c..4ae47b90828c7 100644 --- a/rust/cubestore/cubestore/src/remotefs/minio.rs +++ b/rust/cubestore/cubestore/src/remotefs/minio.rs @@ -1,5 +1,7 @@ use crate::di_service; -use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs}; +use crate::remotefs::{ + CommonRemoteFsUtils, ExtendedRemoteFs, LocalDirRemoteFs, RemoteFile, RemoteFs, +}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; @@ -153,7 +155,7 @@ fn refresh_interval_from_env() -> Duration { Duration::from_secs(60 * mins) } 
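// --- Illustrative sketch (not part of the patch) -------------------------------------------
// The second pass of delete_old_snapshots above drains the page stream with StreamExt::next
// and filters/deletes each page independently, so the full listing is never held in memory.
// A minimal consumer-side sketch of that pattern using only the futures crate; the page source
// here is a plain in-memory stream, and `drain_pages` is a hypothetical name.
use futures::StreamExt;

async fn drain_pages() {
    let mut pages = futures::stream::iter(vec![
        Ok::<Vec<String>, String>(vec!["metastore-1/file".into(), "metastore-2/file".into()]),
        Ok(vec!["metastore-3/file".into()]),
    ]);
    while let Some(page) = pages.next().await {
        match page {
            Ok(names) => println!("page with {} names; filter and delete here", names.len()),
            Err(e) => eprintln!("listing failed: {}", e),
        }
    }
}

fn main() {
    futures::executor::block_on(drain_pages());
}
// --------------------------------------------------------------------------------------------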
-di_service!(MINIORemoteFs, [RemoteFs]); +di_service!(MINIORemoteFs, [RemoteFs, ExtendedRemoteFs]); #[async_trait] impl RemoteFs for MINIORemoteFs { @@ -337,6 +339,10 @@ impl RemoteFs for MINIORemoteFs { Ok(buf.to_str().unwrap().to_string()) } } + +#[async_trait] +impl ExtendedRemoteFs for MINIORemoteFs {} + //TODO impl MINIORemoteFs { fn s3_path(&self, remote_path: &str) -> String { diff --git a/rust/cubestore/cubestore/src/remotefs/mod.rs b/rust/cubestore/cubestore/src/remotefs/mod.rs index 18943f531efa9..cba78cd47aecc 100644 --- a/rust/cubestore/cubestore/src/remotefs/mod.rs +++ b/rust/cubestore/cubestore/src/remotefs/mod.rs @@ -12,6 +12,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::cube_ext; use futures::future::BoxFuture; +use futures::stream::BoxStream; use futures::FutureExt; use log::debug; use serde::{Deserialize, Serialize}; @@ -82,6 +83,27 @@ pub trait RemoteFs: DIService + Send + Sync + Debug { async fn local_file(&self, remote_path: String) -> Result; } +/// This has `RemoteFs` methods that can't be used in a cuberpc::service. +#[async_trait] +pub trait ExtendedRemoteFs: DIService + RemoteFs { + /// Like `Remotefs::list` but returns the resulting set of strings with a Stream of filenames in + /// pages. Note that the default implementation returns all the pages in a single batch. + async fn list_by_page( + &self, + remote_prefix: String, + ) -> Result, CubeError>>, CubeError> { + // Note, this implementation doesn't actually paginate. + let list: Vec = self.list(remote_prefix).await?; + + let stream: BoxStream<_> = if list.is_empty() { + Box::pin(futures::stream::empty()) + } else { + Box::pin(futures::stream::once(async { Ok(list) })) + }; + Ok(stream) + } +} + pub struct CommonRemoteFsUtils; impl CommonRemoteFsUtils { @@ -184,7 +206,7 @@ impl LocalDirRemoteFs { } } -di_service!(LocalDirRemoteFs, [RemoteFs]); +di_service!(LocalDirRemoteFs, [RemoteFs, ExtendedRemoteFs]); di_service!(RemoteFsRpcClient, [RemoteFs]); #[async_trait] @@ -362,6 +384,9 @@ impl RemoteFs for LocalDirRemoteFs { } } +#[async_trait] +impl ExtendedRemoteFs for LocalDirRemoteFs {} + impl LocalDirRemoteFs { fn remove_empty_paths_boxed( root: PathBuf, diff --git a/rust/cubestore/cubestore/src/remotefs/queue.rs b/rust/cubestore/cubestore/src/remotefs/queue.rs index 6dd94cad30684..fb6388ee61d6f 100644 --- a/rust/cubestore/cubestore/src/remotefs/queue.rs +++ b/rust/cubestore/cubestore/src/remotefs/queue.rs @@ -1,6 +1,6 @@ use crate::config::ConfigObj; use crate::di_service; -use crate::remotefs::{CommonRemoteFsUtils, RemoteFile, RemoteFs}; +use crate::remotefs::{CommonRemoteFsUtils, ExtendedRemoteFs, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; @@ -8,6 +8,7 @@ use core::fmt; use datafusion::cube_ext; use deadqueue::unlimited; use futures::future::join_all; +use futures::stream::BoxStream; use log::error; use smallvec::alloc::fmt::Formatter; use std::collections::HashSet; @@ -18,7 +19,7 @@ use tokio::sync::{broadcast, watch, RwLock}; pub struct QueueRemoteFs { config: Arc, - remote_fs: Arc, + remote_fs: Arc, upload_queue: unlimited::Queue, download_queue: unlimited::Queue, // TODO not used @@ -56,10 +57,10 @@ pub enum RemoteFsOpResult { Download(String, Result), } -di_service!(QueueRemoteFs, [RemoteFs]); +di_service!(QueueRemoteFs, [RemoteFs, ExtendedRemoteFs]); impl QueueRemoteFs { - pub fn new(config: Arc, remote_fs: Arc) -> Arc { + pub fn new(config: Arc, remote_fs: Arc) -> Arc { let (stopped_tx, stopped_rx) = 
watch::channel(false); let (tx, rx) = broadcast::channel(16384); Arc::new(Self { @@ -340,6 +341,16 @@ impl RemoteFs for QueueRemoteFs { } } +#[async_trait] +impl ExtendedRemoteFs for QueueRemoteFs { + async fn list_by_page( + &self, + remote_prefix: String, + ) -> Result, CubeError>>, CubeError> { + self.remote_fs.list_by_page(remote_prefix).await + } +} + impl QueueRemoteFs { async fn check_file_size( remote_path: &str, @@ -386,7 +397,7 @@ mod test { } } - di_service!(MockFs, [RemoteFs]); + di_service!(MockFs, [RemoteFs, ExtendedRemoteFs]); #[async_trait] impl RemoteFs for MockFs { @@ -474,6 +485,9 @@ mod test { } } + #[async_trait] + impl ExtendedRemoteFs for MockFs {} + fn make_test_csv() -> std::path::PathBuf { let dir = env::temp_dir(); diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index 4dac23ea2d6cd..32a0bb37a20bd 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -1,11 +1,13 @@ use crate::app_metrics; use crate::di_service; +use crate::remotefs::ExtendedRemoteFs; use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs}; use crate::util::lock::acquire_lock; use crate::CubeError; use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::cube_ext; +use futures::stream::BoxStream; use log::{debug, info}; use regex::{NoExpand, Regex}; use s3::creds::Credentials; @@ -147,7 +149,7 @@ fn refresh_interval_from_env() -> Duration { Duration::from_secs(60 * mins) } -di_service!(S3RemoteFs, [RemoteFs]); +di_service!(S3RemoteFs, [RemoteFs, ExtendedRemoteFs]); #[async_trait] impl RemoteFs for S3RemoteFs { @@ -339,6 +341,44 @@ impl RemoteFs for S3RemoteFs { } } +#[async_trait] +impl ExtendedRemoteFs for S3RemoteFs { + async fn list_by_page( + &self, + remote_prefix: String, + ) -> Result, CubeError>>, CubeError> { + let path = self.s3_path(&remote_prefix); + let bucket = self.bucket.load(); + let leading_subpath = self.leading_subpath_regex(); + + let stream = async_stream::stream! { + let mut continuation_token = None; + let mut pages_count: i64 = 0; + + loop { + let (result, _) = bucket + .list_page(path.clone(), None, continuation_token, None, None) + .await?; + + pages_count += 1; + + let page: Vec = result.contents.into_iter().map(|obj| Self::object_key_to_remote_path(&leading_subpath, &obj.key)).collect(); + continuation_token = result.next_continuation_token; + + yield Ok(page); + + if continuation_token.is_none() { + break; + } + } + + Self::pages_count_app_metrics_and_logging(pages_count, "streaming"); + }; + + Ok(Box::pin(stream)) + } +} + struct LeadingSubpath(Regex); impl S3RemoteFs { @@ -381,14 +421,24 @@ impl S3RemoteFs { } } + Self::pages_count_app_metrics_and_logging(pages_count, "non-streaming"); + + Ok(mapped_results) + } + + fn pages_count_app_metrics_and_logging(pages_count: i64, log_op: &str) { app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags( pages_count as i64, Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]), ); if pages_count > 100 { - log::warn!("S3 list returned more than 100 pages: {}", pages_count); + // Probably only "S3 list (non-streaming)" messages are of concern, not "S3 list (streaming)". 
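// --- Illustrative sketch (not part of the patch) -------------------------------------------
// Shape of the S3 list_by_page implementation above: request one page, yield it, and loop
// until no continuation token comes back, so callers see pages as they arrive instead of one
// giant Vec. A standalone sketch against a fake paginated source; `fetch_page` and
// `page_stream` are hypothetical, and the real code calls bucket.list_page on rust-s3.
use futures::stream::{Stream, StreamExt};

async fn fetch_page(token: Option<usize>) -> (Vec<String>, Option<usize>) {
    // Pretend each "page" holds two keys and there are three pages in total.
    let start = token.unwrap_or(0);
    let page = (start..start + 2).map(|i| format!("metastore-{}/file", i)).collect();
    let next = if start + 2 < 6 { Some(start + 2) } else { None };
    (page, next)
}

fn page_stream() -> impl Stream<Item = Vec<String>> {
    async_stream::stream! {
        let mut token = None;
        loop {
            let (page, next) = fetch_page(token).await;
            yield page;
            token = next;
            if token.is_none() {
                break;
            }
        }
    }
}

async fn consume() {
    let mut pages = Box::pin(page_stream());
    while let Some(page) = pages.next().await {
        println!("got a page with {} keys", page.len());
    }
}
// --------------------------------------------------------------------------------------------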
+ log::warn!( + "S3 list ({}) returned more than 100 pages: {}", + log_op, + pages_count + ); } - Ok(mapped_results) } fn s3_path(&self, remote_path: &str) -> String { diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 2ff2144db1037..793cece76fab8 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1679,7 +1679,7 @@ mod tests { use crate::metastore::{BaseRocksStoreFs, RocksMetaStore, RowKey, TableId}; use crate::queryplanner::query_executor::MockQueryExecutor; use crate::queryplanner::MockQueryPlanner; - use crate::remotefs::{LocalDirRemoteFs, RemoteFile, RemoteFs}; + use crate::remotefs::{ExtendedRemoteFs, LocalDirRemoteFs, RemoteFile, RemoteFs}; use crate::store::ChunkStore; use super::*; @@ -2055,6 +2055,9 @@ mod tests { } } + #[async_trait::async_trait] + impl ExtendedRemoteFs for FailingRemoteFs {} + #[tokio::test] async fn create_table_if_not_exists() { Config::test("create_table_if_not_exists").start_with_injector_override(async move |injector| {
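// --- Illustrative sketch (not part of the patch) -------------------------------------------
// Why the test double above (and GCSRemoteFs, MINIORemoteFs, LocalDirRemoteFs) can opt in to
// ExtendedRemoteFs with an empty `impl` block: list_by_page has a default body that wraps the
// eager list() result in a single-page stream, so only backends with a real paging API (S3)
// need to override it. A reduced, std-only sketch of that trait-default pattern with Vec
// standing in for the stream; the names are hypothetical.
trait Lister {
    fn list(&self) -> Vec<String>;

    // Default: one "page" containing everything list() returned.
    fn list_by_page(&self) -> Vec<Vec<String>> {
        vec![self.list()]
    }
}

struct EagerBackend;

impl Lister for EagerBackend {
    fn list(&self) -> Vec<String> {
        vec!["metastore-1/file".into()]
    }
    // list_by_page is inherited from the default, much like the empty ExtendedRemoteFs impls.
}
// --------------------------------------------------------------------------------------------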