diff --git a/Cargo.lock b/Cargo.lock index b0f5d5eb8..5617daae2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4926,6 +4926,7 @@ dependencies = [ "csv", "custom-tracing", "db-store", + "derive_builder", "file-store", "file-store-oracles", "futures", diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index 8efce3730..879dee525 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -60,6 +60,7 @@ task-manager = { path = "../task_manager" } tls-init = { path = "../tls_init" } [dev-dependencies] +derive_builder = "0.20" rand = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } tempfile = "3" diff --git a/mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql b/mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql new file mode 100644 index 000000000..8f6af103b --- /dev/null +++ b/mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS mobile_radio_tracker; diff --git a/mobile_config/migrations/20251215132218_add_owner_to_gateways.sql b/mobile_config/migrations/20251215132218_add_owner_to_gateways.sql new file mode 100644 index 000000000..241c70380 --- /dev/null +++ b/mobile_config/migrations/20251215132218_add_owner_to_gateways.sql @@ -0,0 +1,3 @@ +ALTER TABLE gateways +ADD COLUMN IF NOT EXISTS owner VARCHAR(255), +ADD COLUMN IF NOT EXISTS owner_changed_at TIMESTAMPTZ; diff --git a/mobile_config/src/cli/import.rs b/mobile_config/src/cli/import.rs deleted file mode 100644 index 0479aac3e..000000000 --- a/mobile_config/src/cli/import.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::{ - gateway::db::{Gateway, LocationChangedAtUpdate}, - settings::Settings, -}; -use chrono::{DateTime, Utc}; -use h3o::{error::InvalidCellIndex, CellIndex}; -use helium_crypto::PublicKey; -use serde::Deserialize; -use std::{ - fs::File, - path::{Path, PathBuf}, - str::FromStr, - time::Instant, -}; - -#[derive(Debug, clap::Parser)] -pub struct Import { - #[clap(short = 'f')] - file: PathBuf, - - #[clap(subcommand)] - cmd: Cmd, -} - -#[derive(Debug, clap::Subcommand)] -pub enum Cmd { - HotspotsAssertions, -} - -impl Import { - pub async fn run(self, settings: &Settings) -> anyhow::Result<()> { - match self.cmd { - Cmd::HotspotsAssertions => { - custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; - - tracing::info!("started"); - - let pool = settings.database.connect("mobile-config-store").await?; - - let start = Instant::now(); - - let updates = read_csv(self.file)? - .into_iter() - .filter_map(|row| row.try_into().ok()) - .collect::>(); - - tracing::info!("file read, updating {} records", updates.len()); - - let updated = Gateway::update_bulk_location_changed_at(&pool, &updates).await?; - - let elapsed = start.elapsed(); - tracing::info!(?elapsed, updated, "finished"); - - Ok(()) - } - } - } -} - -#[derive(Debug, Deserialize)] -struct CsvRow { - public_key: PublicKey, - // serialnumber: String, - time: DateTime, - // latitude: f64, - // longitude: f64, - h3: String, - // assertion_type: String, -} - -#[derive(Debug, thiserror::Error)] -pub enum CsvRowError { - #[error("H3 index parse error: {0}")] - H3IndexParseError(#[from] InvalidCellIndex), -} - -impl TryFrom for LocationChangedAtUpdate { - type Error = CsvRowError; - - fn try_from(row: CsvRow) -> Result { - let cell = CellIndex::from_str(&row.h3)?; - - Ok(Self { - address: row.public_key.into(), - location_changed_at: row.time, - location: cell.into(), - }) - } -} - -fn read_csv>(path: P) -> anyhow::Result> { - let file = File::open(path)?; - let mut rdr = csv::Reader::from_reader(file); - let mut rows = Vec::new(); - - for result in rdr.deserialize() { - let record: CsvRow = result?; - rows.push(record); - } - - Ok(rows) -} diff --git a/mobile_config/src/cli/mod.rs b/mobile_config/src/cli/mod.rs index edd76242f..243ff971a 100644 --- a/mobile_config/src/cli/mod.rs +++ b/mobile_config/src/cli/mod.rs @@ -1,5 +1,5 @@ use crate::{ - cli::{api::Api, import::Import, server::Server}, + cli::{api::Api, server::Server}, settings::Settings, }; use base64::{engine::general_purpose, Engine}; @@ -9,7 +9,6 @@ use std::io::Write; use std::{fs::File, path::PathBuf}; pub mod api; -pub mod import; pub mod server; #[derive(Debug, clap::Parser)] @@ -55,10 +54,6 @@ impl Cli { Ok(()) } - Cmd::Import(import) => { - let settings = Settings::new(self.config)?; - import.run(&settings).await - } Cmd::Server(server) => { let settings = Settings::new(self.config)?; server.run(&settings).await @@ -71,6 +66,5 @@ impl Cli { pub enum Cmd { Api(Api), GenerateKey, - Import(Import), Server(Server), } diff --git a/mobile_config/src/gateway/db.rs b/mobile_config/src/gateway/db.rs index 7821f752e..78ee9dc86 100644 --- a/mobile_config/src/gateway/db.rs +++ b/mobile_config/src/gateway/db.rs @@ -95,6 +95,8 @@ pub struct Gateway { // When location last changed, set to refreshed_at (updated via SQL query see Gateway::insert) pub location_changed_at: Option>, pub location_asserts: Option, + pub owner: Option, + pub owner_changed_at: Option>, } #[derive(Debug)] @@ -122,7 +124,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at ) ", ); @@ -138,7 +142,9 @@ impl Gateway { .push_bind(g.azimuth.map(|v| v as i64)) .push_bind(g.location.map(|v| v as i64)) .push_bind(g.location_changed_at) - .push_bind(g.location_asserts.map(|v| v as i64)); + .push_bind(g.location_asserts.map(|v| v as i64)) + .push_bind(g.owner.as_deref()) + .push_bind(g.owner_changed_at); }); let res = qb.build().execute(pool).await?; @@ -160,11 +166,13 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, - $8, $9, $10, $11, $12 + $8, $9, $10, $11, $12, $13, $14 ) "#, ) @@ -180,6 +188,8 @@ impl Gateway { .bind(self.location.map(|v| v as i64)) .bind(self.location_changed_at) .bind(self.location_asserts.map(|v| v as i64)) + .bind(self.owner.as_deref()) + .bind(self.owner_changed_at) .execute(pool) .await?; @@ -205,7 +215,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at FROM gateways WHERE address = $1 ORDER BY inserted_at DESC @@ -240,7 +252,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at FROM gateways WHERE address = ANY($1) ORDER BY address, inserted_at DESC @@ -273,7 +287,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at FROM gateways WHERE address = $1 AND inserted_at <= $2 @@ -311,7 +327,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at FROM gateways WHERE address = ANY($1) AND last_changed_at >= $2 @@ -346,7 +364,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + owner_changed_at FROM gateways WHERE gateway_type = ANY($1) AND last_changed_at >= $2 @@ -381,7 +401,7 @@ impl Gateway { r#" UPDATE gateways AS g SET location_changed_at = v.location_changed_at - FROM ( + FROM ( "#, ); @@ -406,6 +426,86 @@ impl Gateway { Ok(total) } + + /// Get gateways where owner is NULL (for post-migration backfill) + /// Only returns addresses where the MOST RECENT record has a NULL owner + pub async fn get_null_owners(pool: &PgPool) -> anyhow::Result> { + let gateways = sqlx::query_as::<_, Self>( + r#" + WITH latest_gateways AS ( + SELECT DISTINCT ON (address) + address, + gateway_type, + created_at, + inserted_at, + refreshed_at, + last_changed_at, + hash, + antenna, + elevation, + azimuth, + location, + location_changed_at, + location_asserts, + owner, + owner_changed_at + FROM gateways + ORDER BY address, inserted_at DESC + ) + SELECT * FROM latest_gateways + WHERE owner IS NULL + "#, + ) + .fetch_all(pool) + .await?; + + Ok(gateways) + } + + /// Update owner and owner_changed_at for multiple gateways + pub async fn update_owners(pool: &PgPool, gateways: &[Gateway]) -> anyhow::Result { + if gateways.is_empty() { + return Ok(0); + } + + const MAX_ROWS: usize = 20000; + let mut total = 0; + + for chunk in gateways.chunks(MAX_ROWS) { + let mut qb = QueryBuilder::::new( + r#" + UPDATE gateways AS g + SET + owner = v.owner, + owner_changed_at = v.owner_changed_at + FROM ( + "#, + ); + + qb.push_values(chunk, |mut b, gw| { + b.push_bind(gw.address.as_ref()) + .push_bind(gw.owner.as_deref()) + .push_bind(gw.owner_changed_at); + }); + + qb.push( + r#" + ) AS v(address, owner, owner_changed_at) + WHERE g.address = v.address + AND g.inserted_at = ( + SELECT MAX(inserted_at) + FROM gateways + WHERE address = v.address + ) + "#, + ); + + let res = qb.build().execute(pool).await?; + total += res.rows_affected(); + } + + Ok(total) + } } impl FromRow<'_, PgRow> for Gateway { @@ -428,6 +528,8 @@ impl FromRow<'_, PgRow> for Gateway { location: to_u64(row.try_get("location")?), location_changed_at: row.try_get("location_changed_at")?, location_asserts: to_u32(row.try_get("location_asserts")?), + owner: row.try_get("owner")?, + owner_changed_at: row.try_get("owner_changed_at")?, }) } } diff --git a/mobile_config/src/gateway/metadata_db.rs b/mobile_config/src/gateway/metadata_db.rs index 983382532..5d0796e7b 100644 --- a/mobile_config/src/gateway/metadata_db.rs +++ b/mobile_config/src/gateway/metadata_db.rs @@ -21,6 +21,7 @@ pub struct MobileHotspotInfo { dc_onboarding_fee_paid: Option, device_type: DeviceType, deployment_info: Option, + owner: Option, } impl MobileHotspotInfo { @@ -88,10 +89,13 @@ impl MobileHotspotInfo { mhi.is_active, mhi.dc_onboarding_fee_paid::bigint, mhi.device_type::text, - mhi.deployment_info::text + mhi.deployment_info::text, + ao.owner FROM key_to_assets kta INNER JOIN mobile_hotspot_infos mhi ON kta.asset = mhi.asset + LEFT JOIN asset_owners ao ON + kta.asset = ao.asset WHERE kta.entity_key IS NOT NULL AND mhi.refreshed_at IS NOT NULL ORDER BY kta.entity_key, refreshed_at DESC @@ -140,6 +144,8 @@ impl MobileHotspotInfo { None }, location_asserts: self.num_location_asserts.map(|n| n as u32), + owner: self.owner.clone(), + owner_changed_at: Some(refreshed_at), })) } } @@ -171,6 +177,7 @@ impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for MobileHotspotInfo { num_location_asserts: row.get::, &str>("num_location_asserts"), is_active: row.get::, &str>("is_active"), dc_onboarding_fee_paid: row.get::, &str>("dc_onboarding_fee_paid"), + owner: row.get::, &str>("owner"), device_type, deployment_info, }) diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index afdd171d3..b1e0eff90 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -39,6 +39,10 @@ impl Tracker { tracing::info!("starting with interval: {:?}", self.interval); let mut interval = tokio::time::interval(self.interval); + if let Err(e) = backfill_gateway_owners(&self.pool, &self.metadata).await { + tracing::error!("backfill_gateway_owners is failed. {e}"); + } + loop { tokio::select! { biased; @@ -57,6 +61,78 @@ impl Tracker { } } +/// Post-migration function to backfill owner data for gateways with NULL owner. +/// Gets gateways where owner is NULL, streams mobile hotspot info, +/// and updates gateways.owner with owner_changed_at = last_changed_at +pub async fn backfill_gateway_owners( + pool: &Pool, + metadata: &Pool, +) -> anyhow::Result<()> { + tracing::info!("starting owner backfill"); + let start = Instant::now(); + + const BATCH_SIZE: usize = 1_000; + + // Get gateways where owner is NULL + let null_owner_gateways = Gateway::get_null_owners(pool).await?; + let null_owner_addresses: HashMap = null_owner_gateways + .into_iter() + .map(|gw| (gw.address.to_string(), gw)) + .collect(); + + tracing::info!( + "found {} gateways with null owners", + null_owner_addresses.len() + ); + if null_owner_addresses.is_empty() { + return Ok(()); + } + + // Stream mobile hotspot info and update matching gateways + let total: u64 = MobileHotspotInfo::stream(metadata) + .map_err(anyhow::Error::from) + .try_filter_map(|mhi| async move { + match mhi.to_gateway() { + Ok(Some(gw)) => Ok(Some(gw)), + Ok(None) => Ok(None), + Err(e) => { + tracing::error!(?e, "error converting gateway"); + Err(e) + } + } + }) + .try_chunks(BATCH_SIZE) + .map_err(|TryChunksError(_gateways, err)| err) + .try_fold(0, |total, batch| { + let null_owner_addresses = &null_owner_addresses; + async move { + let mut to_update = Vec::with_capacity(BATCH_SIZE); + + for gw in batch { + if let Some(existing_gw) = null_owner_addresses.get(&gw.address.to_string()) { + if let Some(owner) = gw.owner { + // Update gateway with owner from mobile hotspot info + let mut updated_gw = existing_gw.clone(); + updated_gw.owner = Some(owner); + // Set owner_changed_at = last_changed_at + updated_gw.owner_changed_at = Some(existing_gw.last_changed_at); + to_update.push(updated_gw); + } + } + } + + let affected = Gateway::update_owners(pool, &to_update).await?; + Ok(total + affected) + } + }) + .await?; + + let elapsed = start.elapsed(); + tracing::info!(?elapsed, updated = total, "done owner backfill"); + + Ok(()) +} + pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { tracing::info!("starting execute"); let start = Instant::now(); @@ -95,9 +171,18 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow } Some(last_gw) => { let loc_changed = gw.location != last_gw.location; + // FYI hash includes location + // owner (at this moment) is not included in hash let hash_changed = gw.hash != last_gw.hash; - // FYI hash includes location + let owner_changed = if gw.owner.is_none() { + false + } else { + gw.owner != last_gw.owner + }; + + // TODO at the second stage of implementing owner and owner_changed at + // last_changed_at should also be affected if owner was changed gw.last_changed_at = if hash_changed { gw.refreshed_at } else { @@ -110,9 +195,15 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow last_gw.location_changed_at }; + gw.owner_changed_at = if owner_changed { + Some(gw.refreshed_at) + } else { + last_gw.owner_changed_at + }; + // We only add record if something changed // FYI hash includes location - if hash_changed { + if hash_changed || owner_changed { to_insert.push(gw); } } diff --git a/mobile_config/tests/integrations/common/gateway_db.rs b/mobile_config/tests/integrations/common/gateway_db.rs index 7ca9dd2d0..7103ecaa7 100644 --- a/mobile_config/tests/integrations/common/gateway_db.rs +++ b/mobile_config/tests/integrations/common/gateway_db.rs @@ -1,8 +1,64 @@ use chrono::{DateTime, Utc}; -use helium_crypto::PublicKeyBinary; -use mobile_config::gateway::db::GatewayType; +use derive_builder::Builder; +use helium_crypto::{PublicKey, PublicKeyBinary}; +use mobile_config::gateway::db::{Gateway, GatewayType}; use sqlx::PgPool; +#[derive(Builder)] +#[builder(setter(into))] +pub struct TestGateway { + pub address: PublicKey, + pub gateway_type: GatewayType, + #[builder(default)] + created_at: DateTime, + #[builder(default)] + inserted_at: DateTime, + #[builder(default)] + refreshed_at: DateTime, + #[builder(default)] + last_changed_at: DateTime, + #[builder(default)] + hash: String, + #[builder(default = "Some(18)")] + antenna: Option, + #[builder(default)] + elevation: Option, + #[builder(default)] + azimuth: Option, + #[builder(default)] + location: Option, + #[builder(default)] + location_changed_at: Option>, + #[builder(default)] + location_asserts: Option, + #[builder(default)] + owner: Option, + #[builder(default)] + owner_changed_at: Option>, +} + +impl From for Gateway { + fn from(tg: TestGateway) -> Self { + Gateway { + address: tg.address.into(), + gateway_type: tg.gateway_type, + created_at: tg.created_at, + inserted_at: tg.inserted_at, + refreshed_at: tg.refreshed_at, + last_changed_at: tg.last_changed_at, + hash: tg.hash, + antenna: tg.antenna, + elevation: tg.elevation, + azimuth: tg.azimuth, + location: tg.location, + location_changed_at: tg.location_changed_at, + location_asserts: tg.location_asserts, + owner: tg.owner, + owner_changed_at: tg.owner_changed_at, + } + } +} + #[derive(Debug, Clone)] pub struct PreHistoricalGateway { pub address: PublicKeyBinary, diff --git a/mobile_config/tests/integrations/common/gateway_metadata_db.rs b/mobile_config/tests/integrations/common/gateway_metadata_db.rs index 40ea78a5e..d47a43a59 100644 --- a/mobile_config/tests/integrations/common/gateway_metadata_db.rs +++ b/mobile_config/tests/integrations/common/gateway_metadata_db.rs @@ -108,6 +108,54 @@ pub async fn update_gateway( Ok(()) } +pub async fn insert_asset_owner( + pool: &PgPool, + asset: &str, + owner: &str, + created_at: DateTime, + updated_at: DateTime, +) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO asset_owners ( + asset, owner, created_at, updated_at, last_block + ) + VALUES ($1, $2, $3, $4, 0) + "#, + ) + .bind(asset) + .bind(owner) + .bind(created_at) + .bind(updated_at) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn update_asset_owner( + pool: &PgPool, + asset: &str, + owner: &str, + updated_at: DateTime, +) -> anyhow::Result<()> { + sqlx::query( + r#" + UPDATE asset_owners + SET owner = $1, + updated_at = $2 + WHERE asset = $3 + "#, + ) + .bind(owner) + .bind(updated_at) + .bind(asset) + .execute(pool) + .await?; + + Ok(()) +} + pub async fn create_tables(pool: &PgPool) { sqlx::query( r#" @@ -138,4 +186,18 @@ pub async fn create_tables(pool: &PgPool) { .execute(pool) .await .unwrap(); + + sqlx::query( + r#" + CREATE TABLE asset_owners ( + asset character varying(255) NULL, + owner character varying(255) NULL, + created_at timestamptz, + updated_at timestamptz, + last_block integer + );"#, + ) + .execute(pool) + .await + .unwrap(); } diff --git a/mobile_config/tests/integrations/gateway_db.rs b/mobile_config/tests/integrations/gateway_db.rs index 4df7844fa..bb85a478e 100644 --- a/mobile_config/tests/integrations/gateway_db.rs +++ b/mobile_config/tests/integrations/gateway_db.rs @@ -201,5 +201,7 @@ fn gw(address: PublicKeyBinary, gateway_type: GatewayType, t: chrono::DateTime anyhow::Result<()> { let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(now), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now) + .antenna(None) + .location(Some(loc1)) + .location_changed_at(Some(now)) + .location_asserts(Some(1)) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_plus_10, - inserted_at: now_plus_10, - refreshed_at: now_plus_10, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_plus_10), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_plus_10) + .inserted_at(now_plus_10) + .refreshed_at(now_plus_10) + .last_changed_at(now_plus_10) + .antenna(None) + .location(Some(loc2)) + .location_changed_at(Some(now_plus_10)) + .location_asserts(Some(1)) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -178,38 +174,34 @@ async fn gateway_stream_info_v2_by_type(pool: PgPool) -> anyhow::Result<()> { let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(now), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now) + .antenna(None) + .location(Some(loc1)) + .location_changed_at(Some(now)) + .location_asserts(Some(1)) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_plus_10, - inserted_at: now_plus_10, - refreshed_at: now_plus_10, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_plus_10), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_plus_10) + .inserted_at(now_plus_10) + .refreshed_at(now_plus_10) + .last_changed_at(now_plus_10) + .antenna(None) + .location(Some(loc2)) + .location_changed_at(Some(now_plus_10)) + .location_asserts(Some(1)) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -249,72 +241,66 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { let created_at = Utc::now() - Duration::hours(5); let inserted_at = Utc::now() - Duration::hours(3); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .antenna(None) + .location(Some(loc1)) + .location_changed_at(Some(inserted_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: Some(1), - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .antenna(Some(1u32)) + .location(Some(loc2)) + .location_changed_at(Some(inserted_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway2.insert(&pool).await?; - let gateway3 = Gateway { - address: address3.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: Some(1), - elevation: Some(2), - azimuth: Some(3), - location: Some(loc3), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - }; + let gateway3: Gateway = TestGatewayBuilder::default() + .address(address3.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .antenna(Some(1)) + .elevation(Some(2)) + .azimuth(Some(3)) + .location(Some(loc3)) + .location_changed_at(Some(inserted_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway3.insert(&pool).await?; - let gateway4 = Gateway { - address: address4.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at, - inserted_at: created_at, - refreshed_at: created_at, - last_changed_at: created_at, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc4), - location_changed_at: Some(created_at), - location_asserts: Some(1), - }; + let gateway4: Gateway = TestGatewayBuilder::default() + .address(address4.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(created_at) + .inserted_at(created_at) + .refreshed_at(created_at) + .last_changed_at(created_at) + .antenna(None) + .location(Some(loc4)) + .location_changed_at(Some(created_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway4.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -415,6 +401,8 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -432,6 +420,8 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -501,6 +491,8 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -518,6 +510,8 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -610,6 +604,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc1), location_changed_at: Some(refreshed_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -627,6 +623,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -644,6 +642,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc3), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway3.insert(&pool).await?; @@ -661,6 +661,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc4), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway4.insert(&pool).await?; @@ -740,6 +742,8 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(refreshed_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -757,6 +761,8 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -837,6 +843,8 @@ async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { location: Some(loc_original), location_changed_at: Some(refreshed_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway_original.insert(&pool).await?; @@ -862,6 +870,8 @@ async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { location: Some(loc_recent), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + owner_changed_at: Some(created_at), }; gateway_recent.insert(&pool).await?; diff --git a/mobile_config/tests/integrations/gateway_service_v3.rs b/mobile_config/tests/integrations/gateway_service_v3.rs index 1ee5b311e..704e71c60 100644 --- a/mobile_config/tests/integrations/gateway_service_v3.rs +++ b/mobile_config/tests/integrations/gateway_service_v3.rs @@ -1,4 +1,4 @@ -use crate::common::{make_keypair, spawn_gateway_service}; +use crate::common::{gateway_db::TestGatewayBuilder, make_keypair, spawn_gateway_service}; use chrono::{Duration, Utc}; use futures::stream::StreamExt; use helium_crypto::{Keypair, Sign}; @@ -25,38 +25,34 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) -> anyhow::Result<()> { let now_plus_5 = now + Duration::seconds(5); let now_plus_10 = now + Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: Some(18), - elevation: Some(2), - azimuth: Some(161), - location: Some(loc1), - location_changed_at: Some(now_plus_5), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now_plus_10) + .elevation(2) + .azimuth(161) + .location(loc1) + .location_changed_at(now_plus_5) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiOutdoor, - created_at: now_plus_10, - inserted_at: now_plus_10, - refreshed_at: now_plus_10, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_plus_10), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiOutdoor) + .created_at(now_plus_10) + .inserted_at(now_plus_10) + .refreshed_at(now_plus_10) + .last_changed_at(now_plus_10) + .location(loc2) + .location_changed_at(now_plus_10) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -103,21 +99,15 @@ async fn gateway_stream_info_v3_no_metadata(pool: PgPool) -> anyhow::Result<()> let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: None, - location_changed_at: None, - location_asserts: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now_plus_10) + .build()? + .into(); gateway1.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -149,21 +139,19 @@ async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) -> anyhow::Resu let now_plus_5 = now + chrono::Duration::seconds(5); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(now_plus_5), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now_plus_10) + .antenna(None) + .location(loc1) + .location_changed_at(now_plus_5) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -207,38 +195,34 @@ async fn gateway_stream_info_v3_updated_at(pool: PgPool) -> anyhow::Result<()> { let created_at = Utc::now() - Duration::hours(5); let inserted_at = Utc::now() - Duration::hours(3); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: Some(18), - elevation: Some(2), - azimuth: Some(161), - location: Some(loc1), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .elevation(2) + .azimuth(161) + .location(loc1) + .location_changed_at(inserted_at) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at, - inserted_at: created_at, - refreshed_at: created_at, - last_changed_at: created_at, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(created_at), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(created_at) + .inserted_at(created_at) + .refreshed_at(created_at) + .last_changed_at(created_at) + .location(loc2) + .location_changed_at(created_at) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -295,38 +279,29 @@ async fn gateway_stream_info_v3_min_location_changed_at_zero(pool: PgPool) -> an let now_minus_four = now - Duration::hours(4); let now_minus_three = now - Duration::hours(3); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now_minus_six, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: None, - location_changed_at: None, - location_asserts: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now_minus_six) + .last_changed_at(now_minus_three) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now_minus_six, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_minus_four), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now_minus_six) + .last_changed_at(now_minus_three) + .location(loc2) + .location_changed_at(now_minus_four) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -366,38 +341,34 @@ async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) -> anyhow::Res // asset_2 location changed at now - 4 hours // request min_location_changed_at location changed at now - 5 hour - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: Some(18), - elevation: Some(2), - azimuth: Some(161), - location: Some(loc1), - location_changed_at: Some(now_minus_six), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now) + .last_changed_at(now_minus_three) + .elevation(2) + .azimuth(161) + .location(loc1) + .location_changed_at(now_minus_six) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_minus_four), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now) + .last_changed_at(now_minus_three) + .location(loc2) + .location_changed_at(now_minus_four) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index c356553ec..7f05ac1fb 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -118,3 +118,263 @@ async fn count_gateways(pool: &PgPool) -> anyhow::Result { Ok(count) } + +#[sqlx::test] +async fn test_gateway_tracker_owner_tracking(pool: PgPool) -> anyhow::Result<()> { + let now = Utc::now() + .with_nanosecond(Utc::now().timestamp_subsec_micros() * 1000) + .unwrap(); + + // ensure tables exist + gateway_metadata_db::create_tables(&pool).await; + + // Create a gateway with owner information + let pubkey: helium_crypto::PublicKeyBinary = make_keypair().public_key().clone().into(); + let asset = "test_asset_001".to_string(); + let initial_owner = "owner1_address".to_string(); + let hex_val = 631_711_281_837_647_359_i64; + + let gateway = gateway_metadata_db::GatewayInsert { + asset: asset.clone(), + location: Some(hex_val), + device_type: "\"wifiIndoor\"".to_string(), + key: pubkey.clone(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + }; + + // Insert the gateway into mobile_hotspot_infos and key_to_assets + gateway_metadata_db::insert_gateway_bulk(&pool, &[gateway], 1000).await?; + + // Insert the owner into asset_owners table + gateway_metadata_db::insert_asset_owner(&pool, &asset, &initial_owner, now, now).await?; + + // Run the tracker execute function + tracker::execute(&pool, &pool).await?; + + // Verify the gateway was created with the correct owner + let retrieved_gateway = Gateway::get_by_address(&pool, &pubkey) + .await? + .expect("gateway not found"); + + assert_eq!(retrieved_gateway.address, pubkey.clone()); + assert_eq!(retrieved_gateway.gateway_type, GatewayType::WifiIndoor); + assert_eq!(retrieved_gateway.owner, Some(initial_owner.clone())); + assert_eq!(retrieved_gateway.owner_changed_at, Some(now)); + + // Count gateways before owner change + let count_before = count_gateways(&pool).await?; + assert_eq!(1, count_before); + + // Update the owner in asset_owners table + let new_owner = "owner2_address".to_string(); + let update_time = now + chrono::Duration::hours(1); + + // Update the refreshed_at time in mobile_hotspot_infos to simulate a new update + gateway_metadata_db::update_gateway(&pool, &asset, hex_val, update_time, 1).await?; + + gateway_metadata_db::update_asset_owner(&pool, &asset, &new_owner, update_time).await?; + + // Run tracker::execute again + tracker::execute(&pool, &pool).await?; + + // Verify a new record was created after owner change + let count_after = count_gateways(&pool).await?; + assert_eq!( + 2, count_after, + "A new record should be created when owner changes" + ); + + // Verify the owner and owner_changed_at were updated + let updated_gateway = Gateway::get_by_address(&pool, &pubkey) + .await? + .expect("gateway not found"); + + assert_eq!(updated_gateway.address, pubkey.clone()); + assert_eq!(updated_gateway.owner, Some(new_owner.clone())); + assert_eq!(updated_gateway.owner_changed_at, Some(update_time)); + // last_changed_at should also be updated since owner changed (In next owner implementing stage). + // But currently, it stays the same. After full implementation change `now` to `update_time` + assert_eq!(updated_gateway.last_changed_at, now); + + Ok(()) +} + +#[sqlx::test] +async fn test_backfill_gateway_owners(pool: PgPool) -> anyhow::Result<()> { + let now = Utc::now() + .with_nanosecond(Utc::now().timestamp_subsec_micros() * 1000) + .unwrap(); + + // ensure tables exist + gateway_metadata_db::create_tables(&pool).await; + + // Create gateways without owners initially + let gateway1_pubkey: helium_crypto::PublicKeyBinary = + make_keypair().public_key().clone().into(); + let gateway2_pubkey: helium_crypto::PublicKeyBinary = + make_keypair().public_key().clone().into(); + + let asset1 = "test_asset_backfill_001".to_string(); + let asset2 = "test_asset_backfill_002".to_string(); + let hex_val = 631_711_281_837_647_359_i64; + + let gateways = vec![ + gateway_metadata_db::GatewayInsert { + asset: asset1.clone(), + location: Some(hex_val), + device_type: "\"wifiIndoor\"".to_string(), + key: gateway1_pubkey.clone(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + }, + gateway_metadata_db::GatewayInsert { + asset: asset2.clone(), + location: Some(hex_val + 1), + device_type: "\"wifiOutdoor\"".to_string(), + key: gateway2_pubkey.clone(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + }, + ]; + + // Directly insert gateways into the gateways table WITHOUT owners + // This simulates the state before the owner tracking feature was added + let gw1 = Gateway { + address: gateway1_pubkey.clone(), + gateway_type: GatewayType::WifiIndoor, + created_at: now, + inserted_at: now, + refreshed_at: now, + last_changed_at: now, + hash: "test_hash_1".to_string(), + antenna: None, + elevation: None, + azimuth: None, + location: Some(hex_val as u64), + location_changed_at: Some(now), + location_asserts: Some(1), + owner: None, + owner_changed_at: None, + }; + + let gw2 = Gateway { + address: gateway2_pubkey.clone(), + gateway_type: GatewayType::WifiOutdoor, + created_at: now, + inserted_at: now, + refreshed_at: now, + last_changed_at: now, + hash: "test_hash_2".to_string(), + antenna: None, + elevation: None, + azimuth: None, + location: Some((hex_val + 1) as u64), + location_changed_at: Some(now), + location_asserts: Some(1), + owner: None, + owner_changed_at: None, + }; + + Gateway::insert_bulk(&pool, &[gw1.clone(), gw2.clone()]).await?; + + // Verify gateways were created with NULL owners + let gw1_before = Gateway::get_by_address(&pool, &gateway1_pubkey) + .await? + .expect("gateway 1 not found"); + let gw2_before = Gateway::get_by_address(&pool, &gateway2_pubkey) + .await? + .expect("gateway 2 not found"); + + assert_eq!(gw1_before.owner, None, "Gateway 1 should have NULL owner"); + assert_eq!( + gw1_before.owner_changed_at, None, + "Gateway 1 should have NULL owner_changed_at" + ); + assert_eq!(gw2_before.owner, None, "Gateway 2 should have NULL owner"); + assert_eq!( + gw2_before.owner_changed_at, None, + "Gateway 2 should have NULL owner_changed_at" + ); + + // Insert gateways into metadata tables with owners + gateway_metadata_db::insert_gateway_bulk(&pool, &gateways, 1000).await?; + + // Now add owners to the metadata database + let owner1 = "owner1_backfill_address".to_string(); + let owner2 = "owner2_backfill_address".to_string(); + + gateway_metadata_db::insert_asset_owner(&pool, &asset1, &owner1, now, now).await?; + gateway_metadata_db::insert_asset_owner(&pool, &asset2, &owner2, now, now).await?; + + // Count gateway records before backfill + let count_before = count_gateways(&pool).await?; + assert_eq!( + 2, count_before, + "Should have 2 gateway records before backfill" + ); + + // Run backfill_gateway_owners + tracker::backfill_gateway_owners(&pool, &pool).await?; + + // Verify no new records were added (backfill should UPDATE, not INSERT) + let count_after = count_gateways(&pool).await?; + assert_eq!( + count_before, count_after, + "Backfill should not create new records, only update existing ones" + ); + + // Verify owners were updated + let gw1_after = Gateway::get_by_address(&pool, &gateway1_pubkey) + .await? + .expect("gateway 1 not found after backfill"); + let gw2_after = Gateway::get_by_address(&pool, &gateway2_pubkey) + .await? + .expect("gateway 2 not found after backfill"); + + assert_eq!( + gw1_after.owner, + Some(owner1.clone()), + "Gateway 1 owner should be updated" + ); + assert_eq!( + gw1_after.owner_changed_at, + Some(gw1_before.last_changed_at), + "Gateway 1 owner_changed_at should be set to last_changed_at" + ); + + assert_eq!( + gw2_after.owner, + Some(owner2.clone()), + "Gateway 2 owner should be updated" + ); + assert_eq!( + gw2_after.owner_changed_at, + Some(gw2_before.last_changed_at), + "Gateway 2 owner_changed_at should be set to last_changed_at" + ); + + // Verify that other fields remain unchanged + assert_eq!(gw1_after.address, gw1_before.address); + assert_eq!(gw1_after.gateway_type, gw1_before.gateway_type); + assert_eq!(gw1_after.last_changed_at, gw1_before.last_changed_at); + assert_eq!(gw1_after.location, gw1_before.location); + assert_eq!( + gw1_after.hash, gw1_before.hash, + "Hash should not change during backfill" + ); + + assert_eq!(gw2_after.address, gw2_before.address); + assert_eq!(gw2_after.gateway_type, gw2_before.gateway_type); + assert_eq!(gw2_after.last_changed_at, gw2_before.last_changed_at); + assert_eq!(gw2_after.location, gw2_before.location); + assert_eq!( + gw2_after.hash, gw2_before.hash, + "Hash should not change during backfill" + ); + + Ok(()) +}