From d09190c2b177cb17381cd7fdf392fcc241ccc398 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 27 Mar 2024 17:10:09 -0700 Subject: [PATCH] refactor: update todo comments, small reading perf --- file-exchange/src/config.rs | 17 ------- file-exchange/src/discover/mod.rs | 3 -- file-exchange/src/download_client/mod.rs | 4 +- file-exchange/src/download_client/signer.rs | 1 - file-exchange/src/manifest/file_reader.rs | 1 - file-exchange/src/manifest/ipfs.rs | 7 --- file-exchange/src/manifest/mod.rs | 1 - file-exchange/src/manifest/store.rs | 44 +++++++------------ .../src/transaction_manager/staking.rs | 1 - file-service/src/config.rs | 1 - file-service/src/file_server/service.rs | 1 - file-service/src/file_server/status.rs | 1 - 12 files changed, 18 insertions(+), 64 deletions(-) diff --git a/file-exchange/src/config.rs b/file-exchange/src/config.rs index c954ae5..e659fbc 100644 --- a/file-exchange/src/config.rs +++ b/file-exchange/src/config.rs @@ -297,7 +297,6 @@ pub struct DownloaderArgs { } /// Publisher takes the files, generate bundle manifest, and publish to IPFS -//TODO: a single command to publish a range of files #[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] #[group(required = false, multiple = true)] pub struct PublisherArgs { @@ -335,8 +334,6 @@ pub struct PublisherArgs { value_name = "FILE_TYPE", value_enum, env = "FILE_TYPE", - //TODO: use enum - // value_parser = clap::value_parser!(FileType::from_str), help = "Type of the file (e.g., sql_snapshot, flatfiles)" )] pub file_type: String, @@ -345,8 +342,6 @@ pub struct PublisherArgs { long, value_name = "FILE_VERSION", env = "FILE_VERSION", - //TODO: use enum - // value_parser = clap::value_parser!(FileType::from_str), help = "Bundle versioning" )] pub bundle_version: String, @@ -528,18 +523,6 @@ pub enum FileType { Flatfiles, } -// impl FromStr for FileType { -// type Err = &'static str; - -// fn from_str(s: &str) -> Result { -// match s { -// "sql_snapshot" => Ok(FileType::SqlSnapshot), -// "flatfiles" => Ok(FileType::Flatfiles), -// _ => Err("Invalid file type"), -// } -// } -// } - /// Sets up tracing, allows log level to be set from the environment variables pub fn init_tracing(format: &str) -> Result<(), SetGlobalDefaultError> { let filter = EnvFilter::from_default_env(); diff --git a/file-exchange/src/discover/mod.rs b/file-exchange/src/discover/mod.rs index 0837593..d84f125 100644 --- a/file-exchange/src/discover/mod.rs +++ b/file-exchange/src/discover/mod.rs @@ -17,9 +17,7 @@ use crate::manifest::{ }; use crate::util::{UDecimal18, GRT}; -// Pair indexer operator address and indexer service endpoint (operator, indexer_url) // persumeably this should not be handled by clients themselves -//TODO: smarter type for tracking available endpoints #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct ServiceEndpoint { pub operator: String, @@ -242,7 +240,6 @@ pub async fn unavailable_files(file_map: &FileAvailbilityMap) -> Vec { missing_file } -//TODO: directly access the field instead #[derive(Debug, Serialize, Deserialize)] pub struct Operator { #[serde(alias = "publicKey")] diff --git a/file-exchange/src/download_client/mod.rs b/file-exchange/src/download_client/mod.rs index c92f093..d4e792f 100644 --- a/file-exchange/src/download_client/mod.rs +++ b/file-exchange/src/download_client/mod.rs @@ -171,7 +171,6 @@ impl Downloader { } /// Read bundle manifiest and download the individual file manifests - //TODO: update once there is payment pub async fn download_bundle(&self) -> Result<(), Error> { self.init_target_chunks(&self.bundle); tracing::trace!( @@ -235,7 +234,6 @@ impl Downloader { } /// Download a file by reading its chunk manifest - //TODO: update once there is payment pub async fn download_file_manifest(&self, meta: FileManifestMeta) -> Result<(), Error> { tracing::debug!( file_spec = tracing::field::debug(&meta), @@ -403,7 +401,7 @@ impl Downloader { tracing::warn!(err_msg); return Err(Error::DataUnavailable(err_msg.to_string())); }; - //TODO: do no add ipfs_hash here, construct query_endpoint after updating route 'files/id/:id' + //TODO: do not add ipfs_hash here, construct query_endpoint after updating route 'files/id/:id' let query_endpoint = service.service_endpoint.clone() + "/files/id/" + &self.config.ipfs_hash; let file_hash = meta.meta_info.hash.clone(); diff --git a/file-exchange/src/download_client/signer.rs b/file-exchange/src/download_client/signer.rs index 7c13df1..473343d 100644 --- a/file-exchange/src/download_client/signer.rs +++ b/file-exchange/src/download_client/signer.rs @@ -32,7 +32,6 @@ impl Access for TapReceipt { } } -//TODO: would be different with TAPv1/2: add record_receipt, update_allocations, get_receipts impl ReceiptSigner { pub async fn new(signer: SecretKey, chain_id: U256, verifier: Address) -> Self { Self { diff --git a/file-exchange/src/manifest/file_reader.rs b/file-exchange/src/manifest/file_reader.rs index e2a44c4..94b85fb 100644 --- a/file-exchange/src/manifest/file_reader.rs +++ b/file-exchange/src/manifest/file_reader.rs @@ -5,7 +5,6 @@ use std::path::Path; use crate::errors::Error; -// TODO: REFACTOR; read chunk can be further refactors, check for valid path, and use in serve_file_range // Read a chunk from the file at the file_path from specified start and end bytes pub fn read_chunk(file_path: &Path, (start, end): (u64, u64)) -> Result { let mut file = File::open(file_path).map_err(Error::FileIOError)?; diff --git a/file-exchange/src/manifest/ipfs.rs b/file-exchange/src/manifest/ipfs.rs index f28b0ec..1125c99 100644 --- a/file-exchange/src/manifest/ipfs.rs +++ b/file-exchange/src/manifest/ipfs.rs @@ -119,8 +119,6 @@ pub fn create_ipfs_client(uri: &str) -> IpfsClient { tracing::info!(ipfs_address, "Connect to IPFS node"); - //TODO: Test IPFS client - match IpfsClient::new(&ipfs_address) { Ok(ipfs_client) => ipfs_client, Err(e) => { @@ -139,14 +137,9 @@ mod tests { use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; - // fn test_client() -> IpfsClient { - // IpfsClient::new("https://ipfs.network.thegraph.com") - // } - #[tokio::test] async fn fetch_random_subgraph_yaml() { let ipfs_hash = "Qmc1mmagMJqopw2zb1iUTPRMhahMvEAKpQGS3KvuL9cpaX"; - // https://ipfs.network.thegraph.com/api/v0/cat?arg=Qmc1mmagMJqopw2zb1iUTPRMhahMvEAKpQGS3KvuL9cpaX let client = create_ipfs_client("https://ipfs.network.thegraph.com"); let retry_strategy = ExponentialBackoff::from_millis(10) diff --git a/file-exchange/src/manifest/mod.rs b/file-exchange/src/manifest/mod.rs index 1682a97..d1722b0 100644 --- a/file-exchange/src/manifest/mod.rs +++ b/file-exchange/src/manifest/mod.rs @@ -70,7 +70,6 @@ pub struct FileManifestMeta { } /* Bundle - packaging of file manifests mapped into local files */ -//TODO: Add GraphQL derivation #[derive(Clone, Debug, Serialize, Deserialize, SimpleObject)] pub struct Bundle { pub ipfs_hash: String, diff --git a/file-exchange/src/manifest/store.rs b/file-exchange/src/manifest/store.rs index 3357100..4f8ca63 100644 --- a/file-exchange/src/manifest/store.rs +++ b/file-exchange/src/manifest/store.rs @@ -245,7 +245,6 @@ impl Store { ); // Read all files in bundle to verify locally. This may cause a long initialization time - //TODO: allow for concurrent validation of files for file_meta in &local.bundle.file_manifests { self.read_and_validate_file(file_meta, &local.local_path) .await?; @@ -274,37 +273,28 @@ impl Store { ); // loop through file manifest byte range - //multipart read/ vectorized read - for i in 0..(file_manifest.total_bytes / file_manifest.chunk_size + 1) { - // read range - let start = i * file_manifest.chunk_size; - let end: usize = - (u64::min(start + file_manifest.chunk_size, file_manifest.total_bytes) - 1) - .try_into() - .unwrap(); - tracing::trace!( - i, - start_byte = tracing::field::debug(&start), - end_byte = tracing::field::debug(&end), - "Verify chunk index" - ); - let chunk_hash = file_manifest.chunk_hashes[i as usize].clone(); - - // read chunk - // let chunk_data = read_chunk(&file_path, (start, end))?; - let start: usize = start.try_into().unwrap(); - // let length: usize = end - start + 1; - let range = std::ops::Range { - start, - end: end + 1, - }; - let file_name = meta_info.name.clone(); + let chunk_ops: Vec<_> = (0..(file_manifest.total_bytes / file_manifest.chunk_size + 1)) + .map(|i| { + let start = i * file_manifest.chunk_size; + let end = (u64::min(start + file_manifest.chunk_size, file_manifest.total_bytes) + - 1) as usize; + let chunk_hash = file_manifest.chunk_hashes[i as usize].clone(); + + let range = std::ops::Range { + start: start as usize, + end: end + 1, + }; + (range, chunk_hash) + }) + .collect(); + + let file_name = meta_info.name.clone(); + for (range, chunk_hash) in chunk_ops { let chunk_data = self.range_read(&file_name, &range).await?; // verify chunk if !verify_chunk(&chunk_data, &chunk_hash) { tracing::error!( file = tracing::field::debug(&file_name), - chunk_index = tracing::field::debug(&i), chunk_hash = tracing::field::debug(&chunk_hash), "Cannot locally verify the serving file" ); diff --git a/file-exchange/src/transaction_manager/staking.rs b/file-exchange/src/transaction_manager/staking.rs index 5821f8b..2a4fdc2 100644 --- a/file-exchange/src/transaction_manager/staking.rs +++ b/file-exchange/src/transaction_manager/staking.rs @@ -50,7 +50,6 @@ impl TransactionManager { tokens: U256, epoch: u64, ) -> Result<(H160, Option), Error> { - //TODO: Start with hardcoding, later add field indexer address to TX manager, tokens to fn params let existing_ids: Vec = vec![]; let metadata: [u8; 32] = [0; 32]; diff --git a/file-service/src/config.rs b/file-service/src/config.rs index 33411ef..23c6c45 100644 --- a/file-service/src/config.rs +++ b/file-service/src/config.rs @@ -80,7 +80,6 @@ pub struct ServerArgs { default_value = "pretty" )] pub log_format: LogFormat, - //TODO: More complex price management #[arg( long, value_name = "default-price-per-byte", diff --git a/file-service/src/file_server/service.rs b/file-service/src/file_server/service.rs index 7286f28..8c870bd 100644 --- a/file-service/src/file_server/service.rs +++ b/file-service/src/file_server/service.rs @@ -54,7 +54,6 @@ pub async fn file_service( match req.get("content-range") { Some(r) => { let range = parse_range_header(r)?; - //TODO: validate receipt serve_file_range( context.state.store.clone(), &file_manifest.meta_info.name, diff --git a/file-service/src/file_server/status.rs b/file-service/src/file_server/status.rs index c3a7b46..78a5d8c 100644 --- a/file-service/src/file_server/status.rs +++ b/file-service/src/file_server/status.rs @@ -38,7 +38,6 @@ impl From for GraphQlBundleManifest { #[derive(Clone, Debug, SimpleObject)] pub struct GraphQlBundle { pub ipfs_hash: String, - //TODO: make local path available for admin only pub manifest: GraphQlBundleManifest, pub file_manifests: Vec, }