Skip to content

Commit

Permalink
feat: downloader client with concurrency and onchain configs
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Feb 26, 2024
1 parent 0f8b3d8 commit d9bc87c
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 26 deletions.
37 changes: 33 additions & 4 deletions file-exchange/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Cli {
pub enum Role {
Downloader(DownloaderArgs),
Publisher(PublisherArgs),
Wallet(WalletArgs),
Wallet(OnChainArgs),
}

/// Server enable payments through the staking contract,
Expand All @@ -81,7 +81,7 @@ pub enum OnchainAction {

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
#[group(required = false, multiple = true)]
pub struct WalletArgs {
pub struct OnChainArgs {
#[clap(subcommand)]
pub action: Option<OnchainAction>,
#[clap(
Expand Down Expand Up @@ -111,9 +111,16 @@ pub struct WalletArgs {
long,
value_name = "network_subgraph",
env = "NETWORK_SUBGRAPH",
help = "Graph Network subgraph API endpoint"
help = "The Graph Network subgraph API endpoint"
)]
pub network_subgraph: String,
#[clap(
long,
value_name = "escrow_subgraph",
env = "ESCROW_SUBGRAPH",
help = "The Graph Scalar TAP subgraph API endpoint"
)]
pub escrow_subgraph: String,
}

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -153,7 +160,7 @@ pub struct DownloaderArgs {
env = "VERIFIER",
help = "TAP verifier contract address"
)]
pub verifier: String,
pub verifier: Option<String>,
// Trust tracking should be done by the gateway/DHT
#[arg(
long,
Expand All @@ -178,6 +185,20 @@ pub struct DownloaderArgs {
help = "Auth token that to query for free"
)]
pub free_query_auth_token: Option<String>,
#[clap(
long,
value_name = "NETWORK_SUBGRAPH",
env = "NETWORK_SUBGRAPH",
help = "The Graph Network Subgraph API endpoint"
)]
pub network_subgraph: String,
#[clap(
long,
value_name = "ESCROW_SUBGRAPH",
env = "ESCROW_SUBGRAPH",
help = "The Graph Scalar TAP Subgraph API endpoint"
)]
pub escrow_subgraph: String,

#[arg(
long,
Expand All @@ -187,6 +208,14 @@ pub struct DownloaderArgs {
help = "Maximum retry for each chunk"
)]
pub max_retry: u64,
#[arg(
long,
value_name = "PROVIDER_CONCURRENCY",
default_value = "10",
env = "PROVIDER_CONCURRENCY",
help = "Configure maximum concurrency limit for downloading the bundle from; affects cost estimation for escrow accounts, transfer speed performance, failure rate"
)]
pub provider_concurrency: u64,
}

/// Publisher takes the files, generate bundle manifest, and publish to IPFS
Expand Down
88 changes: 72 additions & 16 deletions file-exchange/src/download_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use tokio::sync::Mutex;

use crate::errors::Error;
use crate::manifest::{
file_hasher::verify_chunk, ipfs::IpfsClient, manifest_fetcher::read_bundle, Bundle,
FileManifestMeta,
};
use crate::{config::DownloaderArgs, discover, graphql};
use crate::{config::OnChainArgs, errors::Error, transaction_manager::TransactionManager};
use crate::{
discover::{Finder, ServiceEndpoint},
download_client::signer::Access,
Expand All @@ -47,14 +47,21 @@ pub struct Downloader {
// key is the file manifest identifier (IPFS hash) and value is a HashSet of downloaded chunk indices
target_chunks: Arc<StdMutex<HashMap<String, HashSet<u64>>>>,
chunk_max_retry: u64,
provider_concurrency: u64,
bundle_finder: Finder,
payment: PaymentMethod,
}

/// A downloader can either provide a free query auth token or receipt signer
pub enum PaymentMethod {
FreeQuery(String),
TAPSigner(ReceiptSigner),
PaidQuery(OnChainSigner),
}

pub struct OnChainSigner {
#[allow(dead_code)]
transaction_manager: TransactionManager,
receipt_signer: ReceiptSigner,
}

impl Downloader {
Expand All @@ -74,9 +81,9 @@ impl Downloader {
let signing_key = wallet.signer().to_bytes();
let secp256k1_private_key =
SecretKey::from_slice(&signing_key).expect("Private key from wallet");
let provider_link = args.provider.expect("Provider required to connect");
let provider =
Provider::<Http>::try_from(&args.provider.expect("Provider required to connect"))
.expect("Connect to the provider");
Provider::<Http>::try_from(&provider_link).expect("Connect to the provider");
//TODO: migrate ethers type to alloy
let chain_id = U256::from(
provider
Expand All @@ -85,14 +92,27 @@ impl Downloader {
.expect("Get chain id from provider")
.as_u128(),
);
PaymentMethod::TAPSigner(
ReceiptSigner::new(
secp256k1_private_key,
chain_id,
Address::from_str(&args.verifier).expect("Parse verifier"),
)
.await,
let transaction_manager = TransactionManager::new(OnChainArgs {
action: None,
mnemonic: mnemonic.to_string(),
provider: provider_link.clone(),
verifier: args.verifier.clone(),
network_subgraph: args.network_subgraph,
escrow_subgraph: args.escrow_subgraph,
})
.await
.expect("Initialize transaction manager for paid queries");
let receipt_signer = ReceiptSigner::new(
secp256k1_private_key,
chain_id,
Address::from_str(&args.verifier.expect("Provide verifier"))
.expect("Parse verifier"),
)
.await;
PaymentMethod::PaidQuery(OnChainSigner {
transaction_manager,
receipt_signer,
})
} else {
panic!("No payment wallet nor free query token provided");
};
Expand All @@ -108,6 +128,7 @@ impl Downloader {
indexer_blocklist: Arc::new(StdMutex::new(HashSet::new())),
target_chunks: Arc::new(StdMutex::new(HashMap::new())),
chunk_max_retry: args.max_retry,
provider_concurrency: args.provider_concurrency,
bundle_finder: Finder::new(ipfs_client),
payment,
}
Expand Down Expand Up @@ -273,8 +294,9 @@ impl Downloader {
async fn payment_header(&self, receiver: &str) -> Result<(HeaderName, String), Error> {
match &self.payment {
PaymentMethod::FreeQuery(token) => Ok((AUTHORIZATION, token.to_string())),
PaymentMethod::TAPSigner(signer) => {
PaymentMethod::PaidQuery(signer) => {
let receipt = signer
.receipt_signer
.create_receipt(graphql::allocation_id(receiver), &discover::Finder::fees())
.await?;
Ok((
Expand Down Expand Up @@ -338,11 +360,23 @@ impl Downloader {
.filter(|url| !blocklist.contains(*url))
.cloned()
.collect::<Vec<_>>();
let all_available = &self
.bundle_finder
.bundle_availabilities(&self.ipfs_hash, endpoints)
.await;
let mut sorted_endpoints = all_available.to_vec();
// Sort by price_per_byte in ascending order and select the top 'provider_concurrency' endpoints
//TODO: add other types of selection such as latency and reliability
sorted_endpoints.sort_by(|a, b| {
a.price_per_byte
.partial_cmp(&b.price_per_byte)
.unwrap_or(std::cmp::Ordering::Equal)
});
self.update_indexer_urls(
&self
.bundle_finder
.bundle_availabilities(&self.ipfs_hash, endpoints)
.await,
&sorted_endpoints
.into_iter()
.take(self.provider_concurrency as usize)
.collect::<Vec<ServiceEndpoint>>(),
);
let indexer_endpoints = self.indexer_urls.lock().unwrap().clone();
if indexer_endpoints.is_empty() {
Expand Down Expand Up @@ -374,6 +408,28 @@ impl Downloader {
return Err(Error::DataUnavailable(msg));
}
}
} else {
// estimate the cost to download the bundle from each provider
let _num_files = self.bundle.file_manifests.len();
let total_bytes: f32 = self
.bundle
.file_manifests
.iter()
.map(|f| f.file_manifest.total_bytes)
.sum::<u64>() as f32;

//TODO: add concurrency limit for better calculations
for endpoint in indexer_endpoints {
let fail_tolerance = 1.5_f32;
let _escrow_requirement = endpoint.price_per_byte * total_bytes
/ (self.provider_concurrency as f32)
* fail_tolerance;

// check for escrow balance
}
// indexer_endpoints.iter().map(|e| {
// let total_ask = e.price_per_byte * total_bytes;
// })
};
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions file-exchange/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fs;
use std::str::FromStr;
use std::sync::Arc;

use crate::config::WalletArgs;
use crate::config::OnChainArgs;
use crate::errors::Error;
use crate::transaction_manager::{escrow::Escrow, graph_token::L2GraphToken, staking::L2Staking};
use crate::util::build_wallet;
Expand All @@ -30,12 +30,12 @@ pub struct TransactionManager {
staking_contract: L2Staking<Arc<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>>,
escrow_contract: Escrow<Arc<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>>,
token_contract: L2GraphToken<Arc<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>>,
pub args: WalletArgs,
pub args: OnChainArgs,
}

impl TransactionManager {
// Constructor to create a new instance
pub async fn new(args: WalletArgs) -> Result<Self, anyhow::Error> {
pub async fn new(args: OnChainArgs) -> Result<Self, anyhow::Error> {
tracing::info!("Initialize transaction manager");
let provider = Provider::<Http>::try_from(&args.provider)?;
let chain_id = provider.get_chainid().await?;
Expand Down
5 changes: 3 additions & 2 deletions file-exchange/tests/allocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod tests {
use chrono::Utc;
use ethers_core::types::U256;
use file_exchange::{
config::{AllocateArgs, OnchainAction, WalletArgs},
config::{AllocateArgs, OnChainArgs, OnchainAction},
transaction_manager::TransactionManager,
};

Expand All @@ -14,13 +14,14 @@ mod tests {
// 1. Basic setup; const
std::env::set_var("RUST_LOG", "off,file_exchange=debug,allocate=trace");
file_exchange::config::init_tracing("pretty").unwrap();
let wallet_args = WalletArgs {
let wallet_args = OnChainArgs {
mnemonic: String::from(
"sheriff obscure trick beauty army fat wink legal flee leader section suit",
),
provider: String::from("https://arbitrum-sepolia.infura.io/v3/aaaaaaaaaaaaaaaaaaaa"),
verifier: Some(String::from("0xfC24cE7a4428A6B89B52645243662A02BA734ECF")),
network_subgraph: String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-arbitrum-sepolia"),
escrow_subgraph: String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/scalar-tap-arbitrum-sepolia"),
action: Some(file_exchange::config::OnchainAction::Allocate(
AllocateArgs {
tokens: U256::from(100),
Expand Down
2 changes: 1 addition & 1 deletion file-exchange/tests/file_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod tests {
"http://localhost:5677".to_string(),
]
.to_vec(),
verifier: String::from("0xfC24cE7a4428A6B89B52645243662A02BA734ECF"),
verifier: Some(String::from("0xfC24cE7a4428A6B89B52645243662A02BA734ECF")),
mnemonic: None,
free_query_auth_token: Some("Bearer free-token".to_string()),
provider: None,
Expand Down

0 comments on commit d9bc87c

Please sign in to comment.