From a38fff6a7b87c57d8597eb6ae7d2e1b8c8685f26 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Fri, 19 Dec 2025 09:37:56 -0700 Subject: [PATCH 1/2] Move aws-local into file-store - Removes circular dependency on aws-local <-> file-store - provides a way for file-store to test itself going through a proper client - update mobile-verifier test --- Cargo.lock | 22 +--- Cargo.toml | 1 - aws_local/Cargo.toml | 20 ---- file_store/Cargo.toml | 4 +- .../src/lib.rs => file_store/src/aws_local.rs | 20 ++-- file_store/src/file_info_poller.rs | 106 +++++++++++++++--- file_store/src/lib.rs | 1 + file_store/tests/file_info_poller.rs | 93 --------------- mobile_verifier/Cargo.toml | 1 - .../tests/integrations/boosting_oracles.rs | 10 +- price_tracker/Cargo.toml | 1 - 11 files changed, 111 insertions(+), 168 deletions(-) delete mode 100644 aws_local/Cargo.toml rename aws_local/src/lib.rs => file_store/src/aws_local.rs (95%) delete mode 100644 file_store/tests/file_info_poller.rs diff --git a/Cargo.lock b/Cargo.lock index 0112d2318..d631f4eda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,24 +786,6 @@ dependencies = [ "libloading", ] -[[package]] -name = "aws-local" -version = "0.1.0" -dependencies = [ - "anyhow", - "aws-config", - "aws-sdk-s3", - "bytes", - "chrono", - "file-store", - "prost", - "tempfile", - "tokio", - "tonic", - "triggered", - "uuid", -] - [[package]] name = "aws-runtime" version = "1.5.12" @@ -2821,7 +2803,6 @@ dependencies = [ "async-compression", "async-trait", "aws-config", - "aws-local", "aws-sdk-s3", "aws-smithy-types-convert", "bytes", @@ -2846,6 +2827,7 @@ dependencies = [ "tokio-util", "tracing", "triggered", + "uuid", ] [[package]] @@ -5041,7 +5023,6 @@ dependencies = [ "anyhow", "async-compression", "async-trait", - "aws-local", "base64 0.22.1", "chrono", "clap", @@ -5850,7 +5831,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "aws-local", "chrono", "file-store", "file-store-oracles", diff --git a/Cargo.toml b/Cargo.toml index 10734f8ac..f558f3f85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ members = [ "solana", "task_manager", "hex_assignments", - "aws_local", "tls_init", ] resolver = "2" diff --git a/aws_local/Cargo.toml b/aws_local/Cargo.toml deleted file mode 100644 index 126302f96..000000000 --- a/aws_local/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "aws-local" -version = "0.1.0" -authors.workspace = true -license.workspace = true -edition.workspace = true - -[dependencies] -aws-config = { workspace = true } -aws-sdk-s3 = { workspace = true } -bytes = { workspace = true } -tokio = { workspace = true } -triggered = { workspace = true } -tonic = { workspace = true } -chrono = { workspace = true } -prost = { workspace = true } -anyhow = { workspace = true } -uuid = { workspace = true } -tempfile = { workspace = true } -file-store = { path = "../file_store"} diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml index 7cb23e288..588c794dc 100644 --- a/file_store/Cargo.toml +++ b/file_store/Cargo.toml @@ -30,14 +30,14 @@ derive_builder = { workspace = true } retainer = { workspace = true } task-manager = { path = "../task_manager" } tls-init = { path = "../tls_init" } +uuid = { workspace = true } +tempfile = { workspace = true } aws-config = { workspace = true } aws-sdk-s3 = { workspace = true } aws-smithy-types-convert = { workspace = true } [dev-dependencies] -tempfile = { workspace = true } -aws-local = { path = "../aws_local" } [features] default = ["sqlx-postgres"] diff --git a/aws_local/src/lib.rs 
b/file_store/src/aws_local.rs similarity index 95% rename from aws_local/src/lib.rs rename to file_store/src/aws_local.rs index ab9b1e845..6ebbec3c3 100644 --- a/aws_local/src/lib.rs +++ b/file_store/src/aws_local.rs @@ -1,9 +1,10 @@ -use anyhow::Result; +use crate::{BucketClient, GzippedFramedFile}; use chrono::{DateTime, Utc}; -use file_store::{BucketClient, GzippedFramedFile}; use std::env; use uuid::Uuid; +pub type Result = std::result::Result>; + pub const AWSLOCAL_ENDPOINT_ENV: &str = "AWSLOCAL_ENDPOINT"; pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://localhost:4566"; @@ -53,9 +54,9 @@ impl AwsLocal { .create_bucket() .bucket(&self.client.bucket) .send() - .await - .map(|_| ()) - .map_err(anyhow::Error::from) + .await?; + + Ok(()) } pub async fn delete_bucket(&self) -> Result<()> { @@ -67,8 +68,9 @@ impl AwsLocal { aws_sdk_s3::types::ObjectIdentifier::builder() .key(fi.key) .build() + .map_err(Into::into) }) - .collect::>()?; + .collect::>()?; self.client .client @@ -87,9 +89,9 @@ impl AwsLocal { .delete_bucket() .bucket(&self.client.bucket) .send() - .await - .map(|_| ()) - .map_err(anyhow::Error::from) + .await?; + + Ok(()) } pub async fn put_protos( diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index f0bd2df68..bce291e76 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -627,12 +627,86 @@ pub mod sqlx_postgres { #[cfg(test)] mod tests { + use crate::{aws_local::AwsLocal, file_source}; use sqlx::{Executor, PgPool}; use std::time::Duration; use tokio::time::timeout; use super::*; + #[sqlx::test] + async fn poller_filters_files_by_exact_prefix( + pool: sqlx::PgPool, + ) -> std::result::Result<(), Box> { + create_files_processed_table(&pool).await?; + + let awsl = AwsLocal::new().await; + awsl.create_bucket().await?; + + #[derive(Clone, prost::Message)] + struct TestV1 {} + + #[derive(Clone, prost::Message)] + struct TestV2 {} + + // Put 1 file of each type with overlapping prefixes + awsl.put_protos("file_type", vec![TestV1 {}]).await?; + awsl.put_protos("file_type_v2", vec![TestV2 {}]).await?; + + let (receiver_v1, server_v1) = file_source::Continuous::prost_source::() + .state(pool.clone()) + .bucket_client(awsl.bucket_client()) + .lookback_start_after(DateTime::UNIX_EPOCH) + .prefix("file_type") + .create() + .await?; + + let (receiver_v2, server_v2) = file_source::Continuous::prost_source::() + .state(pool.clone()) + .bucket_client(awsl.bucket_client()) + .lookback_start_after(DateTime::UNIX_EPOCH) + .prefix("file_type_v2") + .create() + .await?; + + let (trigger, listener) = triggered::trigger(); + let _handle_v1 = tokio::spawn(server_v1.run(listener.clone())); + let _handle_v2 = tokio::spawn(server_v2.run(listener.clone())); + + let mut v1 = consume_msgs(receiver_v1).await.into_iter(); + let mut v2 = consume_msgs(receiver_v2).await.into_iter(); + + assert!( + v1.all(|f| !f.file_info.key.starts_with("file_type_v2")), + "Expected no files with prefix 'file_type_v2'" + ); + assert!( + v2.all(|f| f.file_info.key.starts_with("file_type_v2")), + "Expected all files with prefix 'file_type_v2'" + ); + + trigger.trigger(); + awsl.cleanup().await?; + + Ok(()) + } + + async fn consume_msgs(mut receiver: Receiver) -> Vec { + use std::time::Duration; + use tokio::time::timeout; + + let mut msgs = Vec::with_capacity(10); + + // FileInfoPoller puts a single file into the channel at a time. 
It's easier + // to loop here with a timeout than sleep some arbitrary amount hoping it + // will have processed all it's files by then. + while let Ok(Some(msg)) = timeout(Duration::from_millis(100), receiver.recv()).await { + msgs.push(msg); + } + + msgs + } + struct TestParser; struct TestStore(Vec); @@ -682,19 +756,7 @@ pub mod sqlx_postgres { // Cleaning the files_processed table should not cause files within the // `FileInfoPoller.config.offset` window to be reprocessed. - // There is no auto-migration for tests in this lib workspace. - pool.execute( - r#" - CREATE TABLE files_processed ( - process_name TEXT NOT NULL DEFAULT 'default', - file_name VARCHAR PRIMARY KEY, - file_type VARCHAR NOT NULL, - file_timestamp TIMESTAMPTZ NOT NULL, - processed_at TIMESTAMPTZ NOT NULL - ); - "#, - ) - .await?; + create_files_processed_table(&pool).await?; // The important aspect of this test is that all the files to be // processed happen _within_ the lookback offset. @@ -779,6 +841,24 @@ pub mod sqlx_postgres { Ok(()) } + + // There is no auto-migration for tests in this lib workspace. + async fn create_files_processed_table(pool: &PgPool) -> sqlx::Result<()> { + pool.execute( + r#" + CREATE TABLE files_processed ( + process_name TEXT NOT NULL DEFAULT 'default', + file_name VARCHAR PRIMARY KEY, + file_type VARCHAR NOT NULL, + file_timestamp TIMESTAMPTZ NOT NULL, + processed_at TIMESTAMPTZ NOT NULL + ); + "#, + ) + .await?; + + Ok(()) + } } } diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 8fb546d0f..5ea6bcc0a 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -4,6 +4,7 @@ mod error; mod gzipped_framed_file; mod settings; +pub mod aws_local; pub mod bucket_client; pub mod file_info; pub mod file_info_poller; diff --git a/file_store/tests/file_info_poller.rs b/file_store/tests/file_info_poller.rs deleted file mode 100644 index 380a374b6..000000000 --- a/file_store/tests/file_info_poller.rs +++ /dev/null @@ -1,93 +0,0 @@ -use aws_local::AwsLocal; -use chrono::DateTime; -use file_store::file_source; -use sqlx::{Executor, PgPool}; - -// We would like this test to be moved to src/file_info_poller.rs. With -// aws-local currently being it's own workspace, the circular dependency on -// file-store fails to resolve to the same types. Until we can pull that -// functionality into file-store it will live here. -#[sqlx::test] -async fn poller_filters_files_by_exact_prefix( - pool: PgPool, -) -> Result<(), Box> { - // There is no auto-migration for tests in this lib workspace. 
- pool.execute( - r#" - CREATE TABLE files_processed ( - process_name TEXT NOT NULL DEFAULT 'default', - file_name VARCHAR PRIMARY KEY, - file_type VARCHAR NOT NULL, - file_timestamp TIMESTAMPTZ NOT NULL, - processed_at TIMESTAMPTZ NOT NULL - ); - "#, - ) - .await?; - - let awsl = AwsLocal::new().await; - awsl.create_bucket().await?; - - #[derive(Clone, prost::Message)] - struct TestV1 {} - - #[derive(Clone, prost::Message)] - struct TestV2 {} - - // Put 1 file of each type with overlapping prefixes - awsl.put_protos("file_type", vec![TestV1 {}]).await?; - awsl.put_protos("file_type_v2", vec![TestV2 {}]).await?; - - let (receiver_v1, server_v1) = file_source::Continuous::prost_source::() - .state(pool.clone()) - .bucket_client(awsl.bucket_client()) - .lookback_start_after(DateTime::UNIX_EPOCH) - .prefix("file_type") - .create() - .await?; - - let (receiver_v2, server_v2) = file_source::Continuous::prost_source::() - .state(pool.clone()) - .bucket_client(awsl.bucket_client()) - .lookback_start_after(DateTime::UNIX_EPOCH) - .prefix("file_type_v2") - .create() - .await?; - - let (trigger, listener) = triggered::trigger(); - let _handle_v1 = tokio::spawn(server_v1.run(listener.clone())); - let _handle_v2 = tokio::spawn(server_v2.run(listener.clone())); - - let mut v1 = consume_msgs(receiver_v1).await.into_iter(); - let mut v2 = consume_msgs(receiver_v2).await.into_iter(); - - assert!( - v1.all(|f| !f.file_info.key.starts_with("file_type_v2")), - "Expected no files with prefix 'file_type_v2'" - ); - assert!( - v2.all(|f| f.file_info.key.starts_with("file_type_v2")), - "Expected all files with prefix 'file_type_v2'" - ); - - trigger.trigger(); - awsl.cleanup().await?; - - Ok(()) -} - -async fn consume_msgs(mut receiver: tokio::sync::mpsc::Receiver) -> Vec { - use std::time::Duration; - use tokio::time::timeout; - - let mut msgs = Vec::with_capacity(10); - - // FileInfoPoller puts a single file into the channel at a time. It's easier - // to loop here with a timeout than sleep some arbitrary amount hoping it - // will have processed all it's files by then. 
- while let Ok(Some(msg)) = timeout(Duration::from_millis(100), receiver.recv()).await { - msgs.push(msg); - } - - msgs -} diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 19e077e26..7d5d0437c 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -66,6 +66,5 @@ solana = { path = "../solana" } task-manager = { path = "../task_manager" } [dev-dependencies] -aws-local = { path = "../aws_local" } proptest = "1.5.0" tempfile = "3" diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index 54926e203..f646891e3 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -1,7 +1,7 @@ use crate::common::{self, GatewayClientAllOwnersValid, MockHexBoostDataColl}; use anyhow::Context; use chrono::{DateTime, Duration, Utc}; -use file_store::file_sink; +use file_store::{aws_local::AwsLocal, file_sink}; use file_store_oracles::{ coverage::RadioHexSignalLevel, speedtest::CellSpeedtest, @@ -13,7 +13,7 @@ use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ CoverageObjectValidity, LocationSource, OracleBoostingHexAssignment, SignalLevel, }; -use hex_assignments::Assignment; +use hex_assignments::{Assignment, HexBoostData}; use mobile_config::boosted_hex_info::BoostedHexes; use mobile_verifier::{ banning::BannedRadios, @@ -34,7 +34,7 @@ use mobile_verifier::{ use rust_decimal::Decimal; use rust_decimal_macros::dec; use sqlx::PgPool; -use std::{collections::HashMap, pin::pin}; +use std::{collections::HashMap, path::PathBuf, pin::pin, str::FromStr}; use uuid::Uuid; #[derive(Clone)] @@ -101,10 +101,6 @@ fn hex_cell(loc: &str) -> hextree::Cell { hextree::Cell::from_raw(u64::from_str_radix(loc, 16).unwrap()).unwrap() } -use aws_local::*; -use hex_assignments::HexBoostData; -use std::{path::PathBuf, str::FromStr}; - pub async fn hex_assignment_file_exist(pool: &PgPool, filename: &str) -> bool { sqlx::query_scalar::<_, bool>( r#" diff --git a/price_tracker/Cargo.toml b/price_tracker/Cargo.toml index a24ecec8f..56775babe 100644 --- a/price_tracker/Cargo.toml +++ b/price_tracker/Cargo.toml @@ -23,4 +23,3 @@ file-store-oracles = { path = "../file_store_oracles" } task-manager = { path = "../task_manager" } [dev-dependencies] -aws-local = { path = "../aws_local" } From 9f99cfeb24bcf603b56e031da804baa60a0921a4 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Fri, 19 Dec 2025 09:37:12 -0700 Subject: [PATCH 2/2] refactor file into poller sqlx tests to use aws-local --- file_store/src/file_info_poller.rs | 208 +++++++++++------------------ 1 file changed, 77 insertions(+), 131 deletions(-) diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index bce291e76..f3f644882 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -77,13 +77,11 @@ where } } - pub async fn into_stream( - self, - recorder: &mut impl FileInfoPollerStateRecorder, - ) -> Result> - where - T: 'static, - { + async fn mark_processed(&self, recorder: &mut impl FileInfoPollerStateRecorder) -> Result<()> { + recorder.record(&self.process_name, &self.file_info).await + } + + fn record_metrics(&self) { let latency = Utc::now() - self.file_info.timestamp; metrics::gauge!( "file-processing-latency", @@ -98,8 +96,17 @@ where "process-name" => self.process_name.clone(), ) .set(self.file_info.timestamp.timestamp_millis() as f64); + } - recorder.record(&self.process_name, 
&self.file_info).await?;
+    pub async fn into_stream(
+        self,
+        recorder: &mut impl FileInfoPollerStateRecorder,
+    ) -> Result<BoxStream<'static, T>>
+    where
+        T: 'static,
+    {
+        self.record_metrics();
+        self.mark_processed(recorder).await?;
         Ok(futures::stream::iter(self.data.into_iter()).boxed())
     }
 }
@@ -629,11 +636,13 @@ pub mod sqlx_postgres {
         use crate::{aws_local::AwsLocal, file_source};
         use sqlx::{Executor, PgPool};
-        use std::time::Duration;
         use tokio::time::timeout;

         use super::*;

+        #[derive(Clone, prost::Message)]
+        struct TestMsg {}
+
         #[sqlx::test]
         async fn poller_filters_files_by_exact_prefix(
             pool: sqlx::PgPool,
         ) -> std::result::Result<(), Box<dyn std::error::Error>> {
@@ -643,17 +652,11 @@ pub mod sqlx_postgres {
             let awsl = AwsLocal::new().await;
             awsl.create_bucket().await?;

-            #[derive(Clone, prost::Message)]
-            struct TestV1 {}
-
-            #[derive(Clone, prost::Message)]
-            struct TestV2 {}
-
             // Put 1 file of each type with overlapping prefixes
-            awsl.put_protos("file_type", vec![TestV1 {}]).await?;
-            awsl.put_protos("file_type_v2", vec![TestV2 {}]).await?;
+            awsl.put_protos("file_type", vec![TestMsg {}]).await?;
+            awsl.put_protos("file_type_v2", vec![TestMsg {}]).await?;

-            let (receiver_v1, server_v1) = file_source::Continuous::prost_source::<TestV1>()
+            let (receiver_v1, server_v1) = file_source::Continuous::prost_source::<TestMsg>()
                 .state(pool.clone())
                 .bucket_client(awsl.bucket_client())
                 .lookback_start_after(DateTime::UNIX_EPOCH)
@@ -661,7 +664,7 @@ pub mod sqlx_postgres {
                 .create()
                 .await?;

-            let (receiver_v2, server_v2) = file_source::Continuous::prost_source::<TestV2>()
+            let (receiver_v2, server_v2) = file_source::Continuous::prost_source::<TestMsg>()
                 .state(pool.clone())
                 .bucket_client(awsl.bucket_client())
                 .lookback_start_after(DateTime::UNIX_EPOCH)
@@ -673,15 +676,19 @@ pub mod sqlx_postgres {
             let _handle_v1 = tokio::spawn(server_v1.run(listener.clone()));
             let _handle_v2 = tokio::spawn(server_v2.run(listener.clone()));

-            let mut v1 = consume_msgs(receiver_v1).await.into_iter();
-            let mut v2 = consume_msgs(receiver_v2).await.into_iter();
+            let files_v1 = consume_files_and_mark_processed(receiver_v1, &pool).await?;
+            let files_v2 = consume_files_and_mark_processed(receiver_v2, &pool).await?;

             assert!(
-                v1.all(|f| !f.file_info.key.starts_with("file_type_v2")),
+                files_v1
+                    .into_iter()
+                    .all(|f| !f.key.starts_with("file_type_v2")),
                 "Expected no files with prefix 'file_type_v2'"
             );
             assert!(
-                v2.all(|f| f.file_info.key.starts_with("file_type_v2")),
+                files_v2
+                    .into_iter()
+                    .all(|f| f.key.starts_with("file_type_v2")),
                 "Expected all files with prefix 'file_type_v2'"
             );
@@ -691,64 +698,6 @@ pub mod sqlx_postgres {

             Ok(())
         }

-        async fn consume_msgs(mut receiver: Receiver) -> Vec {
-            use std::time::Duration;
-            use tokio::time::timeout;
-
-            let mut msgs = Vec::with_capacity(10);
-
-            // FileInfoPoller puts a single file into the channel at a time. It's easier
-            // to loop here with a timeout than sleep some arbitrary amount hoping it
-            // will have processed all it's files by then.
-            while let Ok(Some(msg)) = timeout(Duration::from_millis(100), receiver.recv()).await {
-                msgs.push(msg);
-            }
-
-            msgs
-        }
-
-        struct TestParser;
-        struct TestStore(Vec);
-
-        #[async_trait::async_trait]
-        impl FileInfoPollerParser for TestParser {
-            async fn parse(&self, _byte_stream: ByteStream) -> Result> {
-                Ok(vec![])
-            }
-        }
-
-        #[async_trait::async_trait]
-        impl FileInfoPollerStore for TestStore {
-            async fn list_all(
-                &self,
-                _file_type: &str,
-                after: A,
-                before: B,
-            ) -> Result>
-            where
-                A: Into>> + Send + Sync + Copy,
-                B: Into>> + Send + Sync + Copy,
-            {
-                let after = after.into();
-                let before = before.into();
-
-                Ok(self
-                    .0
-                    .clone()
-                    .into_iter()
-                    .filter(|file_info| after.is_none_or(|v| file_info.timestamp > v))
-                    .filter(|file_info| before.is_none_or(|v| file_info.timestamp <= v))
-                    .collect())
-            }
-
-            async fn get_raw(&self, _key: K) -> Result
-            where
-                K: Into + Send + Sync,
-            {
-                Ok(ByteStream::default())
-            }
-        }
-
         #[sqlx::test]
         async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
             pool: PgPool,
         ) -> std::result::Result<(), Box<dyn std::error::Error>> {
@@ -758,28 +707,26 @@ pub mod sqlx_postgres {

             create_files_processed_table(&pool).await?;

+            let awsl = AwsLocal::new().await;
+            awsl.create_bucket().await?;
+
             // The important aspect of this test is that all the files to be
             // processed happen _within_ the lookback offset.
-            const EXPECTED_FILE_COUNT: i64 = 150;
-            let mut infos = vec![];
+            const EXPECTED_FILE_COUNT: usize = 150;
+            let now = Utc::now();
             for seconds in 0..EXPECTED_FILE_COUNT {
-                let file_info = FileInfo {
-                    key: format!("key-{seconds}"),
-                    prefix: "file_type".to_string(),
-                    timestamp: Utc::now() - chrono::Duration::seconds(seconds),
-                    size: 42,
-                };
-                infos.push(file_info);
+                let timestamp = now - Duration::from_secs(seconds as u64);
+                awsl.put_protos_at_time("file_type", vec![TestMsg {}], timestamp)
+                    .await?;
             }

             // To simulate a restart, we're going to make a new FileInfoPoller.
             // This closure is to ensure they have the same settings.
             let file_info_builder = || {
-                let six_hours = chrono::Duration::hours(6).to_std().unwrap();
-                FileInfoPollerConfigBuilder::::default()
-                    .parser(TestParser)
+                let six_hours = Duration::from_hours(6);
+                file_source::Continuous::prost_source::<TestMsg>()
                     .state(pool.clone())
-                    .store(TestStore(infos.clone()))
+                    .bucket_client(awsl.bucket_client())
                     .lookback_max(six_hours)
                     .prefix("file_type".to_string())
                     .offset(six_hours)
@@ -788,28 +735,13 @@
             // The first startup of the file info poller, there is nothing to clean.
             // And all file_infos will be returned to be processed.
-            let (mut receiver, ingest_server) = file_info_builder().await?;
             let (trigger, shutdown) = triggered::trigger();
-            tokio::spawn(async move {
-                if let Err(status) = ingest_server.run(shutdown).await {
-                    println!("ingest server went down unexpectedly: {status:?}");
-                }
-            });
-
-            // "process" all the files. They are not recorded into the database
-            // until the file is consumed as a stream.
-            let mut processed = 0;
-            while processed < EXPECTED_FILE_COUNT {
-                match timeout(Duration::from_secs(1), receiver.recv()).await? {
-                    Some(msg) => {
-                        processed += 1;
-                        let mut txn = pool.begin().await?;
-                        let _x = msg.into_stream(&mut txn).await?;
-                        txn.commit().await?;
-                    }
-                    err => panic!("something went wrong: {err:?}"),
-                };
-            }
+            let (receiver, ingest_server) = file_info_builder().await?;
+            let _handle = tokio::spawn(ingest_server.run(shutdown));
+
+            // "process" all the files.
+            let files = consume_files_and_mark_processed(receiver, &pool).await?;
+            assert_eq!(files.len(), EXPECTED_FILE_COUNT);

             // Shutdown the ingest server, we're going to create a new one and start it.
             trigger.trigger();
@@ -818,27 +750,21 @@
             // have been processed. The initial clean should not remove processed
             // files in a way that causes us to re-receive any files within our
             // offset for processing.
-            let (mut receiver, ingest_server) = file_info_builder().await?;
             let (trigger, shutdown) = triggered::trigger();
-            let _handle = tokio::spawn(async move {
-                if let Err(status) = ingest_server.run(shutdown).await {
-                    println!("ingest server went down unexpectedly: {status:?}");
-                }
-            });
-
-            // Attempting to receive files for processing. The timeout should fire,
-            // because all the files we have setup exist within the offset, and
-            // should still be in the database.
-            match timeout(Duration::from_secs(1), receiver.recv()).await {
-                Err(_err) => (),
-                Ok(msg) => {
-                    panic!("we got something when we expected nothing.: {msg:?}");
-                }
-            }
+            let (receiver, ingest_server) = file_info_builder().await?;
+            let _handle = tokio::spawn(ingest_server.run(shutdown));
+
+            // Attempting to receive files for processing. All the files we have
+            // setup exist within the offset, and should still be in the
+            // database.
+            let files = consume_files_and_mark_processed(receiver, &pool).await?;
+            assert!(files.is_empty());

             // Shut down for great good
             trigger.trigger();
+            awsl.cleanup().await?;
+
             Ok(())
         }
@@ -859,6 +785,26 @@

             Ok(())
         }
+
+        async fn consume_files_and_mark_processed(
+            mut receiver: FileInfoStreamReceiver<TestMsg>,
+            pool: &PgPool,
+        ) -> std::result::Result<Vec<FileInfo>, Box<dyn std::error::Error>> {
+            let mut msgs = Vec::with_capacity(10);
+            let mut txn = pool.begin().await?;
+
+            // FileInfoPoller puts a single file into the channel at a time. It's easier
+            // to loop here with a timeout than sleep some arbitrary amount hoping it
+            // will have processed all its files by then.
+            while let Ok(Some(msg)) = timeout(Duration::from_millis(250), receiver.recv()).await {
+                msgs.push(msg.file_info.clone());
+                msg.mark_processed(&mut txn).await?;
+            }
+
+            txn.commit().await?;
+
+            Ok(msgs)
+        }
     }
 }
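Illustrative note, not part of either patch: with aws-local folded into file-store, a dependent crate's integration test can exercise a poller end to end against a localstack bucket through `file_store::aws_local::AwsLocal`. The sketch below is a minimal example under stated assumptions: a running localstack endpoint (`AWSLOCAL_ENDPOINT`, defaulting to `http://localhost:4566` per the module above), a `files_processed` table already created by the crate's own migrations, and a hypothetical prost message `MyMsg`; the builder calls mirror those used in the diff and are not additional API.

use std::time::Duration;

use chrono::DateTime;
use file_store::{aws_local::AwsLocal, file_source};
use tokio::time::timeout;

// Hypothetical message type, mirroring the field-less test messages in the diff.
#[derive(Clone, prost::Message)]
struct MyMsg {}

#[sqlx::test]
async fn poller_sees_files_written_through_aws_local(
    pool: sqlx::PgPool,
) -> Result<(), Box<dyn std::error::Error>> {
    // Seed a throwaway localstack bucket with one protobuf-encoded file.
    let awsl = AwsLocal::new().await;
    awsl.create_bucket().await?;
    awsl.put_protos("my_file_type", vec![MyMsg {}]).await?;

    // Drive a continuous poller at that bucket through the public bucket client.
    let (mut receiver, server) = file_source::Continuous::prost_source::<MyMsg>()
        .state(pool.clone())
        .bucket_client(awsl.bucket_client())
        .lookback_start_after(DateTime::UNIX_EPOCH)
        .prefix("my_file_type")
        .create()
        .await?;

    let (trigger, listener) = triggered::trigger();
    let _handle = tokio::spawn(server.run(listener));

    // The seeded file should arrive on the channel shortly after startup.
    let msg = timeout(Duration::from_secs(5), receiver.recv())
        .await?
        .expect("one file info stream");
    assert!(msg.file_info.key.starts_with("my_file_type"));

    trigger.trigger();
    awsl.cleanup().await?;
    Ok(())
}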