diff --git a/Cargo.lock b/Cargo.lock index 7cb7e6283..3439f5a2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3446,7 +3446,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#0ba63207fa6935861ad2c3649edbb3c74f7f87ec" +source = "git+https://www.github.com/helium/proto.git?branch=kurotych%2Fspeedtest-bounds#c4a03661f85a6399ed8c6b6b7245e9cb04301762" dependencies = [ "bytes", "prost", diff --git a/Cargo.toml b/Cargo.toml index 16ac8f801..8eb3685a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,5 +131,5 @@ anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madnin # helium-proto = { path = "../proto" } # beacon = { path = "../proto/beacon" } -# [patch.'https://github.com/helium/proto'] -# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "fix-carrier-id" } +[patch.'https://github.com/helium/proto'] +helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "kurotych/speedtest-bounds" } diff --git a/coverage_point_calculator/src/speedtest.rs b/coverage_point_calculator/src/speedtest.rs index 2e9392585..55dd81b8c 100644 --- a/coverage_point_calculator/src/speedtest.rs +++ b/coverage_point_calculator/src/speedtest.rs @@ -11,19 +11,19 @@ type Millis = u32; #[derive(Debug, Default, Clone, Copy, PartialEq, PartialOrd)] pub struct BytesPs(u64); -impl BytesPs { - const BYTES_PER_MEGABYTE: u64 = 125_000; +pub const BYTES_PER_MEGABIT: u64 = 125_000; +impl BytesPs { pub fn new(bytes_per_second: u64) -> Self { Self(bytes_per_second) } pub fn mbps(megabytes_per_second: u64) -> Self { - Self(megabytes_per_second * Self::BYTES_PER_MEGABYTE) + Self(megabytes_per_second * BYTES_PER_MEGABIT) } fn as_mbps(&self) -> u64 { - self.0 / Self::BYTES_PER_MEGABYTE + self.0 / BYTES_PER_MEGABIT } pub fn as_bps(&self) -> u64 { diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index ae0284017..3d12a8b54 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -3,6 +3,7 @@ use crate::{ Settings, }; use chrono::{DateTime, Utc}; +use coverage_point_calculator::speedtest::BYTES_PER_MEGABIT; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, file_sink::FileSinkClient, @@ -31,6 +32,10 @@ use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6; +// The limit must be 300 megabits per second. +// Values in proto are in bytes/sec format. +// Convert 300 megabits per second to bytes per second. +const SPEEDTEST_MAX_BYTES_PER_SECOND: u64 = 300 * BYTES_PER_MEGABIT; pub type EpochSpeedTests = HashMap>; @@ -176,11 +181,15 @@ where &self, speedtest: &CellSpeedtestIngestReport, ) -> anyhow::Result { - let pubkey = speedtest.report.pubkey.clone(); + if speedtest.report.upload_speed > SPEEDTEST_MAX_BYTES_PER_SECOND + || speedtest.report.download_speed > SPEEDTEST_MAX_BYTES_PER_SECOND + { + return Ok(SpeedtestResult::SpeedtestValueOutOfBounds); + } match self .gateway_info_resolver - .resolve_gateway_info(&pubkey) + .resolve_gateway_info(&speedtest.report.pubkey) .await? { Some(gw_info) if gw_info.is_data_only() => { diff --git a/mobile_verifier/tests/integrations/speedtests.rs b/mobile_verifier/tests/integrations/speedtests.rs index d007639ef..5066c06df 100644 --- a/mobile_verifier/tests/integrations/speedtests.rs +++ b/mobile_verifier/tests/integrations/speedtests.rs @@ -1,5 +1,6 @@ use crate::common; use chrono::{DateTime, NaiveDateTime, Utc}; +use coverage_point_calculator::speedtest::BYTES_PER_MEGABIT; use file_store::{ file_info_poller::FileInfoStream, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, @@ -7,7 +8,8 @@ use file_store::{ }; use helium_crypto::PublicKeyBinary; use helium_proto::services::{ - mobile_config::DeviceType as MobileDeviceType, poc_mobile::SpeedtestAvgValidity, + mobile_config::DeviceType as MobileDeviceType, + poc_mobile::{SpeedtestAvgValidity, SpeedtestVerificationResult as SpeedtestResult}, }; use mobile_config::{ client::{gateway_client::GatewayInfoResolver, ClientError}, @@ -86,17 +88,325 @@ async fn speedtests_average_should_only_include_last_48_hours( Ok(()) } -fn file_info_stream( - speedtests: Vec, -) -> FileInfoStream { - let file_info = FileInfo { - key: "key".to_string(), - prefix: "prefix".to_string(), - timestamp: Utc::now(), - size: 0, +#[sqlx::test] +async fn speedtest_upload_exceeds_300megabits_ps_limit(pool: Pool) -> anyhow::Result<()> { + let (_tx, rx) = tokio::sync::mpsc::channel(2); + let gateway_info_resolver = MockGatewayInfoResolver {}; + let (speedtest_avg_client, _speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); + + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + let speedtest_report = CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial".to_string(), + timestamp: Utc::now(), + upload_speed: mbps(400), // exceeds limit + download_speed: mbps(100), // within limit + latency: 10, + }, }; - FileInfoStream::new("default".to_string(), file_info, speedtests) + let result = daemon.validate_speedtest(&speedtest_report).await?; + assert_eq!(result, SpeedtestResult::SpeedtestValueOutOfBounds); + + Ok(()) +} + +#[sqlx::test] +async fn speedtest_download_exceeds_300_megabits_ps_limit( + pool: Pool, +) -> anyhow::Result<()> { + let (_tx, rx) = tokio::sync::mpsc::channel(2); + let gateway_info_resolver = MockGatewayInfoResolver {}; + let (speedtest_avg_client, _speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); + + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + // Create speedtest with download speed > 300Mbits + let speedtest_report = CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial".to_string(), + timestamp: Utc::now(), + upload_speed: mbps(50), // within limit + download_speed: mbps(350), // exceeds limit + latency: 10, + }, + }; + + let result = daemon.validate_speedtest(&speedtest_report).await?; + assert_eq!(result, SpeedtestResult::SpeedtestValueOutOfBounds); + + Ok(()) +} + +#[sqlx::test] +async fn speedtest_both_speeds_exceed_300_megabits_ps_limit( + pool: Pool, +) -> anyhow::Result<()> { + let (_tx, rx) = tokio::sync::mpsc::channel(2); + let gateway_info_resolver = MockGatewayInfoResolver {}; + let (speedtest_avg_client, _speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); + + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + // Create speedtest with both speeds > 300Mbits + let speedtest_report = CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial".to_string(), + timestamp: Utc::now(), + upload_speed: mbps(400), // exceeds limit + download_speed: mbps(350), // exceeds limit + latency: 10, + }, + }; + + let result = daemon.validate_speedtest(&speedtest_report).await?; + assert_eq!(result, SpeedtestResult::SpeedtestValueOutOfBounds); + + Ok(()) +} + +#[sqlx::test] +async fn speedtest_within_300_megabits_ps_limit_should_be_valid( + pool: Pool, +) -> anyhow::Result<()> { + let (_tx, rx) = tokio::sync::mpsc::channel(2); + let gateway_info_resolver = MockGatewayInfoResolver {}; + let (speedtest_avg_client, _speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); + + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + // Create speedtest with both speeds within 300Mbits limit + let speedtest_report = CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial".to_string(), + timestamp: Utc::now(), + upload_speed: mbps(100), // within limit + download_speed: mbps(200), // within limit + latency: 10, + }, + }; + + let result = daemon.validate_speedtest(&speedtest_report).await?; + assert_eq!(result, SpeedtestResult::SpeedtestValid); + + Ok(()) +} + +#[sqlx::test] +async fn speedtest_exactly_300_megabits_ps_limit_should_be_valid( + pool: Pool, +) -> anyhow::Result<()> { + let (_tx, rx) = tokio::sync::mpsc::channel(2); + let gateway_info_resolver = MockGatewayInfoResolver {}; + let (speedtest_avg_client, _speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); + + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + // Create speedtest with speeds exactly at 300Mbits limit + let speedtest_report = CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial".to_string(), + timestamp: Utc::now(), + upload_speed: mbps(300), // should be valid + download_speed: mbps(300), // should be valid + latency: 10, + }, + }; + + let result = daemon.validate_speedtest(&speedtest_report).await?; + assert_eq!(result, SpeedtestResult::SpeedtestValid); + + Ok(()) +} + +#[sqlx::test] +async fn invalid_speedtests_should_not_affect_average(pool: Pool) -> anyhow::Result<()> { + let (_tx, rx) = tokio::sync::mpsc::channel(2); + let gateway_info_resolver = MockGatewayInfoResolver {}; + let (speedtest_avg_client, speedtest_avg_receiver) = common::create_file_sink(); + let (verified_client, _verified_receiver) = common::create_file_sink(); + + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + + // Create a mix of valid and invalid speedtests + // Valid speedtests will average to "Poor" tier (0.25 multiplier) + // Invalid speedtests have very high speeds that would result in "Good" tier if included + let speedtests = vec![ + // Valid speedtest - should be included in average + // Upload: 3 Mbps = Poor tier (≥2 but <5) + // Download: 35 Mbps = Poor tier (≥30 but <50) + // Latency: 80ms = Poor tier (≥75 but <100) + CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial-1".to_string(), + timestamp: parse_dt("2024-01-01 01:00:00"), + upload_speed: mbps(3), // Poor tier + download_speed: mbps(35), // Poor tier + latency: 80, // Poor tier + }, + }, + // Invalid speedtest - upload exceeds 300Mbits, should NOT be included + // If included, this would push toward Good tier due to very high speeds + CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial-2".to_string(), + timestamp: parse_dt("2024-01-01 02:00:00"), + upload_speed: mbps(900), // Invalid (way above limit) + download_speed: mbps(150), // Good tier + latency: 20, // Good tier + }, + }, + // Another valid speedtest - should be included in average + // Upload: 4 Mbps = Poor tier (≥2 but <5) + // Download: 40 Mbps = Poor tier (≥30 but <50) + // Latency: 90ms = Poor tier (≥75 but <100) + CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial-3".to_string(), + timestamp: parse_dt("2024-01-01 03:00:00"), + upload_speed: mbps(4), // Poor tier + download_speed: mbps(40), // Poor tier + latency: 90, // Poor tier + }, + }, + // Invalid speedtest - download exceeds 300Mbits, should NOT be included + // If included, this would push toward Good tier due to very high speeds + CellSpeedtestIngestReport { + received_timestamp: Utc::now(), + report: CellSpeedtest { + pubkey: hotspot.clone(), + serial: "test-serial-4".to_string(), + timestamp: parse_dt("2024-01-01 04:00:00"), + upload_speed: mbps(15), // 15 Mbps - would be Good tier + download_speed: mbps(900), // invalid (way above limit) + latency: 30, // Would be Good tier + }, + }, + ]; + + let stream = file_info_stream(speedtests); + + // Drop the daemon when it's done running to close the channel + { + let daemon = SpeedtestDaemon::new( + pool, + gateway_info_resolver, + rx, + speedtest_avg_client, + verified_client, + ); + + daemon.process_file(stream).await?; + } + + let avgs = speedtest_avg_receiver.finish().await?; + + // Should have 2 average entries (one for each valid speedtest) + // Invalid speedtests with speeds > 300Mbits should NOT generate averages + assert_eq!( + 2, + avgs.len(), + "Only valid speedtests should generate averages" + ); + + // Verify the averages only include valid speedtests + // Expected averages based on the two valid speedtests: + // Upload: (3 Mbps + 4 Mbps) / 2 = 3.5 Mbps = Poor tier + // Download: (35 Mbps + 40 Mbps) / 2 = 37.5 Mbps = Poor tier + // Latency: (80ms + 90ms) / 2 = 85ms = Poor tier + // Result: Poor tier = 0.25 multiplier + let expected_upload_avg = mbps(3) + mbps(4); // Sum before division in the code + let expected_upload_avg = expected_upload_avg / 2; // (3.5 Mbps) + let expected_download_avg = mbps(35) + mbps(40); // Sum before division in the code + let expected_download_avg = expected_download_avg / 2; // (37.5 Mbps) + + // Check the last average (which includes both valid speedtests) + let last_avg = &avgs[1]; + assert_eq!( + expected_upload_avg, last_avg.upload_speed_avg_bps, + "Upload average should only include valid speedtests" + ); + assert_eq!( + expected_download_avg, last_avg.download_speed_avg_bps, + "Download average should only include valid speedtests" + ); + + // Most importantly: verify that the reward multiplier is 0.25 (Poor tier) + // If invalid speedtests were included, the multiplier would be much higher + assert_eq!( + 0.25, last_avg.reward_multiplier, + "Reward multiplier should be 0.25 (Poor tier) based only on valid speedtests" + ); + + Ok(()) } fn speedtest( @@ -124,7 +434,18 @@ fn parse_dt(dt: &str) -> DateTime { .expect("unable_to_parse") .and_utc() } - fn mbps(mbps: u64) -> u64 { - mbps * 125000 + mbps * BYTES_PER_MEGABIT +} +fn file_info_stream( + speedtests: Vec, +) -> FileInfoStream { + let file_info = FileInfo { + key: "key".to_string(), + prefix: "prefix".to_string(), + timestamp: Utc::now(), + size: 0, + }; + + FileInfoStream::new("default".to_string(), file_info, speedtests) }