Skip to content

Commit

Permalink
Merge branch 'mraszyk/pocket-ic-blocking-drop' into 'master'
Browse files Browse the repository at this point in the history
fix: block on dropping PocketIc instances

This MR makes the PocketIc server block when dropping PocketIc instances. While this might increase the latency of PocketIc instance deletions, it should increase the throughput of the deletions and thus decreases the amount of resources (e.g., open files) claimed by the PocketIc server. 

See merge request dfinity-lab/public/ic!19137
  • Loading branch information
mraszyk committed May 7, 2024
2 parents 7a1a23f + 45f4958 commit c126f31
Showing 1 changed file with 2 additions and 26 deletions.
28 changes: 2 additions & 26 deletions rs/pocket_ic_server/src/state_api/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ use crate::{OpId, Operation};
use base64;
use ic_http_endpoints_public::cors_layer;
use ic_types::{CanisterId, SubnetId};
use ic_utils_thread::JoinOnDrop;
use pocket_ic::common::rest::{HttpGatewayBackend, HttpGatewayConfig};
use pocket_ic::{ErrorCode, UserError, WasmResult};
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
thread::Builder as ThreadBuilder,
time::Duration,
};
use tokio::{
Expand Down Expand Up @@ -78,11 +76,6 @@ pub struct ApiState {
// threads making IC instances progress automatically
progress_threads: RwLock<Vec<Mutex<Option<ProgressThread>>>>,
sync_wait_time: Duration,
// dropping the PocketIC instance might be an expensive operation (the state machine is
// deallocated, e.g.). Thus, we immediately mark the instance as deleted while sending the
// PocketIC instance to a background worker and drop it there.
drop_sender: mpsc::UnboundedSender<PocketIc>,
_drop_worker_handle: JoinOnDrop<()>,
// PocketIC server port
port: Option<u16>,
// status of HTTP gateway (true = running, false = stopped)
Expand Down Expand Up @@ -142,24 +135,12 @@ impl PocketIcApiStateBuilder {
let progress_threads = RwLock::new((0..instances_len).map(|_| Mutex::new(None)).collect());

let sync_wait_time = self.sync_wait_time.unwrap_or(DEFAULT_SYNC_WAIT_DURATION);
#[allow(clippy::disallowed_methods)]
let (drop_sender, mut rx) = mpsc::unbounded_channel::<PocketIc>();
let drop_handle = ThreadBuilder::new()
.name("PocketIC GC Thread".into())
.spawn(move || {
while let Some(pocket_ic) = rx.blocking_recv() {
std::mem::drop(pocket_ic);
}
})
.unwrap();

Arc::new(ApiState {
instances: instances.into(),
graph: graph.into(),
progress_threads,
sync_wait_time,
drop_sender,
_drop_worker_handle: JoinOnDrop::new(drop_handle),
port: self.port,
http_gateways: Arc::new(RwLock::new(Vec::new())),
})
Expand Down Expand Up @@ -368,7 +349,7 @@ impl ApiState {
if let InstanceState::Available(pocket_ic) =
std::mem::replace(&mut *instance_state, InstanceState::Deleted)
{
self.drop_sender.send(pocket_ic).unwrap();
std::mem::drop(pocket_ic);
}
let progress_threads = self.progress_threads.read().await;
let mut progress_thread = progress_threads[instance_id].lock().await;
Expand Down Expand Up @@ -552,7 +533,6 @@ impl ApiState {
let mut progress_thread = progress_threads[instance_id].lock().await;
let instances = self.instances.clone();
let graph = self.graph.clone();
let drop_sender = self.drop_sender.clone();
let sync_wait_time = self.sync_wait_time;
if progress_thread.is_none() {
let (tx, mut rx) = mpsc::channel::<()>(1);
Expand All @@ -568,7 +548,6 @@ impl ApiState {
let retry_immediately = match Self::update_instances_with_timeout(
instances.clone(),
graph.clone(),
drop_sender.clone(),
cur_op.into(),
instance_id,
sync_wait_time,
Expand Down Expand Up @@ -669,7 +648,6 @@ impl ApiState {
Self::update_instances_with_timeout(
self.instances.clone(),
self.graph.clone(),
self.drop_sender.clone(),
op,
instance_id,
sync_wait_time,
Expand All @@ -682,7 +660,6 @@ impl ApiState {
async fn update_instances_with_timeout<O>(
instances: Arc<RwLock<Vec<Mutex<InstanceState>>>>,
graph: Arc<RwLock<HashMap<StateLabel, Computations>>>,
drop_sender: mpsc::UnboundedSender<PocketIc>,
op: Arc<O>,
instance_id: InstanceId,
sync_wait_time: Duration,
Expand Down Expand Up @@ -734,7 +711,6 @@ impl ApiState {
let bg_task = {
let old_state_label = state_label.clone();
let op_id = op_id.clone();
let drop_sender = drop_sender.clone();
let graph = graph.clone();
move || {
trace!(
Expand All @@ -755,7 +731,7 @@ impl ApiState {
drop(graph_guard);
let mut instance_state = instances[instance_id].blocking_lock();
if let InstanceState::Deleted = &*instance_state {
drop_sender.send(pocket_ic).unwrap();
std::mem::drop(pocket_ic);
} else {
*instance_state = InstanceState::Available(pocket_ic);
}
Expand Down

0 comments on commit c126f31

Please sign in to comment.