From ee62432a3e11e3c104bae0baf5107047c6982c84 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 9 Dec 2025 19:42:47 +0200 Subject: [PATCH 01/17] Add migration: drop_mobile_radio_tracker.sql --- .../migrations/20251209173943_drop_mobile_radio_tracker.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql diff --git a/mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql b/mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql new file mode 100644 index 000000000..8f6af103b --- /dev/null +++ b/mobile_config/migrations/20251209173943_drop_mobile_radio_tracker.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS mobile_radio_tracker; From 87d80bb9e68499039279c19d6e6816ed2406fdda Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 15 Dec 2025 15:24:32 +0200 Subject: [PATCH 02/17] Add add_owner_hash_to_gateways migration --- .../migrations/20251215132218_add_owner_hash_to_gateways.sql | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql diff --git a/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql b/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql new file mode 100644 index 000000000..50d233759 --- /dev/null +++ b/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql @@ -0,0 +1,3 @@ +ALTER TABLE gateways +ADD COLUMN owner VARCHAR(255), +ADD COLUMN hash_v2 TEXT; From 1e4d9ae040782daeb884e2db5b814feb75c12ea8 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 15 Dec 2025 15:34:43 +0200 Subject: [PATCH 03/17] Update db func --- mobile_config/src/gateway/db.rs | 40 +++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/mobile_config/src/gateway/db.rs b/mobile_config/src/gateway/db.rs index 7821f752e..c086837e0 100644 --- a/mobile_config/src/gateway/db.rs +++ b/mobile_config/src/gateway/db.rs @@ -95,6 +95,8 @@ pub struct Gateway { // When location last changed, set to refreshed_at (updated via SQL query see Gateway::insert) pub location_changed_at: Option>, pub location_asserts: Option, + pub owner: Option, + pub hash_v2: Option, } #[derive(Debug)] @@ -122,7 +124,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 ) ", ); @@ -138,7 +142,9 @@ impl Gateway { .push_bind(g.azimuth.map(|v| v as i64)) .push_bind(g.location.map(|v| v as i64)) .push_bind(g.location_changed_at) - .push_bind(g.location_asserts.map(|v| v as i64)); + .push_bind(g.location_asserts.map(|v| v as i64)) + .push_bind(g.owner.as_deref()) + .push_bind(g.hash_v2.as_deref()); }); let res = qb.build().execute(pool).await?; @@ -160,11 +166,13 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 ) VALUES ( $1, $2, $3, $4, $5, $6, $7, - $8, $9, $10, $11, $12 + $8, $9, $10, $11, $12, $13, $14 ) "#, ) @@ -180,6 +188,8 @@ impl Gateway { .bind(self.location.map(|v| v as i64)) .bind(self.location_changed_at) .bind(self.location_asserts.map(|v| v as i64)) + .bind(self.owner.as_deref()) + .bind(self.hash_v2.as_deref()) .execute(pool) .await?; @@ -205,7 +215,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 FROM gateways WHERE address = $1 ORDER BY inserted_at DESC @@ -240,7 +252,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 FROM gateways WHERE address = ANY($1) ORDER BY address, inserted_at DESC @@ -273,7 +287,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 FROM gateways WHERE address = $1 AND inserted_at <= $2 @@ -311,7 +327,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 FROM gateways WHERE address = ANY($1) AND last_changed_at >= $2 @@ -346,7 +364,9 @@ impl Gateway { azimuth, location, location_changed_at, - location_asserts + location_asserts, + owner, + hash_v2 FROM gateways WHERE gateway_type = ANY($1) AND last_changed_at >= $2 @@ -428,6 +448,8 @@ impl FromRow<'_, PgRow> for Gateway { location: to_u64(row.try_get("location")?), location_changed_at: row.try_get("location_changed_at")?, location_asserts: to_u32(row.try_get("location_asserts")?), + owner: row.try_get("owner")?, + hash_v2: row.try_get("hash_v2")?, }) } } From 397eafd63415da2f9cdbcb6eee8d53cd8661b757 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 15 Dec 2025 16:12:36 +0200 Subject: [PATCH 04/17] Fix and refactor tests --- Cargo.lock | 1 + mobile_config/Cargo.toml | 1 + mobile_config/src/gateway/metadata_db.rs | 2 + .../tests/integrations/gateway_db.rs | 2 + .../tests/integrations/gateway_service.rs | 40 +++ .../tests/integrations/gateway_service_v3.rs | 331 ++++++++++-------- 6 files changed, 225 insertions(+), 152 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 980c5993f..945f6f844 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4926,6 +4926,7 @@ dependencies = [ "csv", "custom-tracing", "db-store", + "derive_builder", "file-store", "file-store-oracles", "futures", diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index 8efce3730..879dee525 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -60,6 +60,7 @@ task-manager = { path = "../task_manager" } tls-init = { path = "../tls_init" } [dev-dependencies] +derive_builder = "0.20" rand = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } tempfile = "3" diff --git a/mobile_config/src/gateway/metadata_db.rs b/mobile_config/src/gateway/metadata_db.rs index 983382532..68daadd11 100644 --- a/mobile_config/src/gateway/metadata_db.rs +++ b/mobile_config/src/gateway/metadata_db.rs @@ -140,6 +140,8 @@ impl MobileHotspotInfo { None }, location_asserts: self.num_location_asserts.map(|n| n as u32), + owner: None, // TODO + hash_v2: None, })) } } diff --git a/mobile_config/tests/integrations/gateway_db.rs b/mobile_config/tests/integrations/gateway_db.rs index 4df7844fa..c04aa33f4 100644 --- a/mobile_config/tests/integrations/gateway_db.rs +++ b/mobile_config/tests/integrations/gateway_db.rs @@ -201,5 +201,7 @@ fn gw(address: PublicKeyBinary, gateway_type: GatewayType, t: chrono::DateTime anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(now), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -144,6 +146,8 @@ async fn gateway_stream_info_v1(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(now_plus_10), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -192,6 +196,8 @@ async fn gateway_stream_info_v2_by_type(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(now), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -209,6 +215,8 @@ async fn gateway_stream_info_v2_by_type(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(now_plus_10), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -263,6 +271,8 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -280,6 +290,8 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -297,6 +309,8 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc3), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway3.insert(&pool).await?; @@ -314,6 +328,8 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc4), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway4.insert(&pool).await?; @@ -415,6 +431,8 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -432,6 +450,8 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -501,6 +521,8 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -518,6 +540,8 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -610,6 +634,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc1), location_changed_at: Some(refreshed_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -627,6 +653,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -644,6 +672,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc3), location_changed_at: Some(inserted_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway3.insert(&pool).await?; @@ -661,6 +691,8 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location: Some(loc4), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway4.insert(&pool).await?; @@ -740,6 +772,8 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc1), location_changed_at: Some(refreshed_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway1.insert(&pool).await?; @@ -757,6 +791,8 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { location: Some(loc2), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway2.insert(&pool).await?; @@ -837,6 +873,8 @@ async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { location: Some(loc_original), location_changed_at: Some(refreshed_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway_original.insert(&pool).await?; @@ -862,6 +900,8 @@ async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { location: Some(loc_recent), location_changed_at: Some(created_at), location_asserts: Some(1), + owner: None, + hash_v2: None, }; gateway_recent.insert(&pool).await?; diff --git a/mobile_config/tests/integrations/gateway_service_v3.rs b/mobile_config/tests/integrations/gateway_service_v3.rs index 1ee5b311e..806ef40e7 100644 --- a/mobile_config/tests/integrations/gateway_service_v3.rs +++ b/mobile_config/tests/integrations/gateway_service_v3.rs @@ -1,7 +1,8 @@ use crate::common::{make_keypair, spawn_gateway_service}; -use chrono::{Duration, Utc}; +use chrono::{DateTime, Duration, Utc}; +use derive_builder::Builder; use futures::stream::StreamExt; -use helium_crypto::{Keypair, Sign}; +use helium_crypto::{Keypair, PublicKey, Sign}; use helium_proto::services::mobile_config::{ self as proto, DeploymentInfo, DeviceTypeV2, GatewayClient, GatewayInfoStreamReqV3, GatewayInfoV3, LocationInfo, @@ -11,6 +12,61 @@ use prost::Message; use sqlx::PgPool; use std::vec; +#[derive(Builder)] +#[builder(setter(into))] +struct TestGateway { + address: PublicKey, + gateway_type: GatewayType, + #[builder(default)] + created_at: DateTime, + #[builder(default)] + inserted_at: DateTime, + #[builder(default)] + refreshed_at: DateTime, + #[builder(default)] + last_changed_at: DateTime, + #[builder(default)] + hash: String, + #[builder(default = "Some(18)")] + antenna: Option, + #[builder(default)] + elevation: Option, + #[builder(default)] + azimuth: Option, + #[builder(default)] + location: Option, + #[builder(default)] + location_changed_at: Option>, + #[builder(default)] + location_asserts: Option, + #[builder(default)] + owner: Option, + #[builder(default)] + hash_v2: Option, +} + +impl From for Gateway { + fn from(tg: TestGateway) -> Self { + Gateway { + address: tg.address.into(), + gateway_type: tg.gateway_type, + created_at: tg.created_at, + inserted_at: tg.inserted_at, + refreshed_at: tg.refreshed_at, + last_changed_at: tg.last_changed_at, + hash: tg.hash, + antenna: tg.antenna, + elevation: tg.elevation, + azimuth: tg.azimuth, + location: tg.location, + location_changed_at: tg.location_changed_at, + location_asserts: tg.location_asserts, + owner: tg.owner, + hash_v2: tg.hash_v2, + } + } +} + #[sqlx::test] async fn gateway_stream_info_v3_basic(pool: PgPool) -> anyhow::Result<()> { let admin_key = make_keypair(); @@ -25,38 +81,34 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) -> anyhow::Result<()> { let now_plus_5 = now + Duration::seconds(5); let now_plus_10 = now + Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: Some(18), - elevation: Some(2), - azimuth: Some(161), - location: Some(loc1), - location_changed_at: Some(now_plus_5), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now_plus_10) + .elevation(2) + .azimuth(161) + .location(loc1) + .location_changed_at(now_plus_5) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiOutdoor, - created_at: now_plus_10, - inserted_at: now_plus_10, - refreshed_at: now_plus_10, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_plus_10), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiOutdoor) + .created_at(now_plus_10) + .inserted_at(now_plus_10) + .refreshed_at(now_plus_10) + .last_changed_at(now_plus_10) + .location(loc2) + .location_changed_at(now_plus_10) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -103,21 +155,15 @@ async fn gateway_stream_info_v3_no_metadata(pool: PgPool) -> anyhow::Result<()> let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: None, - location_changed_at: None, - location_asserts: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now_plus_10) + .build()? + .into(); gateway1.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -149,21 +195,19 @@ async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) -> anyhow::Resu let now_plus_5 = now + chrono::Duration::seconds(5); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(now_plus_5), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now_plus_10) + .antenna(None) + .location(loc1) + .location_changed_at(now_plus_5) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -207,38 +251,34 @@ async fn gateway_stream_info_v3_updated_at(pool: PgPool) -> anyhow::Result<()> { let created_at = Utc::now() - Duration::hours(5); let inserted_at = Utc::now() - Duration::hours(3); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: Some(18), - elevation: Some(2), - azimuth: Some(161), - location: Some(loc1), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .elevation(2) + .azimuth(161) + .location(loc1) + .location_changed_at(inserted_at) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at, - inserted_at: created_at, - refreshed_at: created_at, - last_changed_at: created_at, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(created_at), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(created_at) + .inserted_at(created_at) + .refreshed_at(created_at) + .last_changed_at(created_at) + .location(loc2) + .location_changed_at(created_at) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -295,38 +335,29 @@ async fn gateway_stream_info_v3_min_location_changed_at_zero(pool: PgPool) -> an let now_minus_four = now - Duration::hours(4); let now_minus_three = now - Duration::hours(3); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now_minus_six, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: None, - location_changed_at: None, - location_asserts: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now_minus_six) + .last_changed_at(now_minus_three) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now_minus_six, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_minus_four), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now_minus_six) + .last_changed_at(now_minus_three) + .location(loc2) + .location_changed_at(now_minus_four) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -366,38 +397,34 @@ async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) -> anyhow::Res // asset_2 location changed at now - 4 hours // request min_location_changed_at location changed at now - 5 hour - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: Some(18), - elevation: Some(2), - azimuth: Some(161), - location: Some(loc1), - location_changed_at: Some(now_minus_six), - location_asserts: Some(1), - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now) + .last_changed_at(now_minus_three) + .elevation(2u32) + .azimuth(161u32) + .location(loc1) + .location_changed_at(now_minus_six) + .location_asserts(1) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_minus_six, - inserted_at: now_minus_six, - refreshed_at: now, - last_changed_at: now_minus_three, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_minus_four), - location_asserts: Some(1), - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_minus_six) + .inserted_at(now_minus_six) + .refreshed_at(now) + .last_changed_at(now_minus_three) + .location(loc2) + .location_changed_at(now_minus_four) + .location_asserts(1) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; From 746b95e06f4d0fba4d9a7d0c9631de77c783200a Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Mon, 15 Dec 2025 17:42:19 +0200 Subject: [PATCH 05/17] Use gateway builder in tests --- .../tests/integrations/common/gateway_db.rs | 60 ++++- .../tests/integrations/gateway_service.rs | 244 ++++++++---------- .../tests/integrations/gateway_service_v3.rs | 66 +---- 3 files changed, 170 insertions(+), 200 deletions(-) diff --git a/mobile_config/tests/integrations/common/gateway_db.rs b/mobile_config/tests/integrations/common/gateway_db.rs index 7ca9dd2d0..f93ef3967 100644 --- a/mobile_config/tests/integrations/common/gateway_db.rs +++ b/mobile_config/tests/integrations/common/gateway_db.rs @@ -1,8 +1,64 @@ use chrono::{DateTime, Utc}; -use helium_crypto::PublicKeyBinary; -use mobile_config::gateway::db::GatewayType; +use derive_builder::Builder; +use helium_crypto::{PublicKey, PublicKeyBinary}; +use mobile_config::gateway::db::{Gateway, GatewayType}; use sqlx::PgPool; +#[derive(Builder)] +#[builder(setter(into))] +pub struct TestGateway { + pub address: PublicKey, + pub gateway_type: GatewayType, + #[builder(default)] + created_at: DateTime, + #[builder(default)] + inserted_at: DateTime, + #[builder(default)] + refreshed_at: DateTime, + #[builder(default)] + last_changed_at: DateTime, + #[builder(default)] + hash: String, + #[builder(default = "Some(18)")] + antenna: Option, + #[builder(default)] + elevation: Option, + #[builder(default)] + azimuth: Option, + #[builder(default)] + location: Option, + #[builder(default)] + location_changed_at: Option>, + #[builder(default)] + location_asserts: Option, + #[builder(default)] + owner: Option, + #[builder(default)] + hash_v2: Option, +} + +impl From for Gateway { + fn from(tg: TestGateway) -> Self { + Gateway { + address: tg.address.into(), + gateway_type: tg.gateway_type, + created_at: tg.created_at, + inserted_at: tg.inserted_at, + refreshed_at: tg.refreshed_at, + last_changed_at: tg.last_changed_at, + hash: tg.hash, + antenna: tg.antenna, + elevation: tg.elevation, + azimuth: tg.azimuth, + location: tg.location, + location_changed_at: tg.location_changed_at, + location_asserts: tg.location_asserts, + owner: tg.owner, + hash_v2: tg.hash_v2, + } + } +} + #[derive(Debug, Clone)] pub struct PreHistoricalGateway { pub address: PublicKeyBinary, diff --git a/mobile_config/tests/integrations/gateway_service.rs b/mobile_config/tests/integrations/gateway_service.rs index 891927330..9a6bff5d8 100644 --- a/mobile_config/tests/integrations/gateway_service.rs +++ b/mobile_config/tests/integrations/gateway_service.rs @@ -1,4 +1,4 @@ -use crate::common::{make_keypair, spawn_gateway_service}; +use crate::common::{gateway_db::TestGatewayBuilder, make_keypair, spawn_gateway_service}; use chrono::{DateTime, Duration, Utc}; use futures::stream::StreamExt; use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; @@ -113,42 +113,34 @@ async fn gateway_stream_info_v1(pool: PgPool) -> anyhow::Result<()> { let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(now), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now) + .antenna(None) + .location(Some(loc1)) + .location_changed_at(Some(now)) + .location_asserts(Some(1)) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_plus_10, - inserted_at: now_plus_10, - refreshed_at: now_plus_10, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_plus_10), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_plus_10) + .inserted_at(now_plus_10) + .refreshed_at(now_plus_10) + .last_changed_at(now_plus_10) + .antenna(None) + .location(Some(loc2)) + .location_changed_at(Some(now_plus_10)) + .location_asserts(Some(1)) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -182,42 +174,34 @@ async fn gateway_stream_info_v2_by_type(pool: PgPool) -> anyhow::Result<()> { let now = Utc::now(); let now_plus_10 = now + chrono::Duration::seconds(10); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at: now, - inserted_at: now, - refreshed_at: now, - last_changed_at: now, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(now), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(now) + .inserted_at(now) + .refreshed_at(now) + .last_changed_at(now) + .antenna(None) + .location(Some(loc1)) + .location_changed_at(Some(now)) + .location_asserts(Some(1)) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at: now_plus_10, - inserted_at: now_plus_10, - refreshed_at: now_plus_10, - last_changed_at: now_plus_10, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(now_plus_10), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(now_plus_10) + .inserted_at(now_plus_10) + .refreshed_at(now_plus_10) + .last_changed_at(now_plus_10) + .antenna(None) + .location(Some(loc2)) + .location_changed_at(Some(now_plus_10)) + .location_asserts(Some(1)) + .build()? + .into(); gateway2.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; @@ -257,80 +241,66 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { let created_at = Utc::now() - Duration::hours(5); let inserted_at = Utc::now() - Duration::hours(3); - let gateway1 = Gateway { - address: address1.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc1), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway1: Gateway = TestGatewayBuilder::default() + .address(address1.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .antenna(None) + .location(Some(loc1)) + .location_changed_at(Some(inserted_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway1.insert(&pool).await?; - let gateway2 = Gateway { - address: address2.clone().into(), - gateway_type: GatewayType::WifiIndoor, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: Some(1), - elevation: None, - azimuth: None, - location: Some(loc2), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway2: Gateway = TestGatewayBuilder::default() + .address(address2.clone()) + .gateway_type(GatewayType::WifiIndoor) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .antenna(Some(1u32)) + .location(Some(loc2)) + .location_changed_at(Some(inserted_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway2.insert(&pool).await?; - let gateway3 = Gateway { - address: address3.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at, - inserted_at, - refreshed_at: inserted_at, - last_changed_at: inserted_at, - hash: "".to_string(), - antenna: Some(1), - elevation: Some(2), - azimuth: Some(3), - location: Some(loc3), - location_changed_at: Some(inserted_at), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway3: Gateway = TestGatewayBuilder::default() + .address(address3.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(created_at) + .inserted_at(inserted_at) + .refreshed_at(inserted_at) + .last_changed_at(inserted_at) + .antenna(Some(1)) + .elevation(Some(2)) + .azimuth(Some(3)) + .location(Some(loc3)) + .location_changed_at(Some(inserted_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway3.insert(&pool).await?; - let gateway4 = Gateway { - address: address4.clone().into(), - gateway_type: GatewayType::WifiDataOnly, - created_at, - inserted_at: created_at, - refreshed_at: created_at, - last_changed_at: created_at, - hash: "".to_string(), - antenna: None, - elevation: None, - azimuth: None, - location: Some(loc4), - location_changed_at: Some(created_at), - location_asserts: Some(1), - owner: None, - hash_v2: None, - }; + let gateway4: Gateway = TestGatewayBuilder::default() + .address(address4.clone()) + .gateway_type(GatewayType::WifiDataOnly) + .created_at(created_at) + .inserted_at(created_at) + .refreshed_at(created_at) + .last_changed_at(created_at) + .antenna(None) + .location(Some(loc4)) + .location_changed_at(Some(created_at)) + .location_asserts(Some(1)) + .build()? + .into(); gateway4.insert(&pool).await?; let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; diff --git a/mobile_config/tests/integrations/gateway_service_v3.rs b/mobile_config/tests/integrations/gateway_service_v3.rs index 806ef40e7..704e71c60 100644 --- a/mobile_config/tests/integrations/gateway_service_v3.rs +++ b/mobile_config/tests/integrations/gateway_service_v3.rs @@ -1,8 +1,7 @@ -use crate::common::{make_keypair, spawn_gateway_service}; -use chrono::{DateTime, Duration, Utc}; -use derive_builder::Builder; +use crate::common::{gateway_db::TestGatewayBuilder, make_keypair, spawn_gateway_service}; +use chrono::{Duration, Utc}; use futures::stream::StreamExt; -use helium_crypto::{Keypair, PublicKey, Sign}; +use helium_crypto::{Keypair, Sign}; use helium_proto::services::mobile_config::{ self as proto, DeploymentInfo, DeviceTypeV2, GatewayClient, GatewayInfoStreamReqV3, GatewayInfoV3, LocationInfo, @@ -12,61 +11,6 @@ use prost::Message; use sqlx::PgPool; use std::vec; -#[derive(Builder)] -#[builder(setter(into))] -struct TestGateway { - address: PublicKey, - gateway_type: GatewayType, - #[builder(default)] - created_at: DateTime, - #[builder(default)] - inserted_at: DateTime, - #[builder(default)] - refreshed_at: DateTime, - #[builder(default)] - last_changed_at: DateTime, - #[builder(default)] - hash: String, - #[builder(default = "Some(18)")] - antenna: Option, - #[builder(default)] - elevation: Option, - #[builder(default)] - azimuth: Option, - #[builder(default)] - location: Option, - #[builder(default)] - location_changed_at: Option>, - #[builder(default)] - location_asserts: Option, - #[builder(default)] - owner: Option, - #[builder(default)] - hash_v2: Option, -} - -impl From for Gateway { - fn from(tg: TestGateway) -> Self { - Gateway { - address: tg.address.into(), - gateway_type: tg.gateway_type, - created_at: tg.created_at, - inserted_at: tg.inserted_at, - refreshed_at: tg.refreshed_at, - last_changed_at: tg.last_changed_at, - hash: tg.hash, - antenna: tg.antenna, - elevation: tg.elevation, - azimuth: tg.azimuth, - location: tg.location, - location_changed_at: tg.location_changed_at, - location_asserts: tg.location_asserts, - owner: tg.owner, - hash_v2: tg.hash_v2, - } - } -} - #[sqlx::test] async fn gateway_stream_info_v3_basic(pool: PgPool) -> anyhow::Result<()> { let admin_key = make_keypair(); @@ -404,8 +348,8 @@ async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) -> anyhow::Res .inserted_at(now_minus_six) .refreshed_at(now) .last_changed_at(now_minus_three) - .elevation(2u32) - .azimuth(161u32) + .elevation(2) + .azimuth(161) .location(loc1) .location_changed_at(now_minus_six) .location_asserts(1) From e35bc33757b633fc7bdb45c9f3f45bf96b57eed3 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 16 Dec 2025 13:22:33 +0200 Subject: [PATCH 06/17] Refactor updating gateways logic --- mobile_config/src/gateway/tracker.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index afdd171d3..9e57714a5 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -94,27 +94,22 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow to_insert.push(gw); } Some(last_gw) => { - let loc_changed = gw.location != last_gw.location; + // FYI hash includes location let hash_changed = gw.hash != last_gw.hash; + if !hash_changed { + continue; + } - // FYI hash includes location - gw.last_changed_at = if hash_changed { - gw.refreshed_at - } else { - last_gw.last_changed_at - }; + gw.last_changed_at = gw.refreshed_at; + let loc_changed = gw.location != last_gw.location; gw.location_changed_at = if loc_changed { Some(gw.refreshed_at) } else { last_gw.location_changed_at }; - // We only add record if something changed - // FYI hash includes location - if hash_changed { - to_insert.push(gw); - } + to_insert.push(gw); } } } From 336e54cc05c7e71babca30a9d882bcee85e30047 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 16 Dec 2025 14:48:26 +0200 Subject: [PATCH 07/17] Add owner_chagned_at field --- ...51215132218_add_owner_hash_to_gateways.sql | 4 ++-- mobile_config/src/gateway/db.rs | 22 ++++++++--------- mobile_config/src/gateway/metadata_db.rs | 2 +- mobile_config/src/gateway/tracker.rs | 7 ++++++ .../tests/integrations/common/gateway_db.rs | 4 ++-- .../tests/integrations/gateway_db.rs | 2 +- .../tests/integrations/gateway_service.rs | 24 +++++++++---------- 7 files changed, 36 insertions(+), 29 deletions(-) diff --git a/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql b/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql index 50d233759..241c70380 100644 --- a/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql +++ b/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql @@ -1,3 +1,3 @@ ALTER TABLE gateways -ADD COLUMN owner VARCHAR(255), -ADD COLUMN hash_v2 TEXT; +ADD COLUMN IF NOT EXISTS owner VARCHAR(255), +ADD COLUMN IF NOT EXISTS owner_changed_at TIMESTAMPTZ; diff --git a/mobile_config/src/gateway/db.rs b/mobile_config/src/gateway/db.rs index c086837e0..ed0869a97 100644 --- a/mobile_config/src/gateway/db.rs +++ b/mobile_config/src/gateway/db.rs @@ -96,7 +96,7 @@ pub struct Gateway { pub location_changed_at: Option>, pub location_asserts: Option, pub owner: Option, - pub hash_v2: Option, + pub owner_changed_at: Option>, } #[derive(Debug)] @@ -126,7 +126,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at ) ", ); @@ -144,7 +144,7 @@ impl Gateway { .push_bind(g.location_changed_at) .push_bind(g.location_asserts.map(|v| v as i64)) .push_bind(g.owner.as_deref()) - .push_bind(g.hash_v2.as_deref()); + .push_bind(g.owner_changed_at); }); let res = qb.build().execute(pool).await?; @@ -168,7 +168,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, @@ -189,7 +189,7 @@ impl Gateway { .bind(self.location_changed_at) .bind(self.location_asserts.map(|v| v as i64)) .bind(self.owner.as_deref()) - .bind(self.hash_v2.as_deref()) + .bind(self.owner_changed_at) .execute(pool) .await?; @@ -217,7 +217,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at FROM gateways WHERE address = $1 ORDER BY inserted_at DESC @@ -254,7 +254,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at FROM gateways WHERE address = ANY($1) ORDER BY address, inserted_at DESC @@ -289,7 +289,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at FROM gateways WHERE address = $1 AND inserted_at <= $2 @@ -329,7 +329,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at FROM gateways WHERE address = ANY($1) AND last_changed_at >= $2 @@ -366,7 +366,7 @@ impl Gateway { location_changed_at, location_asserts, owner, - hash_v2 + owner_changed_at FROM gateways WHERE gateway_type = ANY($1) AND last_changed_at >= $2 @@ -449,7 +449,7 @@ impl FromRow<'_, PgRow> for Gateway { location_changed_at: row.try_get("location_changed_at")?, location_asserts: to_u32(row.try_get("location_asserts")?), owner: row.try_get("owner")?, - hash_v2: row.try_get("hash_v2")?, + owner_changed_at: row.try_get("owner_changed_at")?, }) } } diff --git a/mobile_config/src/gateway/metadata_db.rs b/mobile_config/src/gateway/metadata_db.rs index 68daadd11..86c34a7b8 100644 --- a/mobile_config/src/gateway/metadata_db.rs +++ b/mobile_config/src/gateway/metadata_db.rs @@ -141,7 +141,7 @@ impl MobileHotspotInfo { }, location_asserts: self.num_location_asserts.map(|n| n as u32), owner: None, // TODO - hash_v2: None, + owner_changed_at: Some(refreshed_at), })) } } diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index 9e57714a5..c987d7701 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -109,6 +109,13 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow last_gw.location_changed_at }; + let owner_changed = gw.owner != last_gw.owner; + gw.owner_changed_at = if owner_changed { + Some(gw.refreshed_at) + } else { + last_gw.owner_changed_at + }; + to_insert.push(gw); } } diff --git a/mobile_config/tests/integrations/common/gateway_db.rs b/mobile_config/tests/integrations/common/gateway_db.rs index f93ef3967..7103ecaa7 100644 --- a/mobile_config/tests/integrations/common/gateway_db.rs +++ b/mobile_config/tests/integrations/common/gateway_db.rs @@ -34,7 +34,7 @@ pub struct TestGateway { #[builder(default)] owner: Option, #[builder(default)] - hash_v2: Option, + owner_changed_at: Option>, } impl From for Gateway { @@ -54,7 +54,7 @@ impl From for Gateway { location_changed_at: tg.location_changed_at, location_asserts: tg.location_asserts, owner: tg.owner, - hash_v2: tg.hash_v2, + owner_changed_at: tg.owner_changed_at, } } } diff --git a/mobile_config/tests/integrations/gateway_db.rs b/mobile_config/tests/integrations/gateway_db.rs index c04aa33f4..bb85a478e 100644 --- a/mobile_config/tests/integrations/gateway_db.rs +++ b/mobile_config/tests/integrations/gateway_db.rs @@ -202,6 +202,6 @@ fn gw(address: PublicKeyBinary, gateway_type: GatewayType, t: chrono::DateTime anyhow::Result<()> { location_changed_at: Some(inserted_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -421,7 +421,7 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(created_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -492,7 +492,7 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(inserted_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -511,7 +511,7 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(created_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -605,7 +605,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location_changed_at: Some(refreshed_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -624,7 +624,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location_changed_at: Some(created_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -643,7 +643,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location_changed_at: Some(inserted_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway3.insert(&pool).await?; @@ -662,7 +662,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< location_changed_at: Some(created_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway4.insert(&pool).await?; @@ -743,7 +743,7 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(refreshed_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway1.insert(&pool).await?; @@ -762,7 +762,7 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(created_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway2.insert(&pool).await?; @@ -844,7 +844,7 @@ async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(refreshed_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway_original.insert(&pool).await?; @@ -871,7 +871,7 @@ async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { location_changed_at: Some(created_at), location_asserts: Some(1), owner: None, - hash_v2: None, + owner_changed_at: Some(created_at), }; gateway_recent.insert(&pool).await?; From 2b843fa3d7fcc7606918f06fb1397bc7c09dca84 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 16 Dec 2025 14:57:33 +0200 Subject: [PATCH 08/17] Track owner changing correctly --- mobile_config/src/gateway/tracker.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index c987d7701..bf675feee 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -94,29 +94,35 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow to_insert.push(gw); } Some(last_gw) => { + let loc_changed = gw.location != last_gw.location; // FYI hash includes location + // owner (at this moment) is not included in hash let hash_changed = gw.hash != last_gw.hash; - if !hash_changed { - continue; - } + let owner_changed = gw.owner != last_gw.owner; - gw.last_changed_at = gw.refreshed_at; + gw.last_changed_at = if hash_changed { + gw.refreshed_at + } else { + last_gw.last_changed_at + }; - let loc_changed = gw.location != last_gw.location; gw.location_changed_at = if loc_changed { Some(gw.refreshed_at) } else { last_gw.location_changed_at }; - let owner_changed = gw.owner != last_gw.owner; gw.owner_changed_at = if owner_changed { Some(gw.refreshed_at) } else { last_gw.owner_changed_at }; - to_insert.push(gw); + // We only add record if something changed + // FYI hash includes location + if hash_changed || owner_changed { + to_insert.push(gw); + } } } } From 24ee7e3320ddb0e26a13039fe258ed7f803cf699 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 16 Dec 2025 16:08:55 +0200 Subject: [PATCH 09/17] Delete mobile_config cli import --- mobile_config/src/cli/import.rs | 102 -------------------------------- mobile_config/src/cli/mod.rs | 8 +-- 2 files changed, 1 insertion(+), 109 deletions(-) delete mode 100644 mobile_config/src/cli/import.rs diff --git a/mobile_config/src/cli/import.rs b/mobile_config/src/cli/import.rs deleted file mode 100644 index 0479aac3e..000000000 --- a/mobile_config/src/cli/import.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::{ - gateway::db::{Gateway, LocationChangedAtUpdate}, - settings::Settings, -}; -use chrono::{DateTime, Utc}; -use h3o::{error::InvalidCellIndex, CellIndex}; -use helium_crypto::PublicKey; -use serde::Deserialize; -use std::{ - fs::File, - path::{Path, PathBuf}, - str::FromStr, - time::Instant, -}; - -#[derive(Debug, clap::Parser)] -pub struct Import { - #[clap(short = 'f')] - file: PathBuf, - - #[clap(subcommand)] - cmd: Cmd, -} - -#[derive(Debug, clap::Subcommand)] -pub enum Cmd { - HotspotsAssertions, -} - -impl Import { - pub async fn run(self, settings: &Settings) -> anyhow::Result<()> { - match self.cmd { - Cmd::HotspotsAssertions => { - custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; - - tracing::info!("started"); - - let pool = settings.database.connect("mobile-config-store").await?; - - let start = Instant::now(); - - let updates = read_csv(self.file)? - .into_iter() - .filter_map(|row| row.try_into().ok()) - .collect::>(); - - tracing::info!("file read, updating {} records", updates.len()); - - let updated = Gateway::update_bulk_location_changed_at(&pool, &updates).await?; - - let elapsed = start.elapsed(); - tracing::info!(?elapsed, updated, "finished"); - - Ok(()) - } - } - } -} - -#[derive(Debug, Deserialize)] -struct CsvRow { - public_key: PublicKey, - // serialnumber: String, - time: DateTime, - // latitude: f64, - // longitude: f64, - h3: String, - // assertion_type: String, -} - -#[derive(Debug, thiserror::Error)] -pub enum CsvRowError { - #[error("H3 index parse error: {0}")] - H3IndexParseError(#[from] InvalidCellIndex), -} - -impl TryFrom for LocationChangedAtUpdate { - type Error = CsvRowError; - - fn try_from(row: CsvRow) -> Result { - let cell = CellIndex::from_str(&row.h3)?; - - Ok(Self { - address: row.public_key.into(), - location_changed_at: row.time, - location: cell.into(), - }) - } -} - -fn read_csv>(path: P) -> anyhow::Result> { - let file = File::open(path)?; - let mut rdr = csv::Reader::from_reader(file); - let mut rows = Vec::new(); - - for result in rdr.deserialize() { - let record: CsvRow = result?; - rows.push(record); - } - - Ok(rows) -} diff --git a/mobile_config/src/cli/mod.rs b/mobile_config/src/cli/mod.rs index edd76242f..243ff971a 100644 --- a/mobile_config/src/cli/mod.rs +++ b/mobile_config/src/cli/mod.rs @@ -1,5 +1,5 @@ use crate::{ - cli::{api::Api, import::Import, server::Server}, + cli::{api::Api, server::Server}, settings::Settings, }; use base64::{engine::general_purpose, Engine}; @@ -9,7 +9,6 @@ use std::io::Write; use std::{fs::File, path::PathBuf}; pub mod api; -pub mod import; pub mod server; #[derive(Debug, clap::Parser)] @@ -55,10 +54,6 @@ impl Cli { Ok(()) } - Cmd::Import(import) => { - let settings = Settings::new(self.config)?; - import.run(&settings).await - } Cmd::Server(server) => { let settings = Settings::new(self.config)?; server.run(&settings).await @@ -71,6 +66,5 @@ impl Cli { pub enum Cmd { Api(Api), GenerateKey, - Import(Import), Server(Server), } From 1d88a3777fdd4876a1c3055ef57b4579d4013f41 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Tue, 16 Dec 2025 16:51:39 +0200 Subject: [PATCH 10/17] Track owner --- mobile_config/src/gateway/metadata_db.rs | 9 +++++++-- mobile_config/src/gateway/tracker.rs | 2 +- .../integrations/common/gateway_metadata_db.rs | 14 ++++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/mobile_config/src/gateway/metadata_db.rs b/mobile_config/src/gateway/metadata_db.rs index 86c34a7b8..5d0796e7b 100644 --- a/mobile_config/src/gateway/metadata_db.rs +++ b/mobile_config/src/gateway/metadata_db.rs @@ -21,6 +21,7 @@ pub struct MobileHotspotInfo { dc_onboarding_fee_paid: Option, device_type: DeviceType, deployment_info: Option, + owner: Option, } impl MobileHotspotInfo { @@ -88,10 +89,13 @@ impl MobileHotspotInfo { mhi.is_active, mhi.dc_onboarding_fee_paid::bigint, mhi.device_type::text, - mhi.deployment_info::text + mhi.deployment_info::text, + ao.owner FROM key_to_assets kta INNER JOIN mobile_hotspot_infos mhi ON kta.asset = mhi.asset + LEFT JOIN asset_owners ao ON + kta.asset = ao.asset WHERE kta.entity_key IS NOT NULL AND mhi.refreshed_at IS NOT NULL ORDER BY kta.entity_key, refreshed_at DESC @@ -140,7 +144,7 @@ impl MobileHotspotInfo { None }, location_asserts: self.num_location_asserts.map(|n| n as u32), - owner: None, // TODO + owner: self.owner.clone(), owner_changed_at: Some(refreshed_at), })) } @@ -173,6 +177,7 @@ impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for MobileHotspotInfo { num_location_asserts: row.get::, &str>("num_location_asserts"), is_active: row.get::, &str>("is_active"), dc_onboarding_fee_paid: row.get::, &str>("dc_onboarding_fee_paid"), + owner: row.get::, &str>("owner"), device_type, deployment_info, }) diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index bf675feee..6e520cbb1 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -100,7 +100,7 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow let hash_changed = gw.hash != last_gw.hash; let owner_changed = gw.owner != last_gw.owner; - gw.last_changed_at = if hash_changed { + gw.last_changed_at = if hash_changed || owner_changed { gw.refreshed_at } else { last_gw.last_changed_at diff --git a/mobile_config/tests/integrations/common/gateway_metadata_db.rs b/mobile_config/tests/integrations/common/gateway_metadata_db.rs index 40ea78a5e..fadb7e20e 100644 --- a/mobile_config/tests/integrations/common/gateway_metadata_db.rs +++ b/mobile_config/tests/integrations/common/gateway_metadata_db.rs @@ -138,4 +138,18 @@ pub async fn create_tables(pool: &PgPool) { .execute(pool) .await .unwrap(); + + sqlx::query( + r#" + CREATE TABLE asset_owners ( + asset character varying(255) NULL, + owner character varying(255) NULL, + created_at timestamptz, + updated_at timestamptz, + last_block integer + );"#, + ) + .execute(pool) + .await + .unwrap(); } From 7f8cdb1ffd233d5eb8f1cf9459832cebc64e8033 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 12:22:25 +0200 Subject: [PATCH 11/17] Backfill owners --- mobile_config/src/gateway/db.rs | 82 ++++++++++++++++++++++++++- mobile_config/src/gateway/tracker.rs | 83 +++++++++++++++++++++++++++- 2 files changed, 163 insertions(+), 2 deletions(-) diff --git a/mobile_config/src/gateway/db.rs b/mobile_config/src/gateway/db.rs index ed0869a97..78ee9dc86 100644 --- a/mobile_config/src/gateway/db.rs +++ b/mobile_config/src/gateway/db.rs @@ -401,7 +401,7 @@ impl Gateway { r#" UPDATE gateways AS g SET location_changed_at = v.location_changed_at - FROM ( + FROM ( "#, ); @@ -426,6 +426,86 @@ impl Gateway { Ok(total) } + + /// Get gateways where owner is NULL (for post-migration backfill) + /// Only returns addresses where the MOST RECENT record has a NULL owner + pub async fn get_null_owners(pool: &PgPool) -> anyhow::Result> { + let gateways = sqlx::query_as::<_, Self>( + r#" + WITH latest_gateways AS ( + SELECT DISTINCT ON (address) + address, + gateway_type, + created_at, + inserted_at, + refreshed_at, + last_changed_at, + hash, + antenna, + elevation, + azimuth, + location, + location_changed_at, + location_asserts, + owner, + owner_changed_at + FROM gateways + ORDER BY address, inserted_at DESC + ) + SELECT * FROM latest_gateways + WHERE owner IS NULL + "#, + ) + .fetch_all(pool) + .await?; + + Ok(gateways) + } + + /// Update owner and owner_changed_at for multiple gateways + pub async fn update_owners(pool: &PgPool, gateways: &[Gateway]) -> anyhow::Result { + if gateways.is_empty() { + return Ok(0); + } + + const MAX_ROWS: usize = 20000; + let mut total = 0; + + for chunk in gateways.chunks(MAX_ROWS) { + let mut qb = QueryBuilder::::new( + r#" + UPDATE gateways AS g + SET + owner = v.owner, + owner_changed_at = v.owner_changed_at + FROM ( + "#, + ); + + qb.push_values(chunk, |mut b, gw| { + b.push_bind(gw.address.as_ref()) + .push_bind(gw.owner.as_deref()) + .push_bind(gw.owner_changed_at); + }); + + qb.push( + r#" + ) AS v(address, owner, owner_changed_at) + WHERE g.address = v.address + AND g.inserted_at = ( + SELECT MAX(inserted_at) + FROM gateways + WHERE address = v.address + ) + "#, + ); + + let res = qb.build().execute(pool).await?; + total += res.rows_affected(); + } + + Ok(total) + } } impl FromRow<'_, PgRow> for Gateway { diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index 6e520cbb1..3078e7ab0 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -39,6 +39,10 @@ impl Tracker { tracing::info!("starting with interval: {:?}", self.interval); let mut interval = tokio::time::interval(self.interval); + if let Err(e) = backfill_gateway_owners(&self.pool, &self.metadata).await { + tracing::error!("backfill_gateway_owners is faled. {e}"); + } + loop { tokio::select! { biased; @@ -57,6 +61,78 @@ impl Tracker { } } +/// Post-migration function to backfill owner data for gateways with NULL owner. +/// Gets gateways where owner is NULL, streams mobile hotspot info, +/// and updates gateways.owner with owner_changed_at = last_changed_at +pub async fn backfill_gateway_owners( + pool: &Pool, + metadata: &Pool, +) -> anyhow::Result<()> { + tracing::info!("starting owner backfill"); + let start = Instant::now(); + + const BATCH_SIZE: usize = 1_000; + + // Get gateways where owner is NULL + let null_owner_gateways = Gateway::get_null_owners(pool).await?; + let null_owner_addresses: HashMap = null_owner_gateways + .into_iter() + .map(|gw| (gw.address.to_string(), gw)) + .collect(); + + tracing::info!( + "found {} gateways with null owners", + null_owner_addresses.len() + ); + if null_owner_addresses.is_empty() { + return Ok(()); + } + + // Stream mobile hotspot info and update matching gateways + let total: u64 = MobileHotspotInfo::stream(metadata) + .map_err(anyhow::Error::from) + .try_filter_map(|mhi| async move { + match mhi.to_gateway() { + Ok(Some(gw)) => Ok(Some(gw)), + Ok(None) => Ok(None), + Err(e) => { + tracing::error!(?e, "error converting gateway"); + Err(e) + } + } + }) + .try_chunks(BATCH_SIZE) + .map_err(|TryChunksError(_gateways, err)| err) + .try_fold(0, |total, batch| { + let null_owner_addresses = &null_owner_addresses; + async move { + let mut to_update = Vec::with_capacity(BATCH_SIZE); + + for gw in batch { + if let Some(existing_gw) = null_owner_addresses.get(&gw.address.to_string()) { + if let Some(owner) = gw.owner { + // Update gateway with owner from mobile hotspot info + let mut updated_gw = existing_gw.clone(); + updated_gw.owner = Some(owner); + // Set owner_changed_at = last_changed_at + updated_gw.owner_changed_at = Some(existing_gw.last_changed_at); + to_update.push(updated_gw); + } + } + } + + let affected = Gateway::update_owners(pool, &to_update).await?; + Ok(total + affected) + } + }) + .await?; + + let elapsed = start.elapsed(); + tracing::info!(?elapsed, updated = total, "done owner backfill"); + + Ok(()) +} + pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { tracing::info!("starting execute"); let start = Instant::now(); @@ -98,7 +174,12 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow // FYI hash includes location // owner (at this moment) is not included in hash let hash_changed = gw.hash != last_gw.hash; - let owner_changed = gw.owner != last_gw.owner; + + let owner_changed = if gw.owner.is_none() { + false + } else { + gw.owner != last_gw.owner + }; gw.last_changed_at = if hash_changed || owner_changed { gw.refreshed_at From 62a88fca9d7dd8d8c73f556c7378a69b50af5932 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 13:41:08 +0200 Subject: [PATCH 12/17] Add test_gateway_tracker_owner_tracking --- .../common/gateway_metadata_db.rs | 48 +++++++++++++ .../tests/integrations/gateway_tracker.rs | 70 +++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/mobile_config/tests/integrations/common/gateway_metadata_db.rs b/mobile_config/tests/integrations/common/gateway_metadata_db.rs index fadb7e20e..d47a43a59 100644 --- a/mobile_config/tests/integrations/common/gateway_metadata_db.rs +++ b/mobile_config/tests/integrations/common/gateway_metadata_db.rs @@ -108,6 +108,54 @@ pub async fn update_gateway( Ok(()) } +pub async fn insert_asset_owner( + pool: &PgPool, + asset: &str, + owner: &str, + created_at: DateTime, + updated_at: DateTime, +) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO asset_owners ( + asset, owner, created_at, updated_at, last_block + ) + VALUES ($1, $2, $3, $4, 0) + "#, + ) + .bind(asset) + .bind(owner) + .bind(created_at) + .bind(updated_at) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn update_asset_owner( + pool: &PgPool, + asset: &str, + owner: &str, + updated_at: DateTime, +) -> anyhow::Result<()> { + sqlx::query( + r#" + UPDATE asset_owners + SET owner = $1, + updated_at = $2 + WHERE asset = $3 + "#, + ) + .bind(owner) + .bind(updated_at) + .bind(asset) + .execute(pool) + .await?; + + Ok(()) +} + pub async fn create_tables(pool: &PgPool) { sqlx::query( r#" diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index c356553ec..9ba350724 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -118,3 +118,73 @@ async fn count_gateways(pool: &PgPool) -> anyhow::Result { Ok(count) } + +#[sqlx::test] +async fn test_gateway_tracker_owner_tracking(pool: PgPool) -> anyhow::Result<()> { + let now = Utc::now() + .with_nanosecond(Utc::now().timestamp_subsec_micros() * 1000) + .unwrap(); + + // ensure tables exist + gateway_metadata_db::create_tables(&pool).await; + + // Create a gateway with owner information + let pubkey: helium_crypto::PublicKeyBinary = make_keypair().public_key().clone().into(); + let asset = "test_asset_001".to_string(); + let initial_owner = "owner1_address".to_string(); + let hex_val = 631_711_281_837_647_359_i64; + + let gateway = gateway_metadata_db::GatewayInsert { + asset: asset.clone(), + location: Some(hex_val), + device_type: "\"wifiIndoor\"".to_string(), + key: pubkey.clone(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + }; + + // Insert the gateway into mobile_hotspot_infos and key_to_assets + gateway_metadata_db::insert_gateway_bulk(&pool, &[gateway], 1000).await?; + + // Insert the owner into asset_owners table + gateway_metadata_db::insert_asset_owner(&pool, &asset, &initial_owner, now, now).await?; + + // Run the tracker execute function + tracker::execute(&pool, &pool).await?; + + // Verify the gateway was created with the correct owner + let retrieved_gateway = Gateway::get_by_address(&pool, &pubkey) + .await? + .expect("gateway not found"); + + assert_eq!(retrieved_gateway.address, pubkey.clone()); + assert_eq!(retrieved_gateway.gateway_type, GatewayType::WifiIndoor); + assert_eq!(retrieved_gateway.owner, Some(initial_owner.clone())); + assert_eq!(retrieved_gateway.owner_changed_at, Some(now)); + + // Update the owner in asset_owners table + let new_owner = "owner2_address".to_string(); + let update_time = now + chrono::Duration::hours(1); + + // Update the refreshed_at time in mobile_hotspot_infos to simulate a new update + gateway_metadata_db::update_gateway(&pool, &asset, hex_val, update_time, 1).await?; + + gateway_metadata_db::update_asset_owner(&pool, &asset, &new_owner, update_time).await?; + + // Run tracker::execute again + tracker::execute(&pool, &pool).await?; + + // Verify the owner and owner_changed_at were updated + let updated_gateway = Gateway::get_by_address(&pool, &pubkey) + .await? + .expect("gateway not found"); + + assert_eq!(updated_gateway.address, pubkey.clone()); + assert_eq!(updated_gateway.owner, Some(new_owner.clone())); + assert_eq!(updated_gateway.owner_changed_at, Some(update_time)); + // last_changed_at should also be updated since owner changed + assert_eq!(updated_gateway.last_changed_at, update_time); + + Ok(()) +} From 5475cf29ecca442c9d1c3fb92ab42c311ca29805 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 14:11:49 +0200 Subject: [PATCH 13/17] Make sure new record was created --- mobile_config/tests/integrations/gateway_tracker.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index 9ba350724..c27110b08 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -163,6 +163,10 @@ async fn test_gateway_tracker_owner_tracking(pool: PgPool) -> anyhow::Result<()> assert_eq!(retrieved_gateway.owner, Some(initial_owner.clone())); assert_eq!(retrieved_gateway.owner_changed_at, Some(now)); + // Count gateways before owner change + let count_before = count_gateways(&pool).await?; + assert_eq!(1, count_before); + // Update the owner in asset_owners table let new_owner = "owner2_address".to_string(); let update_time = now + chrono::Duration::hours(1); @@ -175,6 +179,13 @@ async fn test_gateway_tracker_owner_tracking(pool: PgPool) -> anyhow::Result<()> // Run tracker::execute again tracker::execute(&pool, &pool).await?; + // Verify a new record was created after owner change + let count_after = count_gateways(&pool).await?; + assert_eq!( + 2, count_after, + "A new record should be created when owner changes" + ); + // Verify the owner and owner_changed_at were updated let updated_gateway = Gateway::get_by_address(&pool, &pubkey) .await? From c9ec56f4e348d438c2be1631af3b37a359fa08b9 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 14:19:28 +0200 Subject: [PATCH 14/17] Don't updated (yet) last_changed_at if owner was changed --- mobile_config/src/gateway/tracker.rs | 4 +++- mobile_config/tests/integrations/gateway_tracker.rs | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index 3078e7ab0..c38dbb4ec 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -181,7 +181,9 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow gw.owner != last_gw.owner }; - gw.last_changed_at = if hash_changed || owner_changed { + // TODO at the second stage of implementing owner and owner_changed at + // last_changed_at should also be affected if owner was changed + gw.last_changed_at = if hash_changed { gw.refreshed_at } else { last_gw.last_changed_at diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index c27110b08..a85944dd5 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -194,8 +194,9 @@ async fn test_gateway_tracker_owner_tracking(pool: PgPool) -> anyhow::Result<()> assert_eq!(updated_gateway.address, pubkey.clone()); assert_eq!(updated_gateway.owner, Some(new_owner.clone())); assert_eq!(updated_gateway.owner_changed_at, Some(update_time)); - // last_changed_at should also be updated since owner changed - assert_eq!(updated_gateway.last_changed_at, update_time); + // last_changed_at should also be updated since owner changed (In next owner implementing stage). + // But currently, it stays the same. After full implementation change `now` to `update_time` + assert_eq!(updated_gateway.last_changed_at, now); Ok(()) } From 147a1320c6d0aebf9dafe83ffafce6268768862f Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 15:59:55 +0200 Subject: [PATCH 15/17] Add backfill owner test --- .../tests/integrations/gateway_tracker.rs | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index a85944dd5..1282c1c33 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -200,3 +200,172 @@ async fn test_gateway_tracker_owner_tracking(pool: PgPool) -> anyhow::Result<()> Ok(()) } + +#[sqlx::test] +async fn test_backfill_gateway_owners(pool: PgPool) -> anyhow::Result<()> { + let now = Utc::now() + .with_nanosecond(Utc::now().timestamp_subsec_micros() * 1000) + .unwrap(); + + // ensure tables exist + gateway_metadata_db::create_tables(&pool).await; + + // Create gateways without owners initially + let gateway1_pubkey: helium_crypto::PublicKeyBinary = + make_keypair().public_key().clone().into(); + let gateway2_pubkey: helium_crypto::PublicKeyBinary = + make_keypair().public_key().clone().into(); + + let asset1 = "test_asset_backfill_001".to_string(); + let asset2 = "test_asset_backfill_002".to_string(); + let hex_val = 631_711_281_837_647_359_i64; + + let gateways = vec![ + gateway_metadata_db::GatewayInsert { + asset: asset1.clone(), + location: Some(hex_val), + device_type: "\"wifiIndoor\"".to_string(), + key: gateway1_pubkey.clone(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + }, + gateway_metadata_db::GatewayInsert { + asset: asset2.clone(), + location: Some(hex_val + 1), + device_type: "\"wifiOutdoor\"".to_string(), + key: gateway2_pubkey.clone(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + }, + ]; + + // Directly insert gateways into the gateways table WITHOUT owners + // This simulates the state before the owner tracking feature was added + let gw1 = Gateway { + address: gateway1_pubkey.clone(), + gateway_type: GatewayType::WifiIndoor, + created_at: now, + inserted_at: now, + refreshed_at: now, + last_changed_at: now, + hash: "test_hash_1".to_string(), + antenna: None, + elevation: None, + azimuth: None, + location: Some(hex_val as u64), + location_changed_at: Some(now), + location_asserts: Some(1), + owner: None, + owner_changed_at: None, + }; + + let gw2 = Gateway { + address: gateway2_pubkey.clone(), + gateway_type: GatewayType::WifiOutdoor, + created_at: now, + inserted_at: now, + refreshed_at: now, + last_changed_at: now, + hash: "test_hash_2".to_string(), + antenna: None, + elevation: None, + azimuth: None, + location: Some((hex_val + 1) as u64), + location_changed_at: Some(now), + location_asserts: Some(1), + owner: None, + owner_changed_at: None, + }; + + Gateway::insert_bulk(&pool, &[gw1.clone(), gw2.clone()]).await?; + + // Verify gateways were created with NULL owners + let gw1_before = Gateway::get_by_address(&pool, &gateway1_pubkey) + .await? + .expect("gateway 1 not found"); + let gw2_before = Gateway::get_by_address(&pool, &gateway2_pubkey) + .await? + .expect("gateway 2 not found"); + + assert_eq!(gw1_before.owner, None, "Gateway 1 should have NULL owner"); + assert_eq!( + gw1_before.owner_changed_at, None, + "Gateway 1 should have NULL owner_changed_at" + ); + assert_eq!(gw2_before.owner, None, "Gateway 2 should have NULL owner"); + assert_eq!( + gw2_before.owner_changed_at, None, + "Gateway 2 should have NULL owner_changed_at" + ); + + // Insert gateways into metadata tables with owners + gateway_metadata_db::insert_gateway_bulk(&pool, &gateways, 1000).await?; + + // Now add owners to the metadata database + let owner1 = "owner1_backfill_address".to_string(); + let owner2 = "owner2_backfill_address".to_string(); + + gateway_metadata_db::insert_asset_owner(&pool, &asset1, &owner1, now, now).await?; + gateway_metadata_db::insert_asset_owner(&pool, &asset2, &owner2, now, now).await?; + + // Count gateway records before backfill + let count_before = count_gateways(&pool).await?; + assert_eq!(2, count_before, "Should have 2 gateway records before backfill"); + + // Run backfill_gateway_owners + tracker::backfill_gateway_owners(&pool, &pool).await?; + + // Verify no new records were added (backfill should UPDATE, not INSERT) + let count_after = count_gateways(&pool).await?; + assert_eq!( + count_before, count_after, + "Backfill should not create new records, only update existing ones" + ); + + // Verify owners were updated + let gw1_after = Gateway::get_by_address(&pool, &gateway1_pubkey) + .await? + .expect("gateway 1 not found after backfill"); + let gw2_after = Gateway::get_by_address(&pool, &gateway2_pubkey) + .await? + .expect("gateway 2 not found after backfill"); + + assert_eq!( + gw1_after.owner, + Some(owner1.clone()), + "Gateway 1 owner should be updated" + ); + assert_eq!( + gw1_after.owner_changed_at, + Some(gw1_before.last_changed_at), + "Gateway 1 owner_changed_at should be set to last_changed_at" + ); + + assert_eq!( + gw2_after.owner, + Some(owner2.clone()), + "Gateway 2 owner should be updated" + ); + assert_eq!( + gw2_after.owner_changed_at, + Some(gw2_before.last_changed_at), + "Gateway 2 owner_changed_at should be set to last_changed_at" + ); + + // Verify that other fields remain unchanged + assert_eq!(gw1_after.address, gw1_before.address); + assert_eq!(gw1_after.gateway_type, gw1_before.gateway_type); + assert_eq!(gw1_after.last_changed_at, gw1_before.last_changed_at); + assert_eq!(gw1_after.location, gw1_before.location); + assert_eq!(gw1_after.hash, gw1_before.hash, "Hash should not change during backfill"); + + assert_eq!(gw2_after.address, gw2_before.address); + assert_eq!(gw2_after.gateway_type, gw2_before.gateway_type); + assert_eq!(gw2_after.last_changed_at, gw2_before.last_changed_at); + assert_eq!(gw2_after.location, gw2_before.location); + assert_eq!(gw2_after.hash, gw2_before.hash, "Hash should not change during backfill"); + + Ok(()) +} From e8d76d30bea998338e9d858bb234325094ce8456 Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 16:01:42 +0200 Subject: [PATCH 16/17] Fix fmt --- .../tests/integrations/gateway_tracker.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index 1282c1c33..7f05ac1fb 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -312,7 +312,10 @@ async fn test_backfill_gateway_owners(pool: PgPool) -> anyhow::Result<()> { // Count gateway records before backfill let count_before = count_gateways(&pool).await?; - assert_eq!(2, count_before, "Should have 2 gateway records before backfill"); + assert_eq!( + 2, count_before, + "Should have 2 gateway records before backfill" + ); // Run backfill_gateway_owners tracker::backfill_gateway_owners(&pool, &pool).await?; @@ -359,13 +362,19 @@ async fn test_backfill_gateway_owners(pool: PgPool) -> anyhow::Result<()> { assert_eq!(gw1_after.gateway_type, gw1_before.gateway_type); assert_eq!(gw1_after.last_changed_at, gw1_before.last_changed_at); assert_eq!(gw1_after.location, gw1_before.location); - assert_eq!(gw1_after.hash, gw1_before.hash, "Hash should not change during backfill"); + assert_eq!( + gw1_after.hash, gw1_before.hash, + "Hash should not change during backfill" + ); assert_eq!(gw2_after.address, gw2_before.address); assert_eq!(gw2_after.gateway_type, gw2_before.gateway_type); assert_eq!(gw2_after.last_changed_at, gw2_before.last_changed_at); assert_eq!(gw2_after.location, gw2_before.location); - assert_eq!(gw2_after.hash, gw2_before.hash, "Hash should not change during backfill"); + assert_eq!( + gw2_after.hash, gw2_before.hash, + "Hash should not change during backfill" + ); Ok(()) } From e12e0639b33d46fe35a949712d2130e8d4cc0c0e Mon Sep 17 00:00:00 2001 From: Anatolii Kurotych Date: Wed, 17 Dec 2025 16:22:43 +0200 Subject: [PATCH 17/17] Rename migration file --- ...to_gateways.sql => 20251215132218_add_owner_to_gateways.sql} | 0 mobile_config/src/gateway/tracker.rs | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename mobile_config/migrations/{20251215132218_add_owner_hash_to_gateways.sql => 20251215132218_add_owner_to_gateways.sql} (100%) diff --git a/mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql b/mobile_config/migrations/20251215132218_add_owner_to_gateways.sql similarity index 100% rename from mobile_config/migrations/20251215132218_add_owner_hash_to_gateways.sql rename to mobile_config/migrations/20251215132218_add_owner_to_gateways.sql diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index c38dbb4ec..b1e0eff90 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -40,7 +40,7 @@ impl Tracker { let mut interval = tokio::time::interval(self.interval); if let Err(e) = backfill_gateway_owners(&self.pool, &self.metadata).await { - tracing::error!("backfill_gateway_owners is faled. {e}"); + tracing::error!("backfill_gateway_owners is failed. {e}"); } loop {