diff --git a/Cargo.lock b/Cargo.lock index 6cd64b2fc..6ea2c584e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2382,6 +2382,29 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "dataset-downloader" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-compression", + "async-trait", + "aws-local", + "chrono", + "file-store", + "futures-util", + "hex-assignments", + "hextree", + "lazy_static", + "regex", + "sqlx", + "tempfile", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + [[package]] name = "db-store" version = "0.1.0" @@ -4997,6 +5020,7 @@ dependencies = [ "coverage-map", "coverage-point-calculator", "custom-tracing", + "dataset-downloader", "db-store", "derive_builder", "file-store", diff --git a/Cargo.toml b/Cargo.toml index 98b8de21b..04d87da76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,8 @@ members = [ "task_manager", "hex_assignments", "aws_local", -] + "dataset_downloader"] + resolver = "2" [workspace.package] @@ -118,6 +119,9 @@ aws-config = "0.51" aws-sdk-s3 = "0.21" aws-types = { version = "0.51", features = ["hardcoded-credentials"]} tempfile = "3" +regex = "1" +async-compression = { version = "0", features = ["tokio", "gzip"] } + [patch.crates-io] anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madninja/const_pubkey" } diff --git a/aws_local/src/lib.rs b/aws_local/src/lib.rs index 30e9d4b43..41b580fee 100644 --- a/aws_local/src/lib.rs +++ b/aws_local/src/lib.rs @@ -13,9 +13,9 @@ use tonic::transport::Uri; use uuid::Uuid; pub const AWSLOCAL_ENDPOINT_ENV: &str = "AWSLOCAL_ENDPOINT"; -pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://localhost:4566"; pub fn aws_local_default_endpoint() -> String { + const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://localhost:4566"; env::var(AWSLOCAL_ENDPOINT_ENV).unwrap_or_else(|_| AWSLOCAL_DEFAULT_ENDPOINT.to_string()) } diff --git a/dataset_downloader/Cargo.toml b/dataset_downloader/Cargo.toml new file mode 100644 index 000000000..c62e16941 --- /dev/null +++ b/dataset_downloader/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "dataset-downloader" +version = "0.1.0" +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +sqlx = { workspace = true } +anyhow = { workspace = true } +uuid = { workspace = true } +async-trait = { workspace = true } +regex = { workspace = true } +hextree = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +lazy_static = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } +futures-util = { workspace = true } +async-compression = { workspace = true } +file-store = { path = "../file_store" } +hex-assignments = { path = "../hex_assignments" } + +[dev-dependencies] +aws-local = { path = "../aws_local" } +tempfile = "3" diff --git a/dataset_downloader/src/lib.rs b/dataset_downloader/src/lib.rs new file mode 100644 index 000000000..5b9ec1a5f --- /dev/null +++ b/dataset_downloader/src/lib.rs @@ -0,0 +1,670 @@ +use std::path::{Path, PathBuf}; + +use chrono::{DateTime, Utc}; +use futures_util::{Stream, StreamExt}; +use hex_assignments::assignment::HexAssignments; +use hex_assignments::HexBoostDataAssignmentsExt; +use hextree::disktree::DiskTreeMap; +use lazy_static::lazy_static; +use regex::Regex; +use sqlx::{FromRow, PgPool, Postgres, Transaction}; +use sqlx::{PgConnection, Type}; +use tokio::{fs::File, io::AsyncWriteExt}; + +use 
file_store::{traits::TimestampDecode, FileStore};
+use hex_assignments::{
+    footfall::Footfall, landtype::Landtype, service_provider_override::ServiceProviderOverride,
+    urbanization::Urbanization, HexAssignment, HexBoostData,
+};
+
+#[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Type)]
+#[sqlx(type_name = "signal_level")]
+#[sqlx(rename_all = "lowercase")]
+pub enum SignalLevel {
+    None,
+    Low,
+    Medium,
+    High,
+}
+#[derive(FromRow)]
+pub struct UnassignedHex {
+    pub uuid: uuid::Uuid,
+    #[sqlx(try_from = "i64")]
+    pub hex: u64,
+    pub signal_level: SignalLevel,
+    pub signal_power: i32,
+}
+
+pub struct AssignedHex {
+    pub uuid: uuid::Uuid,
+    pub hex: u64,
+    pub signal_level: SignalLevel,
+    pub signal_power: i32,
+    pub assignments: HexAssignments,
+}
+
+impl UnassignedHex {
+    pub fn assign(
+        self,
+        data_sets: &impl HexBoostDataAssignmentsExt,
+    ) -> anyhow::Result<AssignedHex> {
+        let cell = hextree::Cell::try_from(self.hex)?;
+
+        Ok(AssignedHex {
+            uuid: self.uuid,
+            hex: self.hex,
+            signal_level: self.signal_level,
+            signal_power: self.signal_power,
+            assignments: data_sets.assignments(cell)?,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+pub trait NewDataSetHandler: Send + Sync + 'static {
+    // Called when a new data set has arrived, before it is marked as processed.
+    // If this function fails, the new data sets will not be marked as processed.
+    // Do not call txn.commit() inside the callback.
+    async fn callback(
+        &self,
+        txn: &mut Transaction<'_, Postgres>,
+        data_sets: &HexBoostData,
+    ) -> anyhow::Result<()>;
+}
+
+#[async_trait::async_trait]
+pub trait DataSet: HexAssignment + Send + Sync + 'static {
+    const TYPE: DataSetType;
+
+    fn timestamp(&self) -> Option<DateTime<Utc>>;
+
+    fn update(&mut self, path: &Path, time_to_use: DateTime<Utc>) -> anyhow::Result<()>;
+
+    fn is_ready(&self) -> bool;
+
+    async fn fetch_first_data_set(
+        &mut self,
+        pool: &PgPool,
+        data_set_directory: &Path,
+    ) -> anyhow::Result<()> {
+        let Some(first_data_set) = db::fetch_latest_processed_data_set(pool, Self::TYPE).await?
+        else {
+            return Ok(());
+        };
+        let path = get_data_set_path(data_set_directory, Self::TYPE, first_data_set.time_to_use);
+        self.update(Path::new(&path), first_data_set.time_to_use)?;
+        Ok(())
+    }
+
+    async fn check_for_available_data_sets(
+        &self,
+        store: &FileStore,
+        pool: &PgPool,
+    ) -> anyhow::Result<()> {
+        tracing::info!("Checking for new {} data sets", Self::TYPE.to_prefix());
+        let mut new_data_sets = store.list(Self::TYPE.to_prefix(), self.timestamp(), None);
+        while let Some(new_data_set) = new_data_sets.next().await.transpose()?
{ + db::insert_new_data_set(pool, &new_data_set.key, Self::TYPE, new_data_set.timestamp) + .await?; + } + Ok(()) + } + + async fn fetch_next_available_data_set( + &mut self, + store: &FileStore, + pool: &PgPool, + data_set_directory: &Path, + ) -> anyhow::Result> { + self.check_for_available_data_sets(store, pool).await?; + + let latest_unprocessed_data_set = + db::fetch_latest_unprocessed_data_set(pool, Self::TYPE, self.timestamp()).await?; + + let Some(latest_unprocessed_data_set) = latest_unprocessed_data_set else { + return Ok(None); + }; + + let path = get_data_set_path( + data_set_directory, + Self::TYPE, + latest_unprocessed_data_set.time_to_use, + ); + + if !latest_unprocessed_data_set.status.is_downloaded() { + download_data_set(store, &latest_unprocessed_data_set.filename, &path).await?; + let con = &mut pool.acquire().await?; + latest_unprocessed_data_set.mark_as_downloaded(con).await?; + tracing::info!( + data_set = latest_unprocessed_data_set.filename, + "Data set download complete" + ); + } + + self.update(Path::new(&path), latest_unprocessed_data_set.time_to_use)?; + + Ok(Some(latest_unprocessed_data_set)) + } +} + +#[async_trait::async_trait] +impl DataSet for Footfall { + const TYPE: DataSetType = DataSetType::Footfall; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.footfall = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.footfall.is_some() + } +} + +#[async_trait::async_trait] +impl DataSet for Landtype { + const TYPE: DataSetType = DataSetType::Landtype; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.landtype = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.landtype.is_some() + } +} + +#[async_trait::async_trait] +impl DataSet for Urbanization { + const TYPE: DataSetType = DataSetType::Urbanization; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.urbanized = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.urbanized.is_some() + } +} + +#[async_trait::async_trait] +impl DataSet for ServiceProviderOverride { + const TYPE: DataSetType = DataSetType::ServiceProviderOverride; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.service_provider_override = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.service_provider_override.is_some() + } +} + +pub struct DataSetDownloader { + pool: PgPool, + store: FileStore, + data_set_directory: PathBuf, +} + +#[derive(FromRow, Debug)] +pub struct NewDataSet { + filename: String, + time_to_use: DateTime, + status: DataSetStatus, +} + +impl NewDataSet { + async fn mark_as_downloaded(&self, con: &mut PgConnection) -> anyhow::Result<()> { + db::set_data_set_status(con, &self.filename, DataSetStatus::Downloaded).await?; + Ok(()) + } + + async fn mark_as_processed(&self, con: &mut PgConnection) -> anyhow::Result<()> { + db::set_data_set_status(con, &self.filename, DataSetStatus::Processed).await?; + Ok(()) + } + + pub fn filename(&self) -> &String 
{
+        &self.filename
+    }
+}
+
+#[derive(Copy, Clone, sqlx::Type, Debug)]
+#[sqlx(type_name = "data_set_status")]
+#[sqlx(rename_all = "lowercase")]
+pub enum DataSetStatus {
+    Pending,
+    Downloaded,
+    Processed,
+}
+
+impl DataSetStatus {
+    pub fn is_downloaded(&self) -> bool {
+        matches!(self, Self::Downloaded)
+    }
+}
+
+pub fn is_hex_boost_data_ready(data_sets: &HexBoostData) -> bool {
+    let h = &data_sets;
+    h.urbanization.is_ready()
+        && h.footfall.is_ready()
+        && h.landtype.is_ready()
+        && h.service_provider_override.is_ready()
+}
+
+impl DataSetDownloader {
+    pub fn new(pool: PgPool, store: FileStore, data_set_directory: PathBuf) -> Self {
+        Self {
+            pool,
+            store,
+            data_set_directory,
+        }
+    }
+
+    // Fetches any newly available data sets, runs the optional handler inside a
+    // transaction, marks the new sets as processed, then removes superseded files.
+    pub async fn check_for_new_data_sets(
+        &mut self,
+        data_set_processor: Option<&dyn NewDataSetHandler>,
+        mut data_sets: HexBoostData,
+    ) -> anyhow::Result<HexBoostData> {
+        let new_urbanized = data_sets
+            .urbanization
+            .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory)
+            .await?;
+        let new_footfall = data_sets
+            .footfall
+            .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory)
+            .await?;
+        let new_landtype = data_sets
+            .landtype
+            .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory)
+            .await?;
+        let new_service_provider_override = data_sets
+            .service_provider_override
+            .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory)
+            .await?;
+
+        let new_data_set = new_urbanized.is_some()
+            || new_footfall.is_some()
+            || new_landtype.is_some()
+            || new_service_provider_override.is_some();
+
+        if !new_data_set {
+            return Ok(data_sets);
+        }
+
+        let mut txn = self.pool.begin().await?;
+
+        if let Some(dsp) = data_set_processor {
+            if is_hex_boost_data_ready(&data_sets) {
+                tracing::info!("Processing new data sets");
+                dsp.callback(&mut txn, &data_sets).await?;
+            }
+        }
+
+        // Mark the new data sets as processed and delete the old ones
+        if let Some(ref new_urbanized) = new_urbanized {
+            new_urbanized.mark_as_processed(&mut txn).await?;
+        }
+
+        if let Some(ref new_footfall) = new_footfall {
+            new_footfall.mark_as_processed(&mut txn).await?;
+        }
+        if let Some(ref new_landtype) = new_landtype {
+            new_landtype.mark_as_processed(&mut txn).await?;
+        }
+
+        if let Some(ref new_service_provider_override) = new_service_provider_override {
+            new_service_provider_override
+                .mark_as_processed(&mut txn)
+                .await?;
+        }
+        txn.commit().await?;
+
+        // Deletion failures are only logged; ignoring these errors can be critical
+        // if the server runs out of disk space.
+        if let Some(new_urbanized) = new_urbanized {
+            if let Err(err) = delete_old_data_sets(
+                &self.data_set_directory,
+                DataSetType::Urbanization,
+                new_urbanized.time_to_use,
+            )
+            .await
+            {
+                tracing::error!(
+                    error = ?err,
+                    data_set_directory = ?self.data_set_directory,
+                    time_to_use = ?new_urbanized.time_to_use,
+                    "Failed to delete old urbanized data set file"
+                );
+            }
+        }
+        if let Some(new_footfall) = new_footfall {
+            if let Err(err) = delete_old_data_sets(
+                &self.data_set_directory,
+                DataSetType::Footfall,
+                new_footfall.time_to_use,
+            )
+            .await
+            {
+                tracing::error!(
+                    error = ?err,
+                    data_set_directory = ?self.data_set_directory,
+                    time_to_use = ?new_footfall.time_to_use,
+                    "Failed to delete old footfall data set file"
+                );
+            }
+        }
+        if let Some(new_landtype) = new_landtype {
+            if let Err(err) = delete_old_data_sets(
+                &self.data_set_directory,
+                DataSetType::Landtype,
+                new_landtype.time_to_use,
+            )
+            .await
+            {
+                tracing::error!(
+                    error = ?err,
+                    data_set_directory = ?self.data_set_directory,
+                    time_to_use = ?new_landtype.time_to_use,
+                    "Failed to delete old landtype data set file"
+                );
+            }
+        }
+        if let Some(new_service_provider_override) = new_service_provider_override {
+            if let Err(err) = delete_old_data_sets(
+                &self.data_set_directory,
+                DataSetType::ServiceProviderOverride,
+                new_service_provider_override.time_to_use,
+            )
+            .await
+            {
+                tracing::error!(
+                    error = ?err,
+                    data_set_directory = ?self.data_set_directory,
+                    time_to_use = ?new_service_provider_override.time_to_use,
+                    "Failed to delete old service_provider_override data set file"
+                );
+            }
+        }
+
+        Ok(data_sets)
+    }
+    pub async fn fetch_first_datasets(
+        &self,
+        mut data_sets: HexBoostData,
+    ) -> anyhow::Result<HexBoostData> {
+        data_sets
+            .urbanization
+            .fetch_first_data_set(&self.pool, &self.data_set_directory)
+            .await?;
+        data_sets
+            .footfall
+            .fetch_first_data_set(&self.pool, &self.data_set_directory)
+            .await?;
+        data_sets
+            .landtype
+            .fetch_first_data_set(&self.pool, &self.data_set_directory)
+            .await?;
+        data_sets
+            .service_provider_override
+            .fetch_first_data_set(&self.pool, &self.data_set_directory)
+            .await?;
+        Ok(data_sets)
+    }
+}
+
+fn get_data_set_path(
+    data_set_directory: &Path,
+    data_set_type: DataSetType,
+    time_to_use: DateTime<Utc>,
+) -> PathBuf {
+    let path = PathBuf::from(format!(
+        "{}.{}.{}.h3tree",
+        data_set_type.to_prefix(),
+        time_to_use.timestamp_millis(),
+        data_set_type.to_hex_res_prefix(),
+    ));
+    let mut dir = data_set_directory.to_path_buf();
+    dir.push(path);
+    dir
+}
+
+lazy_static! {
+    static ref RE: Regex = Regex::new(r"([a-z,_]+).(\d+)(.res[0-9]{1,2}.h3tree)?").unwrap();
+}
+
+async fn delete_old_data_sets(
+    data_set_directory: &Path,
+    data_set_type: DataSetType,
+    time_to_use: DateTime<Utc>,
+) -> anyhow::Result<()> {
+    let mut data_sets = tokio::fs::read_dir(data_set_directory).await?;
+    while let Some(data_set) = data_sets.next_entry().await? {
+        let file_name = data_set.file_name();
+        let file_name = file_name.to_string_lossy();
+        let Some(cap) = RE.captures(&file_name) else {
+            tracing::warn!("Could not determine data set file type: {}", file_name);
+            continue;
+        };
+        let prefix = &cap[1];
+        let timestamp = cap[2].parse::<u64>()?.to_timestamp_millis()?;
+        if prefix == data_set_type.to_prefix() && timestamp < time_to_use {
+            tracing::info!(data_set = &*file_name, "Deleting old data set file");
+            tokio::fs::remove_file(data_set.path()).await?;
+        }
+    }
+    Ok(())
+}
+
+async fn download_data_set(
+    store: &FileStore,
+    in_file_name: &str,
+    out_path: &Path,
+) -> anyhow::Result<()> {
+    tracing::info!("Downloading new data set: {}", out_path.to_string_lossy());
+    let stream = store.get_raw(in_file_name).await?;
+    let mut bytes = tokio_util::codec::FramedRead::new(
+        async_compression::tokio::bufread::GzipDecoder::new(tokio_util::io::StreamReader::new(
+            stream,
+        )),
+        tokio_util::codec::BytesCodec::new(),
+    );
+    let mut file = File::create(&out_path).await?;
+    while let Some(bytes) = bytes.next().await.transpose()?
{ + file.write_all(&bytes).await?; + } + Ok(()) +} + +#[derive(Copy, Clone, sqlx::Type)] +#[sqlx(type_name = "data_set_type")] +#[sqlx(rename_all = "snake_case")] +pub enum DataSetType { + Urbanization, + Footfall, + Landtype, + ServiceProviderOverride, +} + +impl DataSetType { + pub fn to_prefix(self) -> &'static str { + match self { + Self::Urbanization => "urbanization", + Self::Footfall => "footfall", + Self::Landtype => "landtype", + Self::ServiceProviderOverride => "service_provider_override", + } + } + + pub fn to_hex_res_prefix(self) -> &'static str { + match self { + Self::Urbanization => "res10", + Self::Footfall => "res10", + Self::Landtype => "res10", + Self::ServiceProviderOverride => "res12", + } + } +} +pub mod db { + use super::*; + + pub async fn fetch_latest_file_date( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result>> { + sqlx::query_scalar("SELECT time_to_use FROM hex_assignment_data_set_status WHERE data_set = $1 ORDER BY time_to_use DESC LIMIT 1") + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + pub async fn insert_new_data_set( + pool: &PgPool, + filename: &str, + data_set_type: DataSetType, + time_to_use: DateTime, + ) -> sqlx::Result<()> { + sqlx::query( + r#" + INSERT INTO hex_assignment_data_set_status (filename, data_set, time_to_use, status) + VALUES ($1, $2, $3, 'pending') + ON CONFLICT DO NOTHING + "#, + ) + .bind(filename) + .bind(data_set_type) + .bind(time_to_use) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn fetch_latest_unprocessed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + since: Option>, + ) -> sqlx::Result> { + sqlx::query_as( + "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status != 'processed' AND data_set = $1 AND COALESCE(time_to_use > $2, TRUE) AND time_to_use <= $3 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .bind(since) + .bind(Utc::now()) + .fetch_optional(pool) + .await + } + + pub async fn fetch_latest_processed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result> { + sqlx::query_as( + "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + pub async fn set_data_set_status( + con: &mut PgConnection, + filename: &str, + status: DataSetStatus, + ) -> sqlx::Result<()> { + sqlx::query("UPDATE hex_assignment_data_set_status SET status = $1 WHERE filename = $2") + .bind(status) + .bind(filename) + .execute(con) + .await?; + Ok(()) + } + + pub async fn fetch_time_of_latest_processed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result>> { + sqlx::query_scalar( + "SELECT time_to_use FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + /// Check if there are any pending or downloaded files prior to the given reward period + pub async fn check_for_unprocessed_data_sets( + pool: &PgPool, + period_end: DateTime, + ) -> sqlx::Result { + Ok(sqlx::query_scalar( + "SELECT COUNT(*) > 0 FROM hex_assignment_data_set_status WHERE time_to_use <= $1 AND status != 'processed'", + ) + .bind(period_end) + .fetch_one(pool) + .await? 
+ || sqlx::query_scalar( + r#" + SELECT COUNT(*) > 0 FROM coverage_objects + WHERE inserted_at < $1 AND uuid IN ( + SELECT + DISTINCT uuid + FROM + hexes + WHERE + urbanized IS NULL + OR footfall IS NULL + OR landtype IS NULL + OR service_provider_override IS NULL + ) + "#, + ) + .bind(period_end) + .fetch_one(pool) + .await?) + } + + pub fn fetch_all_hexes( + con: &mut PgConnection, + ) -> impl Stream> + '_ { + sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(con) + } + + pub fn fetch_hexes_with_null_assignments( + con: &mut PgConnection, + ) -> impl Stream> + '_ { + sqlx::query_as( + "SELECT + uuid, hex, signal_level, signal_power + FROM + hexes + WHERE + urbanized IS NULL + OR footfall IS NULL + OR landtype IS NULL + OR service_provider_override IS NULL", + ) + .fetch(con) + } +} diff --git a/dataset_downloader/tests/downloader_test.rs b/dataset_downloader/tests/downloader_test.rs new file mode 100644 index 000000000..756bc586e --- /dev/null +++ b/dataset_downloader/tests/downloader_test.rs @@ -0,0 +1,174 @@ +use std::path::PathBuf; +use std::str::FromStr; + +use aws_local::{aws_local_default_endpoint, gen_bucket_name, AwsLocal}; +use dataset_downloader::db::{fetch_latest_processed_data_set, fetch_latest_unprocessed_data_set}; +use dataset_downloader::{DataSetDownloader, DataSetType, NewDataSetHandler}; +use sqlx::{PgPool, Postgres, Transaction}; +use tempfile::TempDir; + +use hex_assignments::HexBoostData; + +pub async fn create_data_set_downloader( + pool: PgPool, + file_paths: Vec, + tmp_dir: &TempDir, +) -> (DataSetDownloader, HexBoostData, String) { + let bucket_name = gen_bucket_name(); + + let awsl = AwsLocal::new(&aws_local_default_endpoint(), &bucket_name).await; + + for file_path in file_paths { + awsl.put_file_to_aws(&file_path).await.unwrap(); + } + + let data_set_directory = tmp_dir.path(); + tokio::fs::create_dir_all(data_set_directory).await.unwrap(); + + let file_store = awsl.file_store.clone(); + + let mut dsd = DataSetDownloader::new(pool, file_store, data_set_directory.to_path_buf()); + + let mut hbd = HexBoostData::default(); + + hbd = dsd.fetch_first_datasets(hbd).await.unwrap(); + hbd = dsd.check_for_new_data_sets(None, hbd).await.unwrap(); + + (dsd, hbd, bucket_name) +} + +pub async fn hex_assignment_file_exist(pool: &PgPool, filename: &str) -> bool { + sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS(SELECT 1 FROM hex_assignment_data_set_status WHERE filename = $1) + "#, + ) + .bind(filename) + .fetch_one(pool) + .await + .unwrap() +} + +#[sqlx::test(migrations = "../mobile_verifier/migrations")] +async fn test_dataset_downloader_new_file(pool: PgPool) { + // Scenario: + // 1. DataSetDownloader downloads initial files + // 2. Upload a new file + // 3. 
DataSetDownloader downloads new file + + let paths = [ + "footfall.1722895200000.gz", + "urbanization.1722895200000.gz", + "landtype.1722895200000.gz", + "service_provider_override.1739404800000.gz", + ]; + + let file_paths: Vec = paths + .iter() + .map(|f| PathBuf::from(format!("./tests/fixtures/{}", f))) + .collect(); + + let tmp_dir = TempDir::new().expect("Unable to create temp dir"); + let (mut data_set_downloader, data_sets, bucket_name) = + create_data_set_downloader(pool.clone(), file_paths, &tmp_dir).await; + assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "landtype.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz").await); + + let awsl = AwsLocal::new(&aws_local_default_endpoint(), &bucket_name).await; + awsl.put_file_to_aws(&PathBuf::from_str("./tests/fixtures/footfall.1732895200000.gz").unwrap()) + .await + .unwrap(); + data_set_downloader + .check_for_new_data_sets(None, data_sets) + .await + .unwrap(); + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); +} + +struct TestDatasetHandler {} + +#[async_trait::async_trait] +impl NewDataSetHandler for TestDatasetHandler { + async fn callback( + &self, + _txn: &mut Transaction<'_, Postgres>, + _data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + Err(anyhow::anyhow!("Err")) + } +} + +#[sqlx::test(migrations = "../mobile_verifier/migrations")] +async fn test_dataset_downloader_callback_failed(pool: PgPool) { + // Scenario: + // 1. DataSetDownloader downloads initial files + // 2. Upload a new file + // 3. Callback fails + // 4. Uploaded file should be processed again + // 3. 
Callback successful, file marked as processed + + let paths = [ + "footfall.1722895200000.gz", + "urbanization.1722895200000.gz", + "landtype.1722895200000.gz", + "service_provider_override.1739404800000.gz", + ]; + + let file_paths: Vec = paths + .iter() + .map(|f| PathBuf::from(format!("./tests/fixtures/{}", f))) + .collect(); + + let tmp_dir = TempDir::new().expect("Unable to create temp dir"); + let (mut data_set_downloader, data_sets, bucket_name) = + create_data_set_downloader(pool.clone(), file_paths.clone(), &tmp_dir).await; + assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "landtype.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz").await); + + let dh = TestDatasetHandler {}; + + let awsl = AwsLocal::new(&aws_local_default_endpoint(), &bucket_name).await; + awsl.put_file_to_aws(&PathBuf::from_str("./tests/fixtures/footfall.1732895200000.gz").unwrap()) + .await + .unwrap(); + + assert!(data_set_downloader + .check_for_new_data_sets(Some(&dh), data_sets) + .await + .is_err()); + + let last_processed = fetch_latest_processed_data_set(&pool, DataSetType::Footfall) + .await + .unwrap() + .unwrap(); + let last_unprocessed = fetch_latest_unprocessed_data_set(&pool, DataSetType::Footfall, None) + .await + .unwrap() + .unwrap(); + + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + + assert_eq!(last_processed.filename(), "footfall.1722895200000.gz"); + assert_eq!(last_unprocessed.filename(), "footfall.1732895200000.gz"); + + // initialized datasets and fetches new ones + create_data_set_downloader(pool.clone(), file_paths, &tmp_dir).await; + + let last_processed = fetch_latest_processed_data_set(&pool, DataSetType::Footfall) + .await + .unwrap() + .unwrap(); + assert!( + fetch_latest_unprocessed_data_set(&pool, DataSetType::Footfall, None) + .await + .unwrap() + .is_none() + ); + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + + assert_eq!(last_processed.filename(), "footfall.1732895200000.gz"); +} diff --git a/dataset_downloader/tests/fixtures/footfall.1722895200000.gz b/dataset_downloader/tests/fixtures/footfall.1722895200000.gz new file mode 100644 index 000000000..40cdd6818 Binary files /dev/null and b/dataset_downloader/tests/fixtures/footfall.1722895200000.gz differ diff --git a/dataset_downloader/tests/fixtures/footfall.1732895200000.gz b/dataset_downloader/tests/fixtures/footfall.1732895200000.gz new file mode 100644 index 000000000..40cdd6818 Binary files /dev/null and b/dataset_downloader/tests/fixtures/footfall.1732895200000.gz differ diff --git a/dataset_downloader/tests/fixtures/landtype.1722895200000.gz b/dataset_downloader/tests/fixtures/landtype.1722895200000.gz new file mode 100644 index 000000000..0fb00d726 Binary files /dev/null and b/dataset_downloader/tests/fixtures/landtype.1722895200000.gz differ diff --git a/dataset_downloader/tests/fixtures/service_provider_override.1739404800000.gz b/dataset_downloader/tests/fixtures/service_provider_override.1739404800000.gz new file mode 100644 index 000000000..c7e8498b3 Binary files /dev/null and b/dataset_downloader/tests/fixtures/service_provider_override.1739404800000.gz differ diff --git a/dataset_downloader/tests/fixtures/urbanization.1722895200000.gz b/dataset_downloader/tests/fixtures/urbanization.1722895200000.gz new file mode 100644 index 
000000000..66f369c81 Binary files /dev/null and b/dataset_downloader/tests/fixtures/urbanization.1722895200000.gz differ diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 051dc0ad9..a1f3d536a 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -62,6 +62,7 @@ poc-metrics = { path = "../metrics" } reward-scheduler = { path = "../reward_scheduler" } solana = { path = "../solana" } task-manager = { path = "../task_manager" } +dataset-downloader = { path = "../dataset_downloader" } [dev-dependencies] aws-local = { path = "../aws_local" } diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index d3b4d9a28..5a91fdc3b 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -1,243 +1,37 @@ -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - pin::pin, - time::Duration, -}; +use std::{collections::HashMap, path::PathBuf, pin::pin, time::Duration}; -use chrono::{DateTime, Utc}; +use chrono::Utc; +use dataset_downloader::{ + db, is_hex_boost_data_ready, AssignedHex, DataSetDownloader, NewDataSetHandler, UnassignedHex, +}; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{ - FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampDecode, - TimestampEncode, - }, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, FileStore, }; -use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{Stream, TryFutureExt, TryStreamExt}; use helium_proto::services::poc_mobile::{self as proto, OracleBoostingReportV1}; -use hextree::disktree::DiskTreeMap; -use lazy_static::lazy_static; -use regex::Regex; use rust_decimal::prelude::ToPrimitive; use rust_decimal_macros::dec; -use sqlx::{FromRow, PgPool, QueryBuilder}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use task_manager::{ManagedTask, TaskManager}; -use tokio::{fs::File, io::AsyncWriteExt, time::Instant}; - -use crate::{ - coverage::{NewCoverageObjectNotification, SignalLevel}, - Settings, -}; - -use hex_assignments::{ - assignment::HexAssignments, footfall::Footfall, landtype::Landtype, - service_provider_override::ServiceProviderOverride, urbanization::Urbanization, HexAssignment, - HexBoostData, HexBoostDataAssignmentsExt, -}; - -#[async_trait::async_trait] -pub trait DataSet: HexAssignment + Send + Sync + 'static { - const TYPE: DataSetType; - - fn timestamp(&self) -> Option>; - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()>; - - fn is_ready(&self) -> bool; - - async fn fetch_first_data_set( - &mut self, - pool: &PgPool, - data_set_directory: &Path, - ) -> anyhow::Result<()> { - let Some(first_data_set) = db::fetch_latest_processed_data_set(pool, Self::TYPE).await? - else { - return Ok(()); - }; - let path = get_data_set_path(data_set_directory, Self::TYPE, first_data_set.time_to_use); - self.update(Path::new(&path), first_data_set.time_to_use)?; - Ok(()) - } - - async fn check_for_available_data_sets( - &self, - store: &FileStore, - pool: &PgPool, - ) -> anyhow::Result<()> { - tracing::info!("Checking for new {} data sets", Self::TYPE.to_prefix()); - let mut new_data_sets = store.list(Self::TYPE.to_prefix(), self.timestamp(), None); - while let Some(new_data_set) = new_data_sets.next().await.transpose()? 
{ - db::insert_new_data_set(pool, &new_data_set.key, Self::TYPE, new_data_set.timestamp) - .await?; - } - Ok(()) - } - - async fn fetch_next_available_data_set( - &mut self, - store: &FileStore, - pool: &PgPool, - data_set_directory: &Path, - ) -> anyhow::Result> { - self.check_for_available_data_sets(store, pool).await?; - - let latest_unprocessed_data_set = - db::fetch_latest_unprocessed_data_set(pool, Self::TYPE, self.timestamp()).await?; - - let Some(latest_unprocessed_data_set) = latest_unprocessed_data_set else { - return Ok(None); - }; - - let path = get_data_set_path( - data_set_directory, - Self::TYPE, - latest_unprocessed_data_set.time_to_use, - ); - - if !latest_unprocessed_data_set.status.is_downloaded() { - download_data_set(store, &latest_unprocessed_data_set.filename, &path).await?; - latest_unprocessed_data_set.mark_as_downloaded(pool).await?; - tracing::info!( - data_set = latest_unprocessed_data_set.filename, - "Data set download complete" - ); - } - - self.update(Path::new(&path), latest_unprocessed_data_set.time_to_use)?; - - Ok(Some(latest_unprocessed_data_set)) - } -} - -#[async_trait::async_trait] -impl DataSet for Footfall { - const TYPE: DataSetType = DataSetType::Footfall; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.footfall = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } - - fn is_ready(&self) -> bool { - self.footfall.is_some() - } -} - -#[async_trait::async_trait] -impl DataSet for Landtype { - const TYPE: DataSetType = DataSetType::Landtype; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.landtype = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } +use tokio::time::Instant; - fn is_ready(&self) -> bool { - self.landtype.is_some() - } -} +use crate::{coverage::NewCoverageObjectNotification, Settings}; -#[async_trait::async_trait] -impl DataSet for Urbanization { - const TYPE: DataSetType = DataSetType::Urbanization; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.urbanized = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } - - fn is_ready(&self) -> bool { - self.urbanized.is_some() - } -} - -#[async_trait::async_trait] -impl DataSet for ServiceProviderOverride { - const TYPE: DataSetType = DataSetType::ServiceProviderOverride; - - fn timestamp(&self) -> Option> { - self.timestamp - } - - fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { - self.service_provider_override = Some(DiskTreeMap::open(path)?); - self.timestamp = Some(time_to_use); - Ok(()) - } - - fn is_ready(&self) -> bool { - self.service_provider_override.is_some() - } -} - -pub fn is_hex_boost_data_ready(h: &HexBoostData) -> bool { - h.urbanization.is_ready() - && h.footfall.is_ready() - && h.landtype.is_ready() - && h.service_provider_override.is_ready() -} +use hex_assignments::{HexBoostData, HexBoostDataAssignmentsExt}; pub struct DataSetDownloaderDaemon { - pool: PgPool, + data_set_downloader: DataSetDownloader, data_sets: HexBoostData, - store: FileStore, - data_set_processor: FileSinkClient, - data_set_directory: PathBuf, + + pool: PgPool, + oracle_boostring_writer: OracleBoostingWriter, new_coverage_object_notification: NewCoverageObjectNotification, 
poll_duration: Duration, } -#[derive(FromRow)] -pub struct NewDataSet { - filename: String, - time_to_use: DateTime, - status: DataSetStatus, -} - -impl NewDataSet { - async fn mark_as_downloaded(&self, pool: &PgPool) -> anyhow::Result<()> { - db::set_data_set_status(pool, &self.filename, DataSetStatus::Downloaded).await?; - Ok(()) - } - - async fn mark_as_processed(&self, pool: &PgPool) -> anyhow::Result<()> { - db::set_data_set_status(pool, &self.filename, DataSetStatus::Processed).await?; - Ok(()) - } -} - -#[derive(Copy, Clone, sqlx::Type)] -#[sqlx(type_name = "data_set_status")] -#[sqlx(rename_all = "lowercase")] -pub enum DataSetStatus { - Pending, - Downloaded, - Processed, -} - -impl DataSetStatus { - pub fn is_downloaded(&self) -> bool { - matches!(self, Self::Downloaded) - } -} - impl ManagedTask for DataSetDownloaderDaemon { fn start_task( self: Box, @@ -294,6 +88,27 @@ impl DataSetDownloaderDaemon { } } +struct OracleBoostingWriter { + data_set_processor: FileSinkClient, +} + +#[async_trait::async_trait] +impl NewDataSetHandler for OracleBoostingWriter { + async fn callback( + &self, + txn: &mut Transaction<'_, Postgres>, + data_sets: &HexBoostData, + ) -> anyhow::Result<()> { + let assigned_coverage_objs = + AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(txn), data_sets).await?; + assigned_coverage_objs + .write(&self.data_set_processor) + .await?; + assigned_coverage_objs.save(txn).await?; + Ok(()) + } +} + impl DataSetDownloaderDaemon { pub fn new( pool: PgPool, @@ -304,123 +119,36 @@ impl DataSetDownloaderDaemon { new_coverage_object_notification: NewCoverageObjectNotification, poll_duration: Duration, ) -> Self { + let data_set_downloader = DataSetDownloader::new(pool.clone(), store, data_set_directory); + let oracle_boostring_writer = OracleBoostingWriter { data_set_processor }; Self { - pool, - data_sets, - store, - data_set_processor, - data_set_directory, + oracle_boostring_writer, + data_set_downloader, new_coverage_object_notification, poll_duration, + data_sets, + pool, } } - pub async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> { - let new_urbanized = self - .data_sets - .urbanization - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - let new_footfall = self - .data_sets - .footfall - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - let new_landtype = self - .data_sets - .landtype - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - let new_service_provider_override = self - .data_sets - .service_provider_override - .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) - .await?; - - // If all of the data sets are ready and there is at least one new one, re-process all - // hex assignments: - let new_data_set = new_urbanized.is_some() - || new_footfall.is_some() - || new_landtype.is_some() - || new_service_provider_override.is_some(); - if is_hex_boost_data_ready(&self.data_sets) && new_data_set { - tracing::info!("Processing new data sets"); - self.data_set_processor - .set_all_oracle_boosting_assignments(&self.pool, &self.data_sets) - .await?; - } - - // Mark the new data sets as processed and delete the old ones - if let Some(new_urbanized) = new_urbanized { - new_urbanized.mark_as_processed(&self.pool).await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::Urbanization, - new_urbanized.time_to_use, - ) - .await?; - } - if let Some(new_footfall) = 
new_footfall { - new_footfall.mark_as_processed(&self.pool).await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::Footfall, - new_footfall.time_to_use, - ) - .await?; - } - if let Some(new_landtype) = new_landtype { - new_landtype.mark_as_processed(&self.pool).await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::Landtype, - new_landtype.time_to_use, - ) - .await?; - } - if let Some(new_service_provider_override) = new_service_provider_override { - new_service_provider_override - .mark_as_processed(&self.pool) - .await?; - delete_old_data_sets( - &self.data_set_directory, - DataSetType::ServiceProviderOverride, - new_service_provider_override.time_to_use, - ) - .await?; - } - Ok(()) - } - - pub async fn fetch_first_datasets(&mut self) -> anyhow::Result<()> { - self.data_sets - .urbanization - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - self.data_sets - .footfall - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - self.data_sets - .landtype - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - self.data_sets - .service_provider_override - .fetch_first_data_set(&self.pool, &self.data_set_directory) - .await?; - Ok(()) - } - pub async fn run(mut self) -> anyhow::Result<()> { tracing::info!("Starting data set downloader task"); - self.fetch_first_datasets().await?; + self.data_sets = self + .data_set_downloader + .fetch_first_datasets(self.data_sets) + .await?; + // Attempt to fill in any unassigned hexes. This is for the edge case in // which we shutdown before a coverage object updates. if is_hex_boost_data_ready(&self.data_sets) { - self.data_set_processor - .set_unassigned_oracle_boosting_assignments(&self.pool, &self.data_sets) + let mut txn = self.pool.begin().await?; + + self.oracle_boostring_writer + .data_set_processor + .set_unassigned_oracle_boosting_assignments(&mut txn, &self.data_sets) .await?; + + txn.commit().await?; } let mut wakeup = Instant::now() + self.poll_duration; @@ -428,17 +156,22 @@ impl DataSetDownloaderDaemon { #[rustfmt::skip] tokio::select! { _ = self.new_coverage_object_notification.await_new_coverage_object() => { + let mut txn = self.pool.begin().await?; + // If we see a new coverage object, we want to assign only those hexes // that don't have an assignment if is_hex_boost_data_ready(&self.data_sets) { - self.data_set_processor.set_unassigned_oracle_boosting_assignments( - &self.pool, + self.oracle_boostring_writer.data_set_processor.set_unassigned_oracle_boosting_assignments( + &mut txn, &self.data_sets, ).await?; } + + txn.commit().await?; + }, _ = tokio::time::sleep_until(wakeup) => { - self.check_for_new_data_sets().await?; + self.data_sets = self.data_set_downloader.check_for_new_data_sets(Some(&self.oracle_boostring_writer), self.data_sets).await?; wakeup = Instant::now() + self.poll_duration; } } @@ -446,306 +179,51 @@ impl DataSetDownloaderDaemon { } } -fn get_data_set_path( - data_set_directory: &Path, - data_set_type: DataSetType, - time_to_use: DateTime, -) -> PathBuf { - let path = PathBuf::from(format!( - "{}.{}.{}.h3tree", - data_set_type.to_prefix(), - time_to_use.timestamp_millis(), - data_set_type.to_hex_res_prefix(), - )); - let mut dir = data_set_directory.to_path_buf(); - dir.push(path); - dir -} - -lazy_static! 
{ - static ref RE: Regex = Regex::new(r"([a-z,_]+).(\d+)(.res[0-9]{1,2}.h3tree)?").unwrap(); -} - -async fn delete_old_data_sets( - data_set_directory: &Path, - data_set_type: DataSetType, - time_to_use: DateTime, -) -> anyhow::Result<()> { - let mut data_sets = tokio::fs::read_dir(data_set_directory).await?; - while let Some(data_set) = data_sets.next_entry().await? { - let file_name = data_set.file_name(); - let file_name = file_name.to_string_lossy(); - let Some(cap) = RE.captures(&file_name) else { - tracing::warn!("Could not determine data set file type: {}", file_name); - continue; - }; - let prefix = &cap[1]; - let timestamp = cap[2].parse::()?.to_timestamp_millis()?; - if prefix == data_set_type.to_prefix() && timestamp < time_to_use { - tracing::info!(data_set = &*file_name, "Deleting old data set file"); - tokio::fs::remove_file(data_set.path()).await?; - } - } - Ok(()) -} - -async fn download_data_set( - store: &FileStore, - in_file_name: &str, - out_path: &Path, -) -> anyhow::Result<()> { - tracing::info!("Downloading new data set: {}", out_path.to_string_lossy()); - let stream = store.get_raw(in_file_name).await?; - let mut bytes = tokio_util::codec::FramedRead::new( - async_compression::tokio::bufread::GzipDecoder::new(tokio_util::io::StreamReader::new( - stream, - )), - tokio_util::codec::BytesCodec::new(), - ); - let mut file = File::create(&out_path).await?; - while let Some(bytes) = bytes.next().await.transpose()? { - file.write_all(&bytes).await?; - } - Ok(()) -} - -#[derive(Copy, Clone, sqlx::Type)] -#[sqlx(type_name = "data_set_type")] -#[sqlx(rename_all = "snake_case")] -pub enum DataSetType { - Urbanization, - Footfall, - Landtype, - ServiceProviderOverride, -} - -impl DataSetType { - pub fn to_prefix(self) -> &'static str { - match self { - Self::Urbanization => "urbanization", - Self::Footfall => "footfall", - Self::Landtype => "landtype", - Self::ServiceProviderOverride => "service_provider_override", - } - } - - pub fn to_hex_res_prefix(self) -> &'static str { - match self { - Self::Urbanization => "res10", - Self::Footfall => "res10", - Self::Landtype => "res10", - Self::ServiceProviderOverride => "res12", - } - } -} - #[async_trait::async_trait] -pub trait DataSetProcessor: Send + Sync + 'static { +pub trait DataSetProcessor<'a>: Send + Sync + 'static { async fn set_all_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()>; async fn set_unassigned_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()>; } #[async_trait::async_trait] -impl DataSetProcessor for FileSinkClient { +impl DataSetProcessor<'_> for FileSinkClient { async fn set_all_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()> { let assigned_coverage_objs = - AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets) - .await?; + AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(txn), data_sets).await?; assigned_coverage_objs.write(self).await?; - assigned_coverage_objs.save(pool).await?; + assigned_coverage_objs.save(txn).await?; Ok(()) } async fn set_unassigned_oracle_boosting_assignments( &self, - pool: &PgPool, + txn: &mut Transaction<'_, Postgres>, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result<()> { let 
assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( - db::fetch_hexes_with_null_assignments(pool), + db::fetch_hexes_with_null_assignments(txn), data_sets, ) .await?; assigned_coverage_objs.write(self).await?; - assigned_coverage_objs.save(pool).await?; + assigned_coverage_objs.save(txn).await?; Ok(()) } } -pub struct NopDataSetProcessor; - -#[async_trait::async_trait] -impl DataSetProcessor for NopDataSetProcessor { - async fn set_all_oracle_boosting_assignments( - &self, - _pool: &PgPool, - _data_sets: &impl HexBoostDataAssignmentsExt, - ) -> anyhow::Result<()> { - Ok(()) - } - - async fn set_unassigned_oracle_boosting_assignments( - &self, - _pool: &PgPool, - _data_sets: &impl HexBoostDataAssignmentsExt, - ) -> anyhow::Result<()> { - Ok(()) - } -} - -pub mod db { - use super::*; - - pub async fn fetch_latest_file_date( - pool: &PgPool, - data_set_type: DataSetType, - ) -> sqlx::Result>> { - sqlx::query_scalar("SELECT time_to_use FROM hex_assignment_data_set_status WHERE data_set = $1 ORDER BY time_to_use DESC LIMIT 1") - .bind(data_set_type) - .fetch_optional(pool) - .await - } - - pub async fn insert_new_data_set( - pool: &PgPool, - filename: &str, - data_set_type: DataSetType, - time_to_use: DateTime, - ) -> sqlx::Result<()> { - sqlx::query( - r#" - INSERT INTO hex_assignment_data_set_status (filename, data_set, time_to_use, status) - VALUES ($1, $2, $3, 'pending') - ON CONFLICT DO NOTHING - "#, - ) - .bind(filename) - .bind(data_set_type) - .bind(time_to_use) - .execute(pool) - .await?; - Ok(()) - } - - pub async fn fetch_latest_unprocessed_data_set( - pool: &PgPool, - data_set_type: DataSetType, - since: Option>, - ) -> sqlx::Result> { - sqlx::query_as( - "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status != 'processed' AND data_set = $1 AND COALESCE(time_to_use > $2, TRUE) AND time_to_use <= $3 ORDER BY time_to_use DESC LIMIT 1" - ) - .bind(data_set_type) - .bind(since) - .bind(Utc::now()) - .fetch_optional(pool) - .await - } - - pub async fn fetch_latest_processed_data_set( - pool: &PgPool, - data_set_type: DataSetType, - ) -> sqlx::Result> { - sqlx::query_as( - "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" - ) - .bind(data_set_type) - .fetch_optional(pool) - .await - } - - pub async fn set_data_set_status( - pool: &PgPool, - filename: &str, - status: DataSetStatus, - ) -> sqlx::Result<()> { - sqlx::query("UPDATE hex_assignment_data_set_status SET status = $1 WHERE filename = $2") - .bind(status) - .bind(filename) - .execute(pool) - .await?; - Ok(()) - } - - pub async fn fetch_time_of_latest_processed_data_set( - pool: &PgPool, - data_set_type: DataSetType, - ) -> sqlx::Result>> { - sqlx::query_scalar( - "SELECT time_to_use FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" - ) - .bind(data_set_type) - .fetch_optional(pool) - .await - } - - /// Check if there are any pending or downloaded files prior to the given reward period - pub async fn check_for_unprocessed_data_sets( - pool: &PgPool, - period_end: DateTime, - ) -> sqlx::Result { - Ok(sqlx::query_scalar( - "SELECT COUNT(*) > 0 FROM hex_assignment_data_set_status WHERE time_to_use <= $1 AND status != 'processed'", - ) - .bind(period_end) - .fetch_one(pool) - .await? 
- || sqlx::query_scalar( - r#" - SELECT COUNT(*) > 0 FROM coverage_objects - WHERE inserted_at < $1 AND uuid IN ( - SELECT - DISTINCT uuid - FROM - hexes - WHERE - urbanized IS NULL - OR footfall IS NULL - OR landtype IS NULL - OR service_provider_override IS NULL - ) - "#, - ) - .bind(period_end) - .fetch_one(pool) - .await?) - } - - pub fn fetch_all_hexes(pool: &PgPool) -> impl Stream> + '_ { - sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool) - } - - pub fn fetch_hexes_with_null_assignments( - pool: &PgPool, - ) -> impl Stream> + '_ { - sqlx::query_as( - "SELECT - uuid, hex, signal_level, signal_power - FROM - hexes - WHERE - urbanized IS NULL - OR footfall IS NULL - OR landtype IS NULL - OR service_provider_override IS NULL", - ) - .fetch(pool) - } -} - pub struct AssignedCoverageObjects { pub coverage_objs: HashMap>, } @@ -803,7 +281,7 @@ impl AssignedCoverageObjects { Ok(()) } - pub async fn save(self, pool: &PgPool) -> anyhow::Result<()> { + pub async fn save(self, txn: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { const NUMBER_OF_FIELDS_IN_QUERY: u16 = 8; const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; @@ -833,41 +311,10 @@ impl AssignedCoverageObjects { "#, ) .build() - .execute(pool) + .execute(&mut **txn) .await?; } Ok(()) } } - -#[derive(FromRow)] -pub struct UnassignedHex { - uuid: uuid::Uuid, - #[sqlx(try_from = "i64")] - hex: u64, - signal_level: SignalLevel, - signal_power: i32, -} - -impl UnassignedHex { - fn assign(self, data_sets: &impl HexBoostDataAssignmentsExt) -> anyhow::Result { - let cell = hextree::Cell::try_from(self.hex)?; - - Ok(AssignedHex { - uuid: self.uuid, - hex: self.hex, - signal_level: self.signal_level, - signal_power: self.signal_power, - assignments: data_sets.assignments(cell)?, - }) - } -} - -pub struct AssignedHex { - pub uuid: uuid::Uuid, - pub hex: u64, - pub signal_level: SignalLevel, - pub signal_power: i32, - pub assignments: HexAssignments, -} diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 1a066815d..2a8416f89 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,7 +1,5 @@ use crate::{ - banning, - boosting_oracles::db::check_for_unprocessed_data_sets, - coverage, data_session, + banning, coverage, data_session, heartbeats::{self, HeartbeatReward}, radio_threshold, resolve_subdao_pubkey, reward_shares::{ @@ -15,6 +13,7 @@ use crate::{ }; use anyhow::bail; use chrono::{DateTime, TimeZone, Utc}; +use dataset_downloader::db::check_for_unprocessed_data_sets; use db_store::meta; use file_store::{ file_sink::FileSinkClient, diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index 4c13ec16c..2603c591b 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -37,7 +37,9 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use sqlx::PgPool; use std::{collections::HashMap, pin::pin}; +use task_manager::{ManagedTask, TaskManager}; use tempfile::TempDir; +use tokio::task::spawn_local; use uuid::Uuid; #[derive(Clone)] @@ -114,11 +116,9 @@ pub async fn create_data_set_downloader( file_upload: FileUpload, new_coverage_object_notification: NewCoverageObjectNotification, tmp_dir: &TempDir, -) -> (DataSetDownloaderDaemon, PathBuf, String) { - let bucket_name = gen_bucket_name(); - - let endpoint = aws_local_default_endpoint(); - let awsl = 
AwsLocal::new(endpoint.as_str(), &bucket_name).await; + bucket_name: String, +) -> impl ManagedTask { + let awsl = AwsLocal::new(&aws_local_default_endpoint(), &bucket_name).await; for file_path in file_paths { awsl.put_file_to_aws(&file_path).await.unwrap(); @@ -131,19 +131,20 @@ pub async fn create_data_set_downloader( .unwrap(); let file_store = awsl.file_store.clone(); - let poll_duration = std::time::Duration::from_secs(4); - - let (oracle_boosting_reports, _) = OracleBoostingReportV1::file_sink( - tmp_dir.path(), - file_upload.clone(), - FileSinkCommitStrategy::Automatic, - FileSinkRollTime::Duration(std::time::Duration::from_secs(15 * 60)), - env!("CARGO_PKG_NAME"), - ) - .await - .unwrap(); + let poll_duration = std::time::Duration::from_millis(25); + + let (oracle_boosting_reports, oracle_boosting_reports_server) = + OracleBoostingReportV1::file_sink( + tmp_dir.path(), + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(std::time::Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), + ) + .await + .unwrap(); - let mut data_set_downloader = DataSetDownloaderDaemon::new( + let dsdd = DataSetDownloaderDaemon::new( pool, HexBoostData::default(), file_store, @@ -153,9 +154,10 @@ pub async fn create_data_set_downloader( poll_duration, ); - data_set_downloader.fetch_first_datasets().await.unwrap(); - data_set_downloader.check_for_new_data_sets().await.unwrap(); - (data_set_downloader, data_set_directory, bucket_name) + TaskManager::builder() + .add_task(oracle_boosting_reports_server) + .add_task(dsdd) + .build() } pub async fn hex_assignment_file_exist(pool: &PgPool, filename: &str) -> bool { @@ -171,11 +173,11 @@ pub async fn hex_assignment_file_exist(pool: &PgPool, filename: &str) -> bool { } #[sqlx::test] -async fn test_dataset_downloader(pool: PgPool) { +async fn test_dataset_downloader_daemon(pool: PgPool) { // Scenario: - // 1. DataSetDownloader downloads initial files + // 1. DataSetDownloaderDaemon downloads initial files // 2. Upload a new file - // 3. DataSetDownloader downloads new file + // 3. 
DataSetDownloaderDaemon downloads new file let paths = [ "footfall.1722895200000.gz", @@ -197,28 +199,52 @@ async fn test_dataset_downloader(pool: PgPool) { let (_, new_coverage_obj_notification) = new_coverage_object_notification_channel(); let tmp_dir = TempDir::new().expect("Unable to create temp dir"); - let (mut data_set_downloader, _, bucket_name) = create_data_set_downloader( + let bucket_name = gen_bucket_name(); + + let task = create_data_set_downloader( pool.clone(), file_paths, file_upload, new_coverage_obj_notification, &tmp_dir, + bucket_name.clone(), ) .await; - assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); - assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); - assert!(hex_assignment_file_exist(&pool, "landtype.1722895200000.gz").await); - assert!(hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz").await); - - let endpoint = aws_local_default_endpoint(); - let awsl = AwsLocal::new(endpoint.as_str(), &bucket_name).await; - awsl.put_file_to_aws( - &PathBuf::from_str("./tests/integrations/fixtures/footfall.1732895200000.gz").unwrap(), - ) - .await - .unwrap(); - data_set_downloader.check_for_new_data_sets().await.unwrap(); - assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + + let local = tokio::task::LocalSet::new(); + + local + .run_until(async move { + spawn_local(async { + TaskManager::builder() + .add_task(task) + .build() + .start() + .await + .unwrap(); + }); + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + assert!(hex_assignment_file_exist(&pool, "footfall.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "urbanization.1722895200000.gz").await); + assert!(hex_assignment_file_exist(&pool, "landtype.1722895200000.gz").await); + assert!( + hex_assignment_file_exist(&pool, "service_provider_override.1739404800000.gz") + .await + ); + + let endpoint = aws_local_default_endpoint(); + let awsl = AwsLocal::new(endpoint.as_str(), &bucket_name).await; + awsl.put_file_to_aws( + &PathBuf::from_str("./tests/integrations/fixtures/footfall.1732895200000.gz") + .unwrap(), + ) + .await + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + assert!(hex_assignment_file_exist(&pool, "footfall.1732895200000.gz").await); + }) + .await; } #[sqlx::test] diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 0edfbb047..f6c24f7a3 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -202,8 +202,10 @@ pub async fn set_unassigned_oracle_boosting_assignments( pool: &PgPool, data_sets: &impl HexBoostDataAssignmentsExt, ) -> anyhow::Result> { + let mut tx = pool.begin().await?; + let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( - mobile_verifier::boosting_oracles::data_sets::db::fetch_hexes_with_null_assignments(pool), + dataset_downloader::db::fetch_hexes_with_null_assignments(&mut tx), data_sets, ) .await?; @@ -233,7 +235,9 @@ pub async fn set_unassigned_oracle_boosting_assignments( timestamp, }); } - assigned_coverage_objs.save(pool).await?; + + assigned_coverage_objs.save(&mut tx).await?; + tx.commit().await?; Ok(output) }