diff --git a/.moon/workspace.yml b/.moon/workspace.yml index a463d2809ea..080f2424688 100644 --- a/.moon/workspace.yml +++ b/.moon/workspace.yml @@ -10,6 +10,7 @@ projects: - './packages/*' - '!packages/cli' - '!packages/core-*' + - 'scenarios/*' - 'website' generator: diff --git a/.yarn/versions/c89cc34c.yml b/.yarn/versions/c89cc34c.yml new file mode 100644 index 00000000000..28fac241db0 --- /dev/null +++ b/.yarn/versions/c89cc34c.yml @@ -0,0 +1,9 @@ +releases: + "@moonrepo/cli": minor + "@moonrepo/core-linux-arm64-gnu": minor + "@moonrepo/core-linux-arm64-musl": minor + "@moonrepo/core-linux-x64-gnu": minor + "@moonrepo/core-linux-x64-musl": minor + "@moonrepo/core-macos-arm64": minor + "@moonrepo/core-macos-x64": minor + "@moonrepo/core-windows-x64-msvc": minor diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bc9fd2fe52..e7eeece741a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Unreleased + +#### 🚀 Updates + +- Reworked child process handling to properly handle signals and shutdown accordingly. Additionally, + when the pipeline receives a signal, we now display the status that shutdown the pipeline in the + summary. + ## 1.31.2 #### 🚀 Updates diff --git a/Cargo.lock b/Cargo.lock index 2b8c408f57b..fb59fda35e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -844,6 +844,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.39" @@ -993,7 +999,7 @@ version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409" dependencies = [ - "nix", + "nix 0.27.1", "winapi", ] @@ -3223,6 +3229,7 @@ dependencies = [ "moon_common", "moon_console", "moon_notifier", + "moon_process", "moon_project", "moon_remote", "moon_task", @@ -3358,6 +3365,7 @@ dependencies = [ "moon_node_tool", "moon_platform", "moon_plugin", + "moon_process", "moon_project", "moon_project_graph", "moon_python_lang", @@ -4010,11 +4018,13 @@ name = "moon_process" version = "0.0.1" dependencies = [ "cached", + "libc", "miette 7.4.0", "moon_args", "moon_common", "moon_console", "once_cell", + "process-wrap", "rustc-hash", "starbase_shell", "system_env", @@ -4720,6 +4730,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nodejs_package_json" version = "0.3.1" @@ -5278,6 +5300,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "process-wrap" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d35f4dc9988d1326b065b4def5e950c3ed727aa03e3151b86cc9e2aec6b03f54" +dependencies = [ + "futures", + "indexmap 2.7.0", + "nix 0.29.0", + "tokio", + "tracing", + "windows 0.59.0", +] + [[package]] name = "prost" version = "0.13.4" @@ -8061,6 +8097,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f919aee0a93304be7f62e8e5027811bbba96bcb1de84d6618be56e43f8a32a1" +dependencies = [ + "windows-core 0.59.0", + "windows-targets 0.53.0", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -8091,10 +8137,23 @@ dependencies = [ "windows-implement 0.58.0", "windows-interface 0.58.0", "windows-result 0.2.0", - "windows-strings", + "windows-strings 0.1.0", "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "810ce18ed2112484b0d4e15d022e5f598113e220c53e373fb31e67e21670c1ce" +dependencies = [ + "windows-implement 0.59.0", + "windows-interface 0.59.0", + "windows-result 0.3.0", + "windows-strings 0.3.0", + "windows-targets 0.53.0", +] + [[package]] name = "windows-implement" version = "0.57.0" @@ -8117,6 +8176,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-implement" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83577b051e2f49a058c308f17f273b570a6a758386fc291b5f6a934dd84e48c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-interface" version = "0.57.0" @@ -8139,6 +8209,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-interface" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb26fd936d991781ea39e87c3a27285081e3c0da5ca0fcbc02d368cc6f52ff01" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-registry" version = "0.2.0" @@ -8146,7 +8227,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ "windows-result 0.2.0", - "windows-strings", + "windows-strings 0.1.0", "windows-targets 0.52.6", ] @@ -8168,6 +8249,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d08106ce80268c4067c0571ca55a9b4e9516518eaa1a1fe9b37ca403ae1d1a34" +dependencies = [ + "windows-targets 0.53.0", +] + [[package]] name = "windows-strings" version = "0.1.0" @@ -8178,6 +8268,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-strings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b888f919960b42ea4e11c2f408fadb55f78a9f236d5eef084103c8ce52893491" +dependencies = [ + "windows-targets 0.53.0", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -8229,13 +8328,29 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1e4c7e8ceaaf9cb7d7507c974735728ab453b67ef8f18febdd7c11fe59dca8b" +dependencies = [ + "windows_aarch64_gnullvm 0.53.0", + "windows_aarch64_msvc 0.53.0", + "windows_i686_gnu 0.53.0", + "windows_i686_gnullvm 0.53.0", + "windows_i686_msvc 0.53.0", + "windows_x86_64_gnu 0.53.0", + "windows_x86_64_gnullvm 0.53.0", + "windows_x86_64_msvc 0.53.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -8248,6 +8363,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -8260,6 +8381,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -8272,12 +8399,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1dc67659d35f387f5f6c479dc4e28f1d4bb90ddd1a5d3da2e5d97b42d6272c3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -8290,6 +8429,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -8302,6 +8447,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -8314,6 +8465,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -8326,6 +8483,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" + [[package]] name = "winnow" version = "0.6.20" diff --git a/crates/action-pipeline/Cargo.toml b/crates/action-pipeline/Cargo.toml index afd6525b845..e08fd24ead3 100644 --- a/crates/action-pipeline/Cargo.toml +++ b/crates/action-pipeline/Cargo.toml @@ -16,6 +16,7 @@ moon_cache = { path = "../cache" } moon_common = { path = "../common" } moon_console = { path = "../console" } moon_notifier = { path = "../notifier" } +moon_process = { path = "../process" } moon_project = { path = "../project" } moon_remote = { path = "../remote" } moon_task = { path = "../task" } diff --git a/crates/action-pipeline/src/action_pipeline.rs b/crates/action-pipeline/src/action_pipeline.rs index a3922cbbceb..98281c59e53 100644 --- a/crates/action-pipeline/src/action_pipeline.rs +++ b/crates/action-pipeline/src/action_pipeline.rs @@ -8,12 +8,14 @@ use crate::subscribers::moonbase_subscriber::MoonbaseSubscriber; use crate::subscribers::remote_subscriber::RemoteSubscriber; use crate::subscribers::reports_subscriber::ReportsSubscriber; use crate::subscribers::webhooks_subscriber::WebhooksSubscriber; -use moon_action::{Action, ActionNode}; +use miette::IntoDiagnostic; +use moon_action::{Action, ActionNode, ActionPipelineStatus}; use moon_action_context::{ActionContext, TargetState}; use moon_action_graph::ActionGraph; use moon_api::Moonbase; use moon_app_context::AppContext; use moon_common::{color, is_ci, is_test_env}; +use moon_process::{ProcessRegistry, SignalType}; use moon_toolchain_plugin::ToolchainRegistry; use moon_workspace_graph::WorkspaceGraph; use rustc_hash::{FxHashMap, FxHashSet}; @@ -33,9 +35,9 @@ pub struct ActionPipeline { pub summarize: bool, // State - aborted: bool, actions: Vec, duration: Option, + status: ActionPipelineStatus, // Data app_context: Arc, @@ -54,15 +56,15 @@ impl ActionPipeline { debug!("Creating pipeline to run actions"); Self { - aborted: false, - actions: vec![], action_context: Arc::new(ActionContext::default()), + actions: vec![], app_context, bail: false, concurrency: num_cpus::get(), duration: None, emitter: Arc::new(EventEmitter::default()), report_name: "runReport.json".into(), + status: ActionPipelineStatus::Pending, summarize: false, toolchain_registry, workspace_graph, @@ -101,11 +103,11 @@ impl ActionPipeline { self.emitter .emit(Event::PipelineCompleted { actions: &actions, - aborted: self.aborted, context: &self.action_context, duration: self.duration, error: None, error_report: None, + status: &self.status, }) .await?; @@ -115,11 +117,11 @@ impl ActionPipeline { self.emitter .emit(Event::PipelineCompleted { actions: &actions, - aborted: self.aborted, context: &self.action_context, duration: self.duration, error: Some(error.to_string()), error_report: Some(&error), + status: &self.status, }) .await?; @@ -161,16 +163,18 @@ impl ActionPipeline { let signal_handle = self.monitor_signals(cancel_token.clone()); // Dispatch jobs from the graph to run actions - let queue_handle = self.dispatch_jobs(action_graph, job_context)?; + let queue_handle = self.dispatch_jobs(action_graph, job_context.clone())?; // Wait and receive all results coming through debug!("Waiting for jobs to return results"); + let process_registry = ProcessRegistry::instance(); let mut actions = vec![]; let mut error = None; while let Some(mut action) = receiver.recv().await { if self.bail && action.should_bail() || action.should_abort() { + process_registry.terminate_running(); abort_token.cancel(); error = Some(action.get_error()); } @@ -179,25 +183,35 @@ impl ActionPipeline { if abort_token.is_cancelled() { debug!("Aborting pipeline (because something failed)"); + self.status = ActionPipelineStatus::Aborted; break; } else if cancel_token.is_cancelled() { debug!("Cancelling pipeline (via signal)"); + self.status = ActionPipelineStatus::Interrupted; break; } else if actions.len() == total_actions { debug!("Finished pipeline, received all results"); + self.status = ActionPipelineStatus::Completed; break; } } drop(receiver); - // Wait for the queue to abort all running tasks - let _ = queue_handle.await; + // Capture and handle any signals + if cancel_token.is_cancelled() { + self.status = signal_handle.await.into_diagnostic()?; + } else { + signal_handle.abort(); + } + + // Wait for the queue to shutdown all running tasks + process_registry.wait_for_running_to_shutdown().await; - // Force abort the signal handler - signal_handle.abort(); + let mut _job_handles = queue_handle.await.into_diagnostic()?; + + // exhaust_job_handles(&mut job_handles, &job_context).await; - self.aborted = abort_token.is_cancelled(); self.actions = actions; self.duration = Some(start.elapsed()); @@ -213,7 +227,7 @@ impl ActionPipeline { &self, action_graph: ActionGraph, job_context: JobContext, - ) -> miette::Result> { + ) -> miette::Result>> { let node_indices = action_graph.sort_topological()?; let app_context = Arc::clone(&self.app_context); let action_context = Arc::clone(&self.action_context); @@ -233,11 +247,9 @@ impl ActionPipeline { // If the pipeline was aborted or cancelled (signal), // loop through and abort all currently running handles if job_context.is_aborted_or_cancelled() { - job_handles.shutdown().await; - // Return instead of break, so that we avoid // running persistent tasks below - return; + return job_handles; } // If none is returned, then we are waiting on other currently running @@ -290,18 +302,18 @@ impl ActionPipeline { if node.is_interactive() && exhaust_job_handles(&mut job_handles, &job_context).await { - return; + return job_handles; } } // Ensure all non-persistent actions have finished if exhaust_job_handles(&mut job_handles, &job_context).await { - return; + return job_handles; } // Then run all persistent actions in parallel if persistent_indices.is_empty() { - return; + return job_handles; } debug!( @@ -335,36 +347,44 @@ impl ActionPipeline { )); }); - // Since these tasks are persistent and never complete, - // we need to continually check if they've been aborted or - // cancelled, otherwise we will end up with zombie processes - loop { - sleep(Duration::from_millis(150)).await; + // // Since these tasks are persistent and never complete, + // // we need to continually check if they've been aborted or + // // cancelled, otherwise we will end up with zombie processes + // loop { + // sleep(Duration::from_millis(50)).await; - // No tasks running, so don't hang forever - if job_context.result_sender.is_closed() { - break; - } + // // No tasks running, so don't hang forever + // if job_context.result_sender.is_closed() { + // break; + // } - if job_context.is_aborted_or_cancelled() { - debug!("Shutting down {} persistent jobs", job_handles.len()); + // if job_context.is_aborted_or_cancelled() { + // debug!("Shutting down {} persistent jobs", job_handles.len()); - job_handles.shutdown().await; - break; - } - } + // exhaust_job_handles(&mut job_handles, &job_context).await; + // break; + // } + // } + + job_handles })) } - fn monitor_signals(&self, cancel_token: CancellationToken) -> JoinHandle<()> { + fn monitor_signals(&self, cancel_token: CancellationToken) -> JoinHandle { tokio::spawn(async move { - debug!("Listening for ctrl+c signal"); - - if tokio::signal::ctrl_c().await.is_ok() { - debug!("Received ctrl+c signal, shutting down!"); + let mut receiver = ProcessRegistry::instance().receive_signal(); + if let Ok(signal) = receiver.recv().await { cancel_token.cancel(); + + return match signal { + SignalType::Interrupt => ActionPipelineStatus::Interrupted, + SignalType::Quit => ActionPipelineStatus::Aborted, + SignalType::Terminate => ActionPipelineStatus::Terminated, + }; } + + ActionPipelineStatus::Interrupted }) } @@ -473,19 +493,10 @@ async fn dispatch_job_with_permit( #[instrument(skip_all)] async fn exhaust_job_handles(set: &mut JoinSet, job_context: &JobContext) -> bool { while set.join_next().await.is_some() { - // If the pipeline was aborted or cancelled (signal), - // loop through and abort all currently running handles - if job_context.is_aborted_or_cancelled() { - set.shutdown().await; - set.detach_all(); - - // Aborted - return true; - } + continue; } set.detach_all(); - // Not aborted - false + job_context.is_aborted_or_cancelled() } diff --git a/crates/action-pipeline/src/event_emitter.rs b/crates/action-pipeline/src/event_emitter.rs index f56b6bdc952..ba8247676a2 100644 --- a/crates/action-pipeline/src/event_emitter.rs +++ b/crates/action-pipeline/src/event_emitter.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use moon_action::{Action, ActionNode, RunTaskNode}; +use moon_action::{Action, ActionNode, ActionPipelineStatus, RunTaskNode}; use moon_action_context::ActionContext; use moon_project::Project; use moon_task::Target; @@ -46,12 +46,12 @@ pub enum Event<'data> { #[serde(rename_all = "camelCase")] PipelineCompleted { actions: &'data [Action], - aborted: bool, context: &'data ActionContext, duration: Option, error: Option, #[serde(skip)] error_report: Option<&'data miette::Report>, + status: &'data ActionPipelineStatus, }, // Syncing projects diff --git a/crates/action-pipeline/src/job.rs b/crates/action-pipeline/src/job.rs index 20536dd7e14..9deda7ac399 100644 --- a/crates/action-pipeline/src/job.rs +++ b/crates/action-pipeline/src/job.rs @@ -22,40 +22,56 @@ impl Job { let mut action = Action::new(self.node); action.node_index = self.node_index; - tokio::select! { - // Run conditions in order! - biased; - - // Abort if a sibling job has failed - _ = self.context.abort_token.cancelled() => { - trace!( - index = self.node_index, - "Job aborted", - ); - - action.finish(ActionStatus::Aborted); - } - - // Cancel if we receive a shutdown signal - _ = self.context.cancel_token.cancelled() => { - trace!( - index = self.node_index, - "Job cancelled (via signal)", - ); - - action.finish(ActionStatus::Skipped); - } - - // Or run the job to completion - _ = run_action( - &mut action, - self.action_context, - self.app_context, - self.context.workspace_graph.clone(), - self.context.toolchain_registry.clone(), - self.context.emitter.clone(), - ) => {}, - }; + // Don't use `select!` here because if the abort or cancel tokens + // are triggered, then the async task running the task child process + // is cancelled, immediately terminating the process, and ignoring + // any signals we attempt to pass down! + + run_action( + &mut action, + self.action_context, + self.app_context, + self.context.workspace_graph.clone(), + self.context.toolchain_registry.clone(), + self.context.emitter.clone(), + ) + .await + .unwrap(); + + // tokio::select! { + // // Run conditions in order! + // biased; + + // // Abort if a sibling job has failed + // _ = self.context.abort_token.cancelled() => { + // trace!( + // index = self.node_index, + // "Job aborted", + // ); + + // action.finish(ActionStatus::Aborted); + // } + + // // Cancel if we receive a shutdown signal + // _ = self.context.cancel_token.cancelled() => { + // trace!( + // index = self.node_index, + // "Job cancelled (via signal)", + // ); + + // action.finish(ActionStatus::Skipped); + // } + + // // Or run the job to completion + // _ = run_action( + // &mut action, + // self.action_context, + // self.app_context, + // self.context.workspace_graph.clone(), + // self.context.toolchain_registry.clone(), + // self.context.emitter.clone(), + // ) => {}, + // }; // Send the result back to the pipeline self.context.send_result(action).await; diff --git a/crates/action-pipeline/src/subscribers/console_subscriber.rs b/crates/action-pipeline/src/subscribers/console_subscriber.rs index 7696ff541ca..b7c68c5fa2f 100644 --- a/crates/action-pipeline/src/subscribers/console_subscriber.rs +++ b/crates/action-pipeline/src/subscribers/console_subscriber.rs @@ -24,25 +24,20 @@ impl Subscriber for ConsoleSubscriber { } Event::PipelineCompleted { actions, - aborted, duration, error_report, + status, .. } => { let item = PipelineReportItem { duration: *duration, summarize: self.summarize, + status: **status, }; - if *aborted { - self.console - .reporter - .on_pipeline_aborted(actions, &item, *error_report)?; - } else { - self.console - .reporter - .on_pipeline_completed(actions, &item, *error_report)?; - } + self.console + .reporter + .on_pipeline_completed(actions, &item, *error_report)?; } Event::ActionStarted { action, .. } => { self.console.reporter.on_action_started(action)?; diff --git a/crates/action-pipeline/src/subscribers/reports_subscriber.rs b/crates/action-pipeline/src/subscribers/reports_subscriber.rs index 07d3ddc0998..b77e8f9c02e 100644 --- a/crates/action-pipeline/src/subscribers/reports_subscriber.rs +++ b/crates/action-pipeline/src/subscribers/reports_subscriber.rs @@ -1,7 +1,7 @@ use crate::event_emitter::{Event, Subscriber}; use crate::reports::estimate::Estimate; use async_trait::async_trait; -use moon_action::Action; +use moon_action::{Action, ActionPipelineStatus}; use moon_action_context::ActionContext; use moon_cache::CacheEngine; use serde::Serialize; @@ -22,6 +22,8 @@ pub struct RunReport<'data> { /// Estimates around how much time was saved using moon, /// compared to another product or baseline. pub comparison_estimate: Estimate, + + pub status: &'data ActionPipelineStatus, } pub struct ReportsSubscriber { @@ -50,6 +52,7 @@ impl Subscriber for ReportsSubscriber { if let Event::PipelineCompleted { actions, duration: Some(duration), + status, .. } = event { @@ -62,6 +65,7 @@ impl Subscriber for ReportsSubscriber { context: &self.action_context, duration, comparison_estimate: estimate, + status, }; self.cache_engine.write(&self.report_name, &report)?; diff --git a/crates/action/src/action.rs b/crates/action/src/action.rs index 147dacc74c1..dd8a481b368 100644 --- a/crates/action/src/action.rs +++ b/crates/action/src/action.rs @@ -6,6 +6,17 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::{Duration, Instant}; +#[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum ActionPipelineStatus { + Aborted, + Completed, + Interrupted, + Terminated, + #[default] + Pending, +} + #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Serialize)] #[serde(rename_all = "kebab-case")] pub enum ActionStatus { diff --git a/crates/actions/src/operations/run_plugin_operation.rs b/crates/actions/src/operations/run_plugin_operation.rs index 5a8a6af9dac..3113049a70e 100644 --- a/crates/actions/src/operations/run_plugin_operation.rs +++ b/crates/actions/src/operations/run_plugin_operation.rs @@ -8,16 +8,14 @@ pub async fn run_plugin_operation(operation: PluginOperation) -> miette::Result< Operation::process_execution(&process.command) .track_async_with_check( || async { - let mut builder = Command::new(process.command); - builder.args(process.args); - builder.envs(process.env); + let mut command = Command::new(process.command); + command.args(process.args); + command.envs(process.env); if let Some(cwd) = process.working_dir.and_then(|cwd| cwd.real_path()) { - builder.cwd(cwd); + command.cwd(cwd); } - let mut command = builder.create_async(); - if process.stream { command.exec_stream_output().await } else { diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 9d72c6bd58a..49baf5107a6 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -23,6 +23,7 @@ moon_docker = { path = "../docker" } moon_env = { path = "../env" } moon_extension_plugin = { path = "../extension-plugin" } moon_plugin = { path = "../plugin" } +moon_process = { path = "../process" } moon_project = { path = "../project" } moon_project_graph = { path = "../project-graph" } moon_query = { path = "../query" } diff --git a/crates/app/src/commands/node/run_script.rs b/crates/app/src/commands/node/run_script.rs index 527c56516fc..49028cf04b0 100644 --- a/crates/app/src/commands/node/run_script.rs +++ b/crates/app/src/commands/node/run_script.rs @@ -55,7 +55,7 @@ pub async fn run_script(session: CliSession, args: RunScriptArgs) -> AppResult { )); } - command.create_async().exec_stream_output().await?; + command.exec_stream_output().await?; Ok(None) } diff --git a/crates/app/src/session.rs b/crates/app/src/session.rs index bf9421b60c2..96b1f57a485 100644 --- a/crates/app/src/session.rs +++ b/crates/app/src/session.rs @@ -13,6 +13,7 @@ use moon_console_reporter::DefaultReporter; use moon_env::MoonEnvironment; use moon_extension_plugin::*; use moon_plugin::{PluginHostData, PluginId}; +use moon_process::ProcessRegistry; use moon_project_graph::ProjectGraph; use moon_task_graph::TaskGraph; use moon_toolchain_plugin::*; @@ -337,6 +338,11 @@ impl AppSession for CliSession { } async fn shutdown(&mut self) -> AppResult { + // Ensure all child processes have finished running + ProcessRegistry::instance() + .wait_for_running_to_shutdown() + .await; + self.console.close()?; Ok(None) diff --git a/crates/codegen/src/codegen.rs b/crates/codegen/src/codegen.rs index 72d6e127ead..acda4fd122d 100644 --- a/crates/codegen/src/codegen.rs +++ b/crates/codegen/src/codegen.rs @@ -274,7 +274,6 @@ async fn clone_and_checkout_git_repository( .args(args) .cwd(cwd) .without_shell() - .create_async() .exec_capture_output() .await?; diff --git a/crates/console-reporter/src/default_reporter.rs b/crates/console-reporter/src/default_reporter.rs index e8d49e38f06..aea3ef7bc8e 100644 --- a/crates/console-reporter/src/default_reporter.rs +++ b/crates/console-reporter/src/default_reporter.rs @@ -1,4 +1,6 @@ -use moon_action::{Action, ActionNode, ActionStatus, Operation, OperationList}; +use moon_action::{ + Action, ActionNode, ActionPipelineStatus, ActionStatus, Operation, OperationList, +}; use moon_common::color::paint; use moon_common::{color, is_test_env}; use moon_config::TaskOutputStyle; @@ -251,6 +253,15 @@ impl DefaultReporter { if passed_count == cached_count && failed_count == 0 { elapsed_time = format!("{} {}", elapsed_time, label_to_the_moon()); + } else if matches!( + item.status, + ActionPipelineStatus::Interrupted | ActionPipelineStatus::Terminated + ) { + elapsed_time = format!( + "{} {}", + elapsed_time, + color::muted_light(format!("({:?})", item.status).to_lowercase()) + ); } if item.summarize { @@ -321,6 +332,12 @@ impl Reporter for DefaultReporter { return Ok(()); } + // A task failed, so instead of showing the stats, + // we'll render the error that was bubbled up + if matches!(item.status, ActionPipelineStatus::Aborted) { + return Ok(()); + } + // If no summary, only show stats. This is typically for local! if !item.summarize { self.out.write_newline()?; diff --git a/crates/console/src/reporter.rs b/crates/console/src/reporter.rs index 627a240cd63..af4d2d08b72 100644 --- a/crates/console/src/reporter.rs +++ b/crates/console/src/reporter.rs @@ -1,7 +1,7 @@ use crate::buffer::ConsoleBuffer; use crate::console::ConsoleTheme; use miette::Error as Report; -use moon_action::{Action, ActionNode, Operation, OperationList}; +use moon_action::{Action, ActionNode, ActionPipelineStatus, Operation, OperationList}; use moon_config::TaskOutputStyle; use moon_target::Target; use std::sync::Arc; @@ -11,6 +11,7 @@ use std::time::Duration; pub struct PipelineReportItem { pub duration: Option, pub summarize: bool, + pub status: ActionPipelineStatus, } #[derive(Clone, Debug, Default)] @@ -41,15 +42,6 @@ pub trait Reporter: Send + Sync { Ok(()) } - fn on_pipeline_aborted( - &self, - _actions: &[Action], - _item: &PipelineReportItem, - _error: Option<&Report>, - ) -> miette::Result<()> { - Ok(()) - } - fn on_action_started(&self, _action: &Action) -> miette::Result<()> { Ok(()) } diff --git a/crates/process/Cargo.toml b/crates/process/Cargo.toml index f3b7cc3f28b..bccd2f972fb 100644 --- a/crates/process/Cargo.toml +++ b/crates/process/Cargo.toml @@ -15,12 +15,22 @@ moon_console = { path = "../console" } cached = { workspace = true } miette = { workspace = true } once_cell = { workspace = true } +process-wrap = { version = "8.2.0", default-features = false, features = [ + "job-object", + "kill-on-drop", + "process-group", + "tokio1", + "tracing", +] } rustc-hash = { workspace = true } starbase_shell = { workspace = true } system_env = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } -tokio = { workspace = true, features = ["io-util"] } +tokio = { workspace = true, features = ["io-util", "signal", "sync"] } + +[target.'cfg(unix)'.dependencies] +libc = "0.2.169" [lints] workspace = true diff --git a/crates/process/src/async_command.rs b/crates/process/src/async_command.rs deleted file mode 100644 index 8fb3c27f812..00000000000 --- a/crates/process/src/async_command.rs +++ /dev/null @@ -1,257 +0,0 @@ -use crate::command_inspector::CommandInspector; -use crate::output_to_error; -use crate::process_error::ProcessError; -use moon_console::Console; -use std::process::{Output, Stdio}; -use std::sync::{Arc, RwLock}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, Command}; -use tokio::task; - -pub struct AsyncCommand<'cmd> { - pub console: Option>, - pub inner: Command, - pub inspector: CommandInspector<'cmd>, - - pub current_id: Option, -} - -impl AsyncCommand<'_> { - pub async fn exec_capture_output(&mut self) -> miette::Result { - self.inspector.log_command(); - - let command = &mut self.inner; - let output: Output; - - if self.inspector.should_pass_stdin() { - let mut child = command - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|error| ProcessError::Capture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - - self.write_input_to_child(&mut child).await?; - - self.current_id = child.id(); - - output = child - .wait_with_output() - .await - .map_err(|error| ProcessError::Capture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - } else { - output = command - .output() - .await - .map_err(|error| ProcessError::Capture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - } - - self.handle_nonzero_status(&output, true)?; - - Ok(output) - } - - pub async fn exec_stream_output(&mut self) -> miette::Result { - self.inspector.log_command(); - - let command = &mut self.inner; - let mut child: Child; - - if self.inspector.should_pass_stdin() { - child = - command - .stdin(Stdio::piped()) - .spawn() - .map_err(|error| ProcessError::Stream { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - - self.write_input_to_child(&mut child).await?; - } else { - child = command.spawn().map_err(|error| ProcessError::Stream { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - }; - - self.current_id = child.id(); - - let status = child.wait().await.map_err(|error| ProcessError::Stream { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - - let output = Output { - status, - stderr: vec![], - stdout: vec![], - }; - - self.handle_nonzero_status(&output, false)?; - - Ok(output) - } - - pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result { - self.inspector.log_command(); - - let command = &mut self.inner; - - let mut child = command - .stdin(if self.inspector.should_pass_stdin() { - Stdio::piped() - } else { - Stdio::inherit() - }) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .map_err(|error| ProcessError::StreamCapture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - - self.current_id = child.id(); - - if self.inspector.should_pass_stdin() { - self.write_input_to_child(&mut child).await?; - } - - // We need to log the child process output to the parent terminal - // AND capture stdout/stderr so that we can cache it for future runs. - // This doesn't seem to be supported natively by `Stdio`, so I have - // this *real ugly* implementation to solve it. There's gotta be a - // better way to do this? - // https://stackoverflow.com/a/49063262 - let stderr = BufReader::new(child.stderr.take().unwrap()); - let stdout = BufReader::new(child.stdout.take().unwrap()); - let mut handles = vec![]; - - let captured_stderr = Arc::new(RwLock::new(vec![])); - let captured_stdout = Arc::new(RwLock::new(vec![])); - let captured_stderr_clone = Arc::clone(&captured_stderr); - let captured_stdout_clone = Arc::clone(&captured_stdout); - - let prefix = Arc::new(self.inspector.get_prefix()); - let stderr_prefix = Arc::clone(&prefix); - let stdout_prefix = Arc::clone(&prefix); - - let console = self - .console - .as_ref() - .expect("A console is required when streaming output!"); - let stderr_stream = Arc::new(console.stderr().to_owned()); - let stdout_stream = Arc::new(console.stdout().to_owned()); - - handles.push(task::spawn(async move { - let mut lines = stderr.lines(); - let mut captured_lines = vec![]; - - while let Ok(Some(line)) = lines.next_line().await { - let _ = if let Some(prefix) = &*stderr_prefix { - stderr_stream.write_line_with_prefix(&line, prefix) - } else { - stderr_stream.write_line(&line) - }; - - captured_lines.push(line); - } - - captured_stderr_clone - .write() - .unwrap() - .extend(captured_lines); - })); - - handles.push(task::spawn(async move { - let mut lines = stdout.lines(); - let mut captured_lines = vec![]; - - while let Ok(Some(line)) = lines.next_line().await { - let _ = if let Some(prefix) = &*stdout_prefix { - stdout_stream.write_line_with_prefix(&line, prefix) - } else { - stdout_stream.write_line(&line) - }; - - captured_lines.push(line); - } - - captured_stdout_clone - .write() - .unwrap() - .extend(captured_lines); - })); - - for handle in handles { - let _ = handle.await; - } - - // Attempt to create the child output - let status = child - .wait() - .await - .map_err(|error| ProcessError::StreamCapture { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - - let output = Output { - status, - stdout: captured_stdout.read().unwrap().join("\n").into_bytes(), - stderr: captured_stderr.read().unwrap().join("\n").into_bytes(), - }; - - self.handle_nonzero_status(&output, true)?; - - Ok(output) - } - - fn get_bin_name(&self) -> String { - self.inner - .as_std() - .get_program() - .to_string_lossy() - .to_string() - } - - fn handle_nonzero_status(&mut self, output: &Output, with_message: bool) -> miette::Result<()> { - self.current_id = None; - - if self.inspector.should_error_nonzero() && !output.status.success() { - return Err(output_to_error(self.get_bin_name(), output, with_message).into()); - } - - Ok(()) - } - - async fn write_input_to_child(&self, child: &mut Child) -> miette::Result<()> { - let input = &self.inspector.get_command_line().input; - - let mut stdin = child.stdin.take().unwrap_or_else(|| { - panic!("Unable to write stdin: {}", input.to_string_lossy()); - }); - - stdin - .write_all(input.as_encoded_bytes()) - .await - .map_err(|error| ProcessError::WriteInput { - bin: self.get_bin_name(), - error: Box::new(error), - })?; - - drop(stdin); - - Ok(()) - } -} diff --git a/crates/process/src/command.rs b/crates/process/src/command.rs index 822bcdbe4c5..ddb300181f3 100644 --- a/crates/process/src/command.rs +++ b/crates/process/src/command.rs @@ -1,22 +1,24 @@ -use crate::{async_command::AsyncCommand, command_inspector::CommandInspector, shell::Shell}; -use moon_common::color; +// This implementation is loosely based on Cargo's: +// https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/process_builder.rs + +use crate::shell::Shell; +use moon_common::{color, is_test_env}; use moon_console::Console; -use rustc_hash::FxHashMap; +use rustc_hash::{FxHashMap, FxHasher}; +use std::hash::Hasher; use std::{ ffi::{OsStr, OsString}, - path::{Path, PathBuf}, sync::Arc, }; -use tokio::process::Command as TokioCommand; pub struct Command { pub args: Vec, pub bin: OsString, - pub cwd: Option, + pub cwd: Option, - pub env: FxHashMap, + pub env: FxHashMap>, /// Convert non-zero exits to errors pub error_on_nonzero: bool, @@ -57,12 +59,12 @@ impl Command { } } - pub fn arg>(&mut self, arg: A) -> &mut Command { + pub fn arg>(&mut self, arg: A) -> &mut Self { self.args.push(arg.as_ref().to_os_string()); self } - pub fn arg_if_missing>(&mut self, arg: A) -> &mut Command { + pub fn arg_if_missing>(&mut self, arg: A) -> &mut Self { let arg = arg.as_ref(); let present = self.args.iter().any(|a| a == arg); @@ -73,7 +75,7 @@ impl Command { self } - pub fn args(&mut self, args: I) -> &mut Command + pub fn args(&mut self, args: I) -> &mut Self where I: IntoIterator, S: AsRef, @@ -85,43 +87,24 @@ impl Command { self } - pub fn cwd>(&mut self, dir: P) -> &mut Command { - self.cwd = Some(dir.as_ref().to_path_buf()); + pub fn cwd>(&mut self, dir: P) -> &mut Self { + self.cwd = Some(dir.as_ref().to_os_string()); self } - pub fn create_async(&self) -> AsyncCommand { - let inspector = self.inspect(); - let command_line = inspector.get_command_line(); - - let mut command = TokioCommand::new(&command_line.command[0]); - command.args(&command_line.command[1..]); - command.envs(&self.env); - command.kill_on_drop(true); - - if let Some(cwd) = &self.cwd { - command.current_dir(cwd); - } - - AsyncCommand { - console: self.console.clone(), - inner: command, - inspector, - current_id: None, - } - } - - pub fn env(&mut self, key: K, val: V) -> &mut Command + pub fn env(&mut self, key: K, val: V) -> &mut Self where K: AsRef, V: AsRef, { - self.env - .insert(key.as_ref().to_os_string(), val.as_ref().to_os_string()); + self.env.insert( + key.as_ref().to_os_string(), + Some(val.as_ref().to_os_string()), + ); self } - pub fn env_if_missing(&mut self, key: K, val: V) -> &mut Command + pub fn env_if_missing(&mut self, key: K, val: V) -> &mut Self where K: AsRef, V: AsRef, @@ -133,7 +116,15 @@ impl Command { self } - pub fn envs(&mut self, vars: I) -> &mut Command + pub fn env_remove(&mut self, key: K) -> &mut Self + where + K: AsRef, + { + self.env.insert(key.as_ref().to_os_string(), None); + self + } + + pub fn envs(&mut self, vars: I) -> &mut Self where I: IntoIterator, K: AsRef, @@ -146,11 +137,14 @@ impl Command { self } - pub fn inherit_colors(&mut self) -> &mut Command { + pub fn inherit_colors(&mut self) -> &mut Self { let level = color::supports_color().to_string(); - self.env("FORCE_COLOR", &level); - self.env("CLICOLOR_FORCE", &level); + if !is_test_env() { + self.env_remove("NO_COLOR"); + self.env("FORCE_COLOR", &level); + self.env("CLICOLOR_FORCE", &level); + } // Force a terminal width so that we have consistent sizing // in our cached output, and its the same across all machines @@ -161,7 +155,7 @@ impl Command { self } - pub fn input(&mut self, input: I) -> &mut Command + pub fn input(&mut self, input: I) -> &mut Self where I: IntoIterator, V: AsRef, @@ -173,36 +167,86 @@ impl Command { self } - pub fn inspect(&self) -> CommandInspector { - CommandInspector::new(self) + pub fn get_bin_name(&self) -> String { + self.bin.to_string_lossy().to_string() + } + + pub fn get_cache_key(&self) -> String { + let mut hasher = FxHasher::default(); + + let mut write = |value: &OsString| { + hasher.write(value.as_os_str().as_encoded_bytes()); + }; + + for (key, value) in &self.env { + if let Some(value) = value { + write(key); + write(value); + } + } + + write(&self.bin); + + for arg in &self.args { + write(arg); + } + + if let Some(cwd) = &self.cwd { + write(cwd); + } + + for arg in &self.input { + write(arg); + } + + format!("{}", hasher.finish()) + } + + pub fn get_prefix(&self) -> Option<&str> { + self.prefix.as_deref() } - pub fn set_print_command(&mut self, state: bool) -> &mut Command { + pub fn set_print_command(&mut self, state: bool) -> &mut Self { self.print_command = state; self } - pub fn set_error_on_nonzero(&mut self, state: bool) -> &mut Command { + pub fn set_error_on_nonzero(&mut self, state: bool) -> &mut Self { self.error_on_nonzero = state; self } - pub fn set_prefix(&mut self, prefix: &str) -> &mut Command { + pub fn set_prefix(&mut self, prefix: &str) -> &mut Self { self.prefix = Some(prefix.to_owned()); self } - pub fn with_console(&mut self, console: Arc) -> &mut Command { + pub fn should_error_nonzero(&self) -> bool { + self.error_on_nonzero + } + + pub fn should_pass_stdin(&self) -> bool { + !self.input.is_empty() || self.should_pass_args_stdin() + } + + pub fn should_pass_args_stdin(&self) -> bool { + self.shell + .as_ref() + .map(|shell| shell.command.pass_args_stdin) + .unwrap_or(false) + } + + pub fn with_console(&mut self, console: Arc) -> &mut Self { self.console = Some(console); self } - pub fn with_shell(&mut self, shell: Shell) -> &mut Command { + pub fn with_shell(&mut self, shell: Shell) -> &mut Self { self.shell = Some(shell); self } - pub fn without_shell(&mut self) -> &mut Command { + pub fn without_shell(&mut self) -> &mut Self { self.shell = None; self } diff --git a/crates/process/src/command_inspector.rs b/crates/process/src/command_inspector.rs deleted file mode 100644 index aac9147bb66..00000000000 --- a/crates/process/src/command_inspector.rs +++ /dev/null @@ -1,273 +0,0 @@ -use crate::command::Command; -use moon_args::join_args_os; -use moon_common::color; -use once_cell::sync::OnceCell; -use rustc_hash::FxHashMap; -use std::borrow::Cow; -use std::env; -use std::ffi::{OsStr, OsString}; -use std::fmt::{self, Display}; -use std::path::{Path, PathBuf, MAIN_SEPARATOR}; -use tracing::{debug, enabled}; - -type LineValue<'l> = Cow<'l, OsStr>; - -#[derive(Debug)] -pub struct CommandLine<'l> { - pub command: Vec>, - pub input: LineValue<'l>, - pub main_command: LineValue<'l>, -} - -impl CommandLine<'_> { - pub fn new(command: &Command) -> CommandLine { - let mut command_line: Vec = vec![]; - let mut input_line: Vec = vec![]; - let mut main_line: Vec = vec![]; - // let mut join_input = false; - - let push_to_line = |line: &mut Vec| { - line.push(Cow::Owned(command.bin.to_owned())); - - for arg in &command.args { - line.push(Cow::Owned(arg.to_owned())); - } - }; - - // Extract the main command, without shell, for other purposes! - push_to_line(&mut main_line); - - // If wrapped in a shell, the shell binary and arguments - // must be placed at the start of the line. - if let Some(shell) = &command.shell { - command_line.push(Cow::Borrowed(shell.bin.as_os_str())); - command_line.extend( - shell - .command - .shell_args - .iter() - .map(|arg| Cow::Borrowed(arg.as_os_str())), - ); - - // If the main command should be passed via stdin, - // then append the input line instead of the command line. - if shell.command.pass_args_stdin { - // join_input = true; - push_to_line(&mut input_line); - - // Otherwise append as a *single* argument. This typically - // appears after a "-" argument (should come from shell). - } else { - let mut sub_line: Vec = vec![]; - push_to_line(&mut sub_line); - - command_line.push(Cow::Owned(if command.escape_args { - join_args_os(sub_line) - } else { - sub_line.join(OsStr::new(" ")) - })); - } - - // Otherwise we have a normal command and arguments. - } else { - push_to_line(&mut command_line); - - // That also may have input. - if !command.input.is_empty() { - for input in &command.input { - input_line.push(Cow::Borrowed(input)); - } - } - } - - CommandLine { - command: command_line, - // input: if join_input { - // join_args(input_line) - // } else { - // input_line.join("") - // }, - input: Cow::Owned(input_line.join(OsStr::new(" "))), - main_command: Cow::Owned(join_args_os(main_line)), - } - } -} - -impl Display for CommandLine<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let command = join_args_os(&self.command); - let command = command.to_string_lossy(); - - write!(f, "{}", command)?; - - if !self.input.is_empty() { - let debug_input = env::var("MOON_DEBUG_PROCESS_INPUT").is_ok(); - let input = &self.input; - - if !command.ends_with('-') { - write!(f, " -")?; - } - - write!( - f, - " {}", - if input.len() > 200 && !debug_input { - "(truncated)".into() - } else { - input.to_string_lossy().trim().replace('\n', " ") - } - )?; - } - - Ok(()) - } -} - -pub struct CommandInspector<'cmd> { - command: &'cmd Command, - line_cache: OnceCell>, -} - -impl<'cmd> CommandInspector<'cmd> { - pub fn new(command: &'cmd Command) -> Self { - Self { - command, - line_cache: OnceCell::new(), - } - } - - pub fn get_cache_key(&self) -> String { - let line = self.get_command_line(); - - format!( - "{}::{}::{}", - line.command.join(OsStr::new(" ")).to_string_lossy(), - line.input.to_string_lossy(), - self.command - .cwd - .as_ref() - .and_then(|cwd| cwd.as_os_str().to_str()) - .unwrap_or_default() - ) - } - - pub fn get_command_line(&self) -> &CommandLine { - self.line_cache - .get_or_init(|| CommandLine::new(self.command)) - } - - pub fn get_prefix(&self) -> Option { - self.command.prefix.clone() - } - - pub fn get_workspace_root(&self) -> PathBuf { - let env_key = OsString::from("MOON_WORKSPACE_ROOT"); - - env::var_os(&env_key) - .or_else(|| self.command.env.get(&env_key).cloned()) - .map(PathBuf::from) - .unwrap_or_else(|| env::current_dir().unwrap_or(PathBuf::from("."))) - } - - pub fn should_error_nonzero(&self) -> bool { - self.command.error_on_nonzero - } - - pub fn should_pass_stdin(&self) -> bool { - !self.command.input.is_empty() || self.should_pass_args_stdin() - } - - pub fn should_pass_args_stdin(&self) -> bool { - self.command - .shell - .as_ref() - .map(|s| s.command.pass_args_stdin) - .unwrap_or(false) - } - - pub fn format_command( - &self, - line: &str, - workspace_root: &Path, - working_dir: Option<&Path>, - ) -> String { - let working_dir = working_dir.unwrap_or(workspace_root); - - let target_dir = if working_dir == workspace_root { - "workspace".into() - } else if let Ok(dir) = working_dir.strip_prefix(workspace_root) { - format!(".{}{}", MAIN_SEPARATOR, dir.to_string_lossy()) - } else { - debug!( - working_dir = ?working_dir, - workspace_root = ?workspace_root, - "Unable to determine the directory a task is running in...", - ); - - ".".into() - }; - - format!( - "{} {}", - color::muted_light(line.trim()), - color::muted(format!("(in {target_dir})")) - ) - } - - pub fn log_command(&self) { - let mut workspace_root = None; - - if self.command.print_command { - if let Some(cmd_line) = self.get_command_line().main_command.to_str() { - if let Some(console) = self.command.console.as_ref() { - if !console.out.is_quiet() { - workspace_root = Some(self.get_workspace_root()); - - let cmd_line = self.format_command( - cmd_line, - workspace_root.as_deref().unwrap(), - self.command.cwd.as_deref(), - ); - - let _ = console.out.write_line(cmd_line); - } - } - } - } - - // Avoid all this overhead if we're not logging - if !enabled!(tracing::Level::DEBUG) { - return; - } - - let debug_env = env::var("MOON_DEBUG_PROCESS_ENV").is_ok(); - - let env_vars_field = self - .command - .env - .iter() - .filter(|(key, _)| { - if debug_env { - true - } else { - key.to_str() - .map(|k| k.starts_with("MOON_")) - .unwrap_or_default() - } - }) - .collect::>(); - - if workspace_root.is_none() { - workspace_root = Some(self.get_workspace_root()); - } - - let working_dir_field = self.command.cwd.as_deref().or(workspace_root.as_deref()); - - debug!( - env_vars = ?env_vars_field, - working_dir = ?working_dir_field, - "Running command {}", - color::shell(self.get_command_line().to_string()) - ); - } -} diff --git a/crates/process/src/command_line.rs b/crates/process/src/command_line.rs new file mode 100644 index 00000000000..5cb8e4e17a4 --- /dev/null +++ b/crates/process/src/command_line.rs @@ -0,0 +1,123 @@ +use crate::command::Command; +use moon_args::join_args_os; +use moon_common::color; +use std::env; +use std::ffi::{OsStr, OsString}; +use std::fmt::{self, Display}; +use std::path::Path; + +#[derive(Debug)] +pub struct CommandLine { + pub command: Vec, + pub input: Vec, + pub shell: bool, +} + +impl CommandLine { + pub fn new(command: &Command) -> CommandLine { + let mut command_line: Vec = vec![]; + let mut input_line: Vec = vec![]; + let mut in_shell = false; + + // Extract the main command, without shell, for other purposes! + let mut main_line: Vec = vec![]; + main_line.push(command.bin.clone()); + + for arg in &command.args { + main_line.push(arg.to_owned()); + } + + // If wrapped in a shell, the shell binary and arguments + // must be placed at the start of the line. + if let Some(shell) = &command.shell { + in_shell = true; + command_line.push(shell.bin.as_os_str().to_owned()); + command_line.extend(shell.command.shell_args.clone()); + + // If the main command should be passed via stdin, + // then append the input line instead of the command line. + if shell.command.pass_args_stdin { + input_line.extend(main_line); + } + // Otherwise append as a *single* argument. This typically + // appears after a "-" argument (should come from shell). + else { + command_line.push(if command.escape_args { + join_args_os(main_line) + } else { + main_line.join(OsStr::new(" ")) + }); + } + + // Otherwise we have a normal command and arguments. + } else { + command_line.extend(main_line); + + // That also may have input. + if !command.input.is_empty() { + for input in &command.input { + input_line.push(input.to_owned()); + } + } + } + + CommandLine { + command: command_line, + input: input_line, + shell: in_shell, + } + } + + pub fn get_line(&self, with_shell: bool, with_input: bool) -> String { + let mut command = if !with_shell && self.shell { + self.command.last().cloned().unwrap_or_else(OsString::new) + } else { + join_args_os(&self.command) + }; + + if with_input && !self.input.is_empty() { + let debug_input = env::var("MOON_DEBUG_PROCESS_INPUT").is_ok(); + let input = join_args_os(&self.input); + + if command + .as_os_str() + .to_str() + .is_some_and(|cmd| cmd.ends_with('-')) + { + command.push(" "); + } else { + command.push(" - "); + } + + if input.len() > 200 && !debug_input { + command.push("(truncated)"); + } else { + command.push(&input); + } + } + + command.to_string_lossy().trim().replace('\n', " ") + } + + pub fn format(command: &str, workspace_root: &Path, working_dir: &Path) -> String { + let dir = if working_dir == workspace_root { + "workspace".into() + } else if let Ok(dir) = working_dir.strip_prefix(workspace_root) { + format!(".{}{}", std::path::MAIN_SEPARATOR, dir.to_string_lossy()) + } else { + ".".into() + }; + + format!( + "{} {}", + color::muted_light(command.trim()), + color::muted(format!("(in {dir})")) + ) + } +} + +impl Display for CommandLine { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.get_line(true, true)) + } +} diff --git a/crates/process/src/exec_command.rs b/crates/process/src/exec_command.rs new file mode 100644 index 00000000000..0b78889fdac --- /dev/null +++ b/crates/process/src/exec_command.rs @@ -0,0 +1,485 @@ +use crate::command::Command; +use crate::command_line::CommandLine; +// use crate::output_stream::capture_stream; +use crate::output_to_error; +use crate::process_error::ProcessError; +use crate::process_registry::ProcessRegistry; +use crate::shared_child::SharedChild; +use moon_common::color; +use process_wrap::tokio::*; +use rustc_hash::FxHashMap; +use std::env; +use std::ffi::{OsStr, OsString}; +use std::path::PathBuf; +use std::process::{Output, Stdio}; +use std::sync::{Arc, RwLock}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::Command as TokioCommand; +use tokio::task; +use tracing::{debug, enabled}; + +fn wrap_command(command: TokioCommand) -> TokioCommandWrap { + let mut command = TokioCommandWrap::from(command); + command.wrap(KillOnDrop); + + // #[cfg(unix)] + // { + // command.wrap(ProcessGroup::leader()); + // } + + // #[cfg(windows)] + // { + // command.wrap(JobObject); + // } + + command +} + +impl Command { + pub async fn exec_capture_output(&mut self) -> miette::Result { + let registry = ProcessRegistry::instance(); + let (mut command, line) = self.create_async_command(); + + let child = if self.should_pass_stdin() { + command + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + let mut child = + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Capture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + self.write_input_to_child(&mut child, &line).await?; + + child + } else { + command.stdout(Stdio::piped()).stderr(Stdio::piped()); + + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Capture { + bin: self.get_bin_name(), + error: Box::new(error), + })? + }; + + let shared_child = registry.add_running(child).await; + + self.log_command(&line, &shared_child); + + let result = shared_child + .wait_with_output() + .await + .map_err(|error| ProcessError::Capture { + bin: self.get_bin_name(), + error: Box::new(error), + }); + + registry.remove_running(shared_child).await; + + let output = result?; + + self.handle_nonzero_status(&output, true)?; + + Ok(output) + } + + pub async fn exec_stream_output(&mut self) -> miette::Result { + let registry = ProcessRegistry::instance(); + let (mut command, line) = self.create_async_command(); + + let child = if self.should_pass_stdin() { + command.stdin(Stdio::piped()); + + let mut child = + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Stream { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + self.write_input_to_child(&mut child, &line).await?; + + child + } else { + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::Stream { + bin: self.get_bin_name(), + error: Box::new(error), + })? + }; + + let shared_child = registry.add_running(child).await; + + self.log_command(&line, &shared_child); + + let result = shared_child + .wait() + .await + .map_err(|error| ProcessError::Stream { + bin: self.get_bin_name(), + error: Box::new(error), + }); + + registry.remove_running(shared_child).await; + + let status = result?; + let output = Output { + status, + stderr: vec![], + stdout: vec![], + }; + + self.handle_nonzero_status(&output, false)?; + + Ok(output) + } + + pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result { + let registry = ProcessRegistry::instance(); + let (mut command, line) = self.create_async_command(); + + command + .stdin(if self.should_pass_stdin() { + Stdio::piped() + } else { + Stdio::inherit() + }) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()); + + let mut child = + wrap_command(command) + .spawn() + .map_err(|error| ProcessError::StreamCapture { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + if self.should_pass_stdin() { + self.write_input_to_child(&mut child, &line).await?; + } + + let shared_child = registry.add_running(child).await; + + // We need to log the child process output to the parent terminal + // AND capture stdout/stderr so that we can cache it for future runs. + // This doesn't seem to be supported natively by `Stdio`, so I have + // this *real ugly* implementation to solve it. There's gotta be a + // better way to do this? + // https://stackoverflow.com/a/49063262 + let stderr = BufReader::new(shared_child.take_stderr().await.unwrap()); + let stdout = BufReader::new(shared_child.take_stdout().await.unwrap()); + let mut handles = vec![]; + + let captured_stderr = Arc::new(RwLock::new(vec![])); + let captured_stdout = Arc::new(RwLock::new(vec![])); + let captured_stderr_clone = Arc::clone(&captured_stderr); + let captured_stdout_clone = Arc::clone(&captured_stdout); + + let prefix = Arc::new(self.get_prefix().map(|prefix| prefix.to_owned())); + let stderr_prefix = Arc::clone(&prefix); + let stdout_prefix = Arc::clone(&prefix); + + let console = self + .console + .as_ref() + .expect("A console is required when streaming output!"); + let stderr_stream = Arc::new(console.stderr().to_owned()); + let stdout_stream = Arc::new(console.stdout().to_owned()); + + handles.push(task::spawn(async move { + let mut lines = stderr.lines(); + let mut captured_lines = vec![]; + + while let Ok(Some(line)) = lines.next_line().await { + let _ = if let Some(prefix) = &*stderr_prefix { + stderr_stream.write_line_with_prefix(&line, prefix) + } else { + stderr_stream.write_line(&line) + }; + + captured_lines.push(line); + } + + captured_stderr_clone + .write() + .unwrap() + .extend(captured_lines); + })); + + handles.push(task::spawn(async move { + let mut lines = stdout.lines(); + let mut captured_lines = vec![]; + + while let Ok(Some(line)) = lines.next_line().await { + let _ = if let Some(prefix) = &*stdout_prefix { + stdout_stream.write_line_with_prefix(&line, prefix) + } else { + stdout_stream.write_line(&line) + }; + + captured_lines.push(line); + } + + captured_stdout_clone + .write() + .unwrap() + .extend(captured_lines); + })); + + for handle in handles { + let _ = handle.await; + } + + self.log_command(&line, &shared_child); + + // Attempt to create the child output + let result = shared_child + .wait() + .await + .map_err(|error| ProcessError::StreamCapture { + bin: self.get_bin_name(), + error: Box::new(error), + }); + + registry.remove_running(shared_child).await; + + let status = result?; + let output = Output { + status, + stdout: captured_stdout.read().unwrap().join("\n").into_bytes(), + stderr: captured_stderr.read().unwrap().join("\n").into_bytes(), + }; + + self.handle_nonzero_status(&output, true)?; + + Ok(output) + } + + // pub async fn exec_stream_and_capture_output_new(&mut self) -> miette::Result { + // let registry = ProcessRegistry::instance(); + // let (mut command, line) = self.create_async_command(); + + // let mut child = command + // .stdin(if self.should_pass_stdin() { + // Stdio::piped() + // } else { + // Stdio::inherit() + // }) + // .stderr(Stdio::piped()) + // .stdout(Stdio::piped()) + // .spawn() + // .map_err(|error| ProcessError::StreamCapture { + // bin: self.get_bin_name(), + // error: Box::new(error), + // })?; + + // if self.should_pass_stdin() { + // self.write_input_to_child(&mut child, &line).await?; + // } + + // let shared_child = registry.add_running(child).await; + + // // Stream and attempt to capture the output + // let stderr = shared_child.take_stderr().await.unwrap(); + // let mut stderr_buffer = Vec::new(); + // let mut stderr_pos = 0; + + // let stdout = shared_child.take_stdout().await.unwrap(); + // let mut stdout_buffer = Vec::new(); + // let mut stdout_pos = 0; + + // let prefix = self.get_prefix(); + // let console = self + // .console + // .as_ref() + // .expect("A console is required when streaming output!"); + + // capture_stream(stdout, stderr, &mut |is_out, data, eof| { + // let (pos, buf) = if is_out { + // (&mut stdout_pos, &mut stdout_buffer) + // } else { + // (&mut stderr_pos, &mut stderr_buffer) + // }; + + // let idx = if eof { + // data.len() + // } else { + // match data[*pos..].iter().rposition(|b| *b == b'\n') { + // Some(i) => *pos + i + 1, + // None => { + // *pos = data.len(); + // return; + // } + // } + // }; + + // let new_lines = &data[..idx]; + + // for line in String::from_utf8_lossy(new_lines).lines() { + // let stream = if is_out { &console.out } else { &console.err }; + + // let _ = if let Some(p) = &prefix { + // stream.write_line_with_prefix(line.trim(), p) + // } else { + // stream.write_line(line.trim()) + // }; + // } + + // buf.extend(new_lines); + // data.drain(..idx); + // *pos = 0; + // }) + // .await + // .map_err(|error| ProcessError::StreamCapture { + // bin: self.get_bin_name(), + // error: Box::new(error), + // })?; + + // self.log_command(&line, &shared_child); + + // // Attempt to create the child output + // let result = shared_child + // .wait() + // .await + // .map_err(|error| ProcessError::StreamCapture { + // bin: self.get_bin_name(), + // error: Box::new(error), + // }); + + // registry.remove_running(shared_child).await; + + // let status = result?; + // let output = Output { + // status, + // stdout: stdout_buffer, + // stderr: stderr_buffer, + // }; + + // self.handle_nonzero_status(&output, true)?; + + // Ok(output) + // } + + fn create_async_command(&self) -> (TokioCommand, CommandLine) { + let command_line = self.create_command_line(); + + let mut command = TokioCommand::new(&command_line.command[0]); + command.args(&command_line.command[1..]); + + for (key, value) in &self.env { + if let Some(value) = value { + command.env(key, value); + } else { + command.env_remove(key); + } + } + + if let Some(cwd) = &self.cwd { + command.current_dir(cwd); + } + + (command, command_line) + } + + fn create_command_line(&self) -> CommandLine { + CommandLine::new(self) + } + + fn handle_nonzero_status(&mut self, output: &Output, with_message: bool) -> miette::Result<()> { + if self.should_error_nonzero() && !output.status.success() { + return Err(output_to_error(self.get_bin_name(), output, with_message).into()); + } + + Ok(()) + } + + fn log_command(&self, line: &CommandLine, child: &SharedChild) { + let workspace_env_key = OsString::from("MOON_WORKSPACE_ROOT"); + let workspace_root = if let Some(Some(value)) = self.env.get(&workspace_env_key) { + PathBuf::from(value) + } else { + env::var_os(&workspace_env_key).map_or_else( + || env::current_dir().unwrap_or(PathBuf::from(".")), + PathBuf::from, + ) + }; + let working_dir = PathBuf::from(self.cwd.as_deref().unwrap_or(workspace_root.as_os_str())); + + if let Some(console) = self.console.as_ref() { + if self.print_command && !console.out.is_quiet() { + let _ = console.out.write_line(CommandLine::format( + &line.get_line(false, false), + &workspace_root, + &working_dir, + )); + } + } + + // Avoid all this overhead if we're not logging + if !enabled!(tracing::Level::DEBUG) { + return; + } + + let debug_env = env::var("MOON_DEBUG_PROCESS_ENV").is_ok(); + let env_vars: FxHashMap<&OsString, &OsString> = self + .env + .iter() + .filter_map(|(key, value)| { + if value.is_none() { + None + } else if debug_env + || key + .to_str() + .map(|k| k.starts_with("MOON_")) + .unwrap_or_default() + { + Some((key, value.as_ref().unwrap())) + } else { + None + } + }) + .collect(); + + debug!( + pid = child.id(), + shell = self.shell.as_ref().map(|sh| &sh.bin_name), + env_vars = ?env_vars, + working_dir = ?working_dir, + "Running command {}", + color::shell(line.to_string()) + ); + } + + async fn write_input_to_child( + &self, + child: &mut Box, + line: &CommandLine, + ) -> miette::Result<()> { + let input = line.input.join(OsStr::new(" ")); + + let mut stdin = child.stdin().take().unwrap_or_else(|| { + panic!("Unable to write stdin: {}", input.to_string_lossy()); + }); + + stdin + .write_all(input.as_encoded_bytes()) + .await + .map_err(|error| ProcessError::WriteInput { + bin: self.get_bin_name(), + error: Box::new(error), + })?; + + drop(stdin); + + Ok(()) + } +} diff --git a/crates/process/src/lib.rs b/crates/process/src/lib.rs index 413365410ac..273c7434820 100644 --- a/crates/process/src/lib.rs +++ b/crates/process/src/lib.rs @@ -1,13 +1,20 @@ -mod async_command; mod command; -mod command_inspector; +mod command_line; +mod exec_command; mod output; +// mod output_stream; mod process_error; +mod process_registry; +mod shared_child; mod shell; +mod signal; -pub use async_command::*; pub use command::*; +pub use command_line::*; pub use moon_args as args; pub use output::*; pub use process_error::*; +pub use process_registry::*; +pub use shared_child::*; pub use shell::*; +pub use signal::*; diff --git a/crates/process/src/output_stream.rs b/crates/process/src/output_stream.rs new file mode 100644 index 00000000000..cfae2257695 --- /dev/null +++ b/crates/process/src/output_stream.rs @@ -0,0 +1,203 @@ +// This code is copied from Cargo, but rewritten to support tokio/async. +// Original copyright belongs to them! +// https://github.com/rust-lang/cargo/blob/master/crates/cargo-util/src/read2.rs + +pub use self::imp::capture_stream; + +#[cfg(unix)] +mod imp { + use libc::{c_int, fcntl, F_GETFL, F_SETFL, O_NONBLOCK}; + use std::io; + use std::mem; + use std::os::unix::prelude::*; + use tokio::io::{AsyncBufReadExt, BufReader}; + use tokio::process::{ChildStderr, ChildStdout}; + + fn set_nonblock(fd: c_int) -> io::Result<()> { + let flags = unsafe { fcntl(fd, F_GETFL) }; + + if flags == -1 || unsafe { fcntl(fd, F_SETFL, flags | O_NONBLOCK) } == -1 { + return Err(io::Error::last_os_error()); + } + + Ok(()) + } + + pub async fn capture_stream( + out_pipe: ChildStdout, + err_pipe: ChildStderr, + data: &mut (dyn FnMut(bool, &mut Vec, bool) + Send), + ) -> io::Result<()> { + let out_fd = out_pipe.as_raw_fd(); + let err_fd = err_pipe.as_raw_fd(); + + set_nonblock(out_fd)?; + set_nonblock(err_fd)?; + + let mut out_buf = BufReader::new(out_pipe); + let mut err_buf = BufReader::new(err_pipe); + let mut out_done = false; + let mut err_done = false; + let mut out = String::new(); + let mut err = String::new(); + + let mut fds: [libc::pollfd; 2] = unsafe { mem::zeroed() }; + fds[0].fd = out_fd; + fds[0].events = libc::POLLIN; + fds[1].fd = err_fd; + fds[1].events = libc::POLLIN; + let mut nfds = 2; + let mut errfd = 1; + + while nfds > 0 { + // Wait for either pipe to become readable using `poll` + let r = unsafe { libc::poll(fds.as_mut_ptr(), nfds, -1) }; + + if r == -1 { + let err = io::Error::last_os_error(); + + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + + return Err(err); + } + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let handle = |res: io::Result| match res { + Ok(size) => Ok(size == 0), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + Ok(false) + } else { + Err(e) + } + } + }; + + if !err_done && fds[errfd].revents != 0 && handle(err_buf.read_line(&mut err).await)? { + err_done = true; + nfds -= 1; + } + + data(false, &mut mem::take(&mut err).into_bytes(), err_done); + + if !out_done && fds[0].revents != 0 && handle(out_buf.read_line(&mut out).await)? { + out_done = true; + fds[0].fd = err_fd; + errfd = 0; + nfds -= 1; + } + + data(true, &mut mem::take(&mut out).into_bytes(), out_done); + } + Ok(()) + } +} + +#[cfg(windows)] +mod imp { + use std::io; + use std::os::windows::prelude::*; + use std::process::{ChildStderr, ChildStdout}; + use std::slice; + + use miow::iocp::{CompletionPort, CompletionStatus}; + use miow::pipe::NamedPipe; + use miow::Overlapped; + use windows_sys::Win32::Foundation::ERROR_BROKEN_PIPE; + + struct Pipe<'a> { + dst: &'a mut Vec, + overlapped: Overlapped, + pipe: NamedPipe, + done: bool, + } + + pub fn read2( + out_pipe: ChildStdout, + err_pipe: ChildStderr, + data: &mut dyn FnMut(bool, &mut Vec, bool), + ) -> io::Result<()> { + let mut out = Vec::new(); + let mut err = Vec::new(); + + let port = CompletionPort::new(1)?; + port.add_handle(0, &out_pipe)?; + port.add_handle(1, &err_pipe)?; + + unsafe { + let mut out_pipe = Pipe::new(out_pipe, &mut out); + let mut err_pipe = Pipe::new(err_pipe, &mut err); + + out_pipe.read()?; + err_pipe.read()?; + + let mut status = [CompletionStatus::zero(), CompletionStatus::zero()]; + + while !out_pipe.done || !err_pipe.done { + for status in port.get_many(&mut status, None)? { + if status.token() == 0 { + out_pipe.complete(status); + data(true, out_pipe.dst, out_pipe.done); + out_pipe.read()?; + } else { + err_pipe.complete(status); + data(false, err_pipe.dst, err_pipe.done); + err_pipe.read()?; + } + } + } + + Ok(()) + } + } + + impl<'a> Pipe<'a> { + unsafe fn new(p: P, dst: &'a mut Vec) -> Pipe<'a> { + Pipe { + dst, + pipe: NamedPipe::from_raw_handle(p.into_raw_handle()), + overlapped: Overlapped::zero(), + done: false, + } + } + + unsafe fn read(&mut self) -> io::Result<()> { + let dst = slice_to_end(self.dst); + match self.pipe.read_overlapped(dst, self.overlapped.raw()) { + Ok(_) => Ok(()), + Err(e) => { + if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) { + self.done = true; + Ok(()) + } else { + Err(e) + } + } + } + } + + unsafe fn complete(&mut self, status: &CompletionStatus) { + let prev = self.dst.len(); + self.dst.set_len(prev + status.bytes_transferred() as usize); + if status.bytes_transferred() == 0 { + self.done = true; + } + } + } + + unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len()) + } +} diff --git a/crates/process/src/process_registry.rs b/crates/process/src/process_registry.rs new file mode 100644 index 00000000000..44e69416d85 --- /dev/null +++ b/crates/process/src/process_registry.rs @@ -0,0 +1,165 @@ +use crate::shared_child::*; +use crate::signal::*; +use core::time::Duration; +use process_wrap::tokio::TokioChildWrapper; +use rustc_hash::FxHashMap; +use std::sync::{Arc, OnceLock}; +use tokio::sync::broadcast::{self, error::RecvError, Receiver, Sender}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tokio::time::sleep; +use tracing::{debug, trace, warn}; + +static INSTANCE: OnceLock> = OnceLock::new(); + +pub struct ProcessRegistry { + running: Arc>>, + signal_sender: Sender, + signal_wait_handle: JoinHandle<()>, + signal_shutdown_handle: JoinHandle<()>, +} + +impl Default for ProcessRegistry { + fn default() -> Self { + let processes = Arc::new(RwLock::new(FxHashMap::default())); + let processes_bg = Arc::clone(&processes); + + let (sender, receiver) = broadcast::channel::(10); + let sender_bg = sender.clone(); + + let signal_wait_handle = tokio::spawn(async move { + wait_for_signal(sender_bg).await; + }); + + let signal_shutdown_handle = tokio::spawn(async move { + shutdown_processes_with_signal(receiver, processes_bg).await; + }); + + Self { + running: processes, + signal_sender: sender, + signal_wait_handle, + signal_shutdown_handle, + } + } +} + +impl ProcessRegistry { + pub fn instance() -> Arc { + Arc::clone(INSTANCE.get_or_init(|| Arc::new(ProcessRegistry::default()))) + } + + pub async fn add_running(&self, child: Box) -> SharedChild { + let shared = SharedChild::new(child); + + self.running + .write() + .await + .insert(shared.id(), shared.clone()); + + shared + } + + pub async fn get_running_by_pid(&self, id: u32) -> Option { + self.running.read().await.get(&id).cloned() + } + + pub async fn remove_running(&self, child: SharedChild) { + self.remove_running_by_pid(child.id()).await + } + + pub async fn remove_running_by_pid(&self, id: u32) { + self.running.write().await.remove(&id); + } + + pub fn receive_signal(&self) -> Receiver { + self.signal_sender.subscribe() + } + + pub fn terminate_running(&self) { + let _ = self.signal_sender.send(SignalType::Terminate); + } + + pub async fn wait_for_running_to_shutdown(&self) { + let mut count = 0; + let mut terminated = false; + + loop { + // After 1.5 second of waiting, terminate all running, + // as some of them may have "press ctrl+c again" logic + if !terminated && count >= 1500 { + self.terminate_running(); + terminated = true; + } + + // After 3 seconds of waiting, just exit immediately + if count >= 3000 { + break; + } + + // Wait for all running processes to have stopped + if self.running.read().await.is_empty() { + break; + } + + sleep(Duration::from_millis(50)).await; + count += 50; + } + } +} + +impl Drop for ProcessRegistry { + fn drop(&mut self) { + self.terminate_running(); + self.signal_wait_handle.abort(); + self.signal_shutdown_handle.abort(); + } +} + +async fn shutdown_processes_with_signal( + mut receiver: Receiver, + processes: Arc>>, +) { + let signal: SignalType; + + loop { + signal = match receiver.recv().await { + Ok(signal) => signal, + Err(RecvError::Closed) => SignalType::Terminate, + _ => continue, + }; + + break; + } + + let mut children = processes.write().await; + + if children.is_empty() { + return; + } + + debug!( + pids = ?children.keys().collect::>(), + signal = ?signal, + "Shutting down {} running child processes", + children.len() + ); + + // children.clear(); + + let mut futures = vec![]; + + for (pid, child) in children.drain() { + trace!(pid, "Killing child process"); + + futures.push(tokio::spawn(async move { + if let Err(error) = child.kill_with_signal(signal).await { + warn!(pid, "Failed to kill child process: {error}"); + } + })); + } + + for future in futures { + let _ = future.await; + } +} diff --git a/crates/process/src/shared_child.rs b/crates/process/src/shared_child.rs new file mode 100644 index 00000000000..74f1d757606 --- /dev/null +++ b/crates/process/src/shared_child.rs @@ -0,0 +1,113 @@ +use crate::signal::*; +use process_wrap::tokio::TokioChildWrapper; +use std::io; +use std::process::{ExitStatus, Output}; +use std::sync::Arc; +use tokio::process::{ChildStderr, ChildStdin, ChildStdout}; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct SharedChild { + inner: Arc>>, + pid: u32, +} + +impl SharedChild { + pub fn new(child: Box) -> Self { + Self { + pid: child.id().unwrap(), + inner: Arc::new(Mutex::new(child)), + } + } + + pub fn id(&self) -> u32 { + self.pid + } + + pub async fn take_stdin(&self) -> Option { + self.inner.lock().await.as_mut().stdin().take() + } + + pub async fn take_stdout(&self) -> Option { + self.inner.lock().await.as_mut().stdout().take() + } + + pub async fn take_stderr(&self) -> Option { + self.inner.lock().await.as_mut().stderr().take() + } + + pub async fn kill(&self) -> io::Result<()> { + let mut child = self.inner.lock().await; + + Box::into_pin(child.kill()).await?; + + Ok(()) + } + + pub async fn kill_with_signal(&self, signal: SignalType) -> io::Result<()> { + let mut child = self.inner.lock().await; + + dbg!("kill_with_signal", self.id(), signal); + + // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/unix/process/process_unix.rs#L947 + #[cfg(unix)] + { + child.signal(match signal { + SignalType::Interrupt => 2, // SIGINT + SignalType::Quit => 3, // SIGQUIT + SignalType::Terminate => 15, // SIGTERM + })?; + } + + // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/windows/process.rs#L658 + #[cfg(windows)] + { + child.start_kill()?; + } + + Box::into_pin(child.wait()).await?; + + Ok(()) + } + + pub(crate) async fn wait(&self) -> io::Result { + let mut child = self.inner.lock().await; + + Box::into_pin(child.wait()).await + } + + // This method re-implements the tokio `wait_with_output` method + // but does not take ownership of self. This is required to be able + // to call `kill`, otherwise the child does not exist. + pub(crate) async fn wait_with_output(&self) -> io::Result { + use tokio::{io::AsyncReadExt, try_join}; + + let mut child = self.inner.lock().await; + + async fn read_to_end(data: &mut Option) -> io::Result> { + let mut vec = Vec::new(); + if let Some(data) = data.as_mut() { + data.read_to_end(&mut vec).await?; + } + Ok(vec) + } + + let mut stdout_pipe = child.stdout().take(); + let mut stderr_pipe = child.stderr().take(); + + let stdout_fut = read_to_end(&mut stdout_pipe); + let stderr_fut = read_to_end(&mut stderr_pipe); + + let (status, stdout, stderr) = + try_join!(Box::into_pin(child.wait()), stdout_fut, stderr_fut)?; + + drop(stdout_pipe); + drop(stderr_pipe); + + Ok(Output { + status, + stdout, + stderr, + }) + } +} diff --git a/crates/process/src/shell.rs b/crates/process/src/shell.rs index 26e81357725..dbfe660d48e 100644 --- a/crates/process/src/shell.rs +++ b/crates/process/src/shell.rs @@ -21,18 +21,19 @@ fn get_default_shell() -> ShellType { #[inline] pub fn is_windows_script>(bin: T) -> bool { - let bin = bin.as_ref().to_string_lossy(); - - bin.ends_with(".cmd") - || bin.ends_with(".bat") - || bin.ends_with(".ps1") - || bin.ends_with(".CMD") - || bin.ends_with(".BAT") - || bin.ends_with(".PS1") + bin.as_ref().to_str().is_some_and(|bin| { + bin.ends_with(".cmd") + || bin.ends_with(".bat") + || bin.ends_with(".ps1") + || bin.ends_with(".CMD") + || bin.ends_with(".BAT") + || bin.ends_with(".PS1") + }) } pub struct Shell { pub bin: PathBuf, + pub bin_name: String, pub command: ShellCommand, } @@ -42,7 +43,8 @@ impl Shell { let command = type_of.build().get_exec_command(); Self { - bin: find_command_on_path(bin_name.clone()).unwrap_or_else(|| bin_name.into()), + bin: find_command_on_path(bin_name.clone()).unwrap_or_else(|| bin_name.clone().into()), + bin_name, command, } } diff --git a/crates/process/src/signal.rs b/crates/process/src/signal.rs new file mode 100644 index 00000000000..d4b5dcdbebb --- /dev/null +++ b/crates/process/src/signal.rs @@ -0,0 +1,89 @@ +// https://www.math.stonybrook.edu/~ccc/dfc/dfc/signals.html +// https://sunshowers.io/posts/beyond-ctrl-c-signals/ + +use tokio::sync::broadcast::Sender; +use tracing::debug; + +#[derive(Clone, Copy, Debug)] +pub enum SignalType { + Interrupt, + Quit, + Terminate, +} + +#[cfg(unix)] +pub async fn wait_for_signal(sender: Sender) { + use tokio::signal::unix::{signal, SignalKind}; + + debug!("Listening for SIGINT, SIGQUIT, and SIGTERM signals"); + + let mut signal_terminate = signal(SignalKind::terminate()).unwrap(); + let mut signal_interrupt = signal(SignalKind::interrupt()).unwrap(); + let mut signal_quit = signal(SignalKind::quit()).unwrap(); + + let _ = tokio::select! { + _ = signal_terminate.recv() => { + debug!("Received SIGTERM signal"); + sender.send(SignalType::Terminate) + }, + _ = signal_interrupt.recv() => { + debug!("Received SIGINT signal"); + sender.send(SignalType::Interrupt) + }, + _ = signal_quit.recv() => { + debug!("Received SIGQUIT signal"); + sender.send(SignalType::Quit) + }, + }; +} + +#[cfg(windows)] +pub async fn wait_for_signal(sender: Sender) { + use tokio::signal::windows; + + debug!("Listening for CTRL-C, BREAK, CLOSE, and SHUTDOWN signals"); + + let mut signal_c = windows::ctrl_c().unwrap(); // Interrupt + let mut signal_break = windows::ctrl_break().unwrap(); // Interrupt + let mut signal_close = windows::ctrl_close().unwrap(); // Quit + let mut signal_shutdown = windows::ctrl_shutdown().unwrap(); // Terminate + + let _ = tokio::select! { + _ = signal_c.recv() => { + debug!("Received CTRL-C signal"); + sender.send(SignalType::Interrupt) + }, + _ = signal_break.recv() => { + debug!("Received CTRL-BREAK signal"); + sender.send(SignalType::Interrupt) + }, + _ = signal_close.recv() => { + debug!("Received CTRL-CLOSE signal"); + sender.send(SignalType::Quit) + }, + _ = signal_shutdown.recv() => { + debug!("Received CTRL-SHUTDOWN signal"); + sender.send(SignalType::Terminate) + }, + }; +} + +#[cfg(unix)] +pub fn kill(pid: u32, signal: SignalType) -> std::io::Result<()> { + let result = unsafe { + libc::kill( + pid as i32, + match signal { + SignalType::Interrupt => 2, // SIGINT + SignalType::Quit => 3, // SIGQUIT + SignalType::Terminate => 15, // SIGTERM + }, + ) + }; + + if result != 0 { + return Err(std::io::Error::last_os_error()); + } + + Ok(()) +} diff --git a/crates/task-expander/tests/task_expander_test.rs b/crates/task-expander/tests/task_expander_test.rs index a5c55a08a69..76d3f6220ee 100644 --- a/crates/task-expander/tests/task_expander_test.rs +++ b/crates/task-expander/tests/task_expander_test.rs @@ -26,9 +26,7 @@ mod task_expander { task.input_files.insert("project/source/out".into()); let context = create_context(sandbox.path()); - let task = TaskExpander::new(&project, &context) - .expand(&mut task) - .unwrap(); + let task = TaskExpander::new(&project, &context).expand(&task).unwrap(); assert!(task.input_files.is_empty()); assert_eq!( @@ -48,9 +46,7 @@ mod task_expander { task.input_globs.insert("project/source/out/**/*".into()); let context = create_context(sandbox.path()); - let task = TaskExpander::new(&project, &context) - .expand(&mut task) - .unwrap(); + let task = TaskExpander::new(&project, &context).expand(&task).unwrap(); assert!(task.input_globs.is_empty()); assert_eq!( @@ -72,9 +68,7 @@ mod task_expander { task.inputs = vec![InputPath::ProjectFile("dir".into())]; let context = create_context(sandbox.path()); - let task = TaskExpander::new(&project, &context) - .expand(&mut task) - .unwrap(); + let task = TaskExpander::new(&project, &context).expand(&task).unwrap(); assert!(task.input_files.is_empty()); assert_eq!( diff --git a/crates/task-runner/src/command_executor.rs b/crates/task-runner/src/command_executor.rs index 4d11db1ede3..8e88eab2fb3 100644 --- a/crates/task-runner/src/command_executor.rs +++ b/crates/task-runner/src/command_executor.rs @@ -1,10 +1,10 @@ use moon_action::{ActionNode, ActionStatus, Operation, OperationList}; use moon_action_context::{ActionContext, TargetState}; use moon_app_context::AppContext; -use moon_common::{color, is_ci, is_test_env}; +use moon_common::{is_ci, is_test_env}; use moon_config::TaskOutputStyle; use moon_console::TaskReportItem; -use moon_process::{args::join_args, AsyncCommand, Command}; +use moon_process::{args::join_args, Command, CommandLine}; use moon_project::Project; use moon_task::Task; use std::process::Output; @@ -108,7 +108,7 @@ impl<'task> CommandExecutor<'task> { // Attempt to execute command async fn execute_command( - mut command: AsyncCommand<'_>, + command: &mut Command, stream: bool, interactive: bool, ) -> miette::Result { @@ -134,7 +134,7 @@ impl<'task> CommandExecutor<'task> { // Or run the job to completion result = execute_command( - self.command.create_async(), + &mut self.command, self.stream, self.interactive, ) => result.map(Some), @@ -340,22 +340,25 @@ impl<'task> CommandExecutor<'task> { } } + // We don't use `Command::print_command` because we need to explicitly + // control the workspace root and working directory! fn print_command_line(&self, command_line: &str) -> miette::Result<()> { if !self.app.workspace_config.runner.log_running_command { return Ok(()); } - let message = color::muted_light(self.command.inspect().format_command( + let workspace_root = &self.app.workspace_root; + let working_dir = if self.task.options.run_from_workspace_root { + &self.app.workspace_root + } else { + &self.project.root + }; + + self.app.console.out.write_line(CommandLine::format( command_line, - &self.app.workspace_root, - Some(if self.task.options.run_from_workspace_root { - &self.app.workspace_root - } else { - &self.project.root - }), - )); - - self.app.console.out.write_line(message)?; + workspace_root, + working_dir, + ))?; Ok(()) } diff --git a/crates/task-runner/tests/command_builder_test.rs b/crates/task-runner/tests/command_builder_test.rs index b43c17877d2..b53a7e2f8ee 100644 --- a/crates/task-runner/tests/command_builder_test.rs +++ b/crates/task-runner/tests/command_builder_test.rs @@ -15,7 +15,7 @@ fn get_env<'a>(command: &'a Command, key: &str) -> Option<&'a str> { command .env .get(&OsString::from(key)) - .map(|v| v.to_str().unwrap()) + .map(|v| v.as_ref().unwrap().to_str().unwrap()) } fn get_args(command: &Command) -> Vec<&str> { @@ -34,7 +34,10 @@ mod command_builder { let container = TaskRunnerContainer::new("builder", "base").await; let command = container.create_command(ActionContext::default()).await; - assert_eq!(command.cwd, Some(container.sandbox.path().join("project"))); + assert_eq!( + command.cwd.as_deref(), + Some(container.sandbox.path().join("project").as_os_str()) + ); } #[tokio::test] @@ -46,7 +49,10 @@ mod command_builder { }) .await; - assert_eq!(command.cwd, Some(container.sandbox.path().to_path_buf())); + assert_eq!( + command.cwd.as_deref(), + Some(container.sandbox.path().as_os_str()) + ); } mod args { diff --git a/crates/vcs/src/process_cache.rs b/crates/vcs/src/process_cache.rs index d5459cce8f7..ddcfdf1a852 100644 --- a/crates/vcs/src/process_cache.rs +++ b/crates/vcs/src/process_cache.rs @@ -85,12 +85,11 @@ impl ProcessCache { pub async fn run_command_with_formatter( &self, - command: Command, + mut command: Command, trim: bool, format: impl FnOnce(String) -> String, ) -> miette::Result> { - let mut executor = command.create_async(); - let cache_key = executor.inspector.get_cache_key(); + let cache_key = command.get_cache_key(); // First check if the data has already been cached if let Some(cache) = self.cache.read_async(&cache_key, |_, v| v.clone()).await { @@ -101,7 +100,7 @@ impl ProcessCache { let cache = match self.cache.entry_async(cache_key).await { Entry::Occupied(o) => o.get().clone(), Entry::Vacant(v) => { - let output = executor.exec_capture_output().await?; + let output = command.exec_capture_output().await?; let value = output_to_string(&output.stdout); let cache = Arc::new(format(if trim { value.trim().to_owned() } else { value })); @@ -116,11 +115,10 @@ impl ProcessCache { pub async fn run_command_without_cache( &self, - command: Command, + mut command: Command, trim: bool, ) -> miette::Result> { - let mut executor = command.create_async(); - let output = executor.exec_capture_output().await?; + let output = command.exec_capture_output().await?; let value = output_to_string(&output.stdout); Ok(Arc::new(if trim { value.trim().to_owned() } else { value })) diff --git a/legacy/bun/tool/src/bun_tool.rs b/legacy/bun/tool/src/bun_tool.rs index 9bdfec743e9..9e066ad02e5 100644 --- a/legacy/bun/tool/src/bun_tool.rs +++ b/legacy/bun/tool/src/bun_tool.rs @@ -90,7 +90,7 @@ impl BunTool { cmd.arg("bun.lockb"); cmd.cwd(cwd); - let output = cmd.create_async().exec_capture_output().await?; + let output = cmd.exec_capture_output().await?; Arc::new(output_to_string(&output.stdout)) }; @@ -250,8 +250,6 @@ impl DependencyManager<()> for BunTool { .cwd(working_dir) .set_print_command(log); - let mut cmd = cmd.create_async(); - if env::var("MOON_TEST_HIDE_INSTALL_OUTPUT").is_ok() { cmd.exec_capture_output().await?; } else { @@ -275,7 +273,7 @@ impl DependencyManager<()> for BunTool { cmd.arg("--production"); } - cmd.create_async().exec_stream_output().await?; + cmd.exec_stream_output().await?; Ok(()) } diff --git a/legacy/deno/platform/src/deno_platform.rs b/legacy/deno/platform/src/deno_platform.rs index 2c98601de3d..70f43c5b343 100644 --- a/legacy/deno/platform/src/deno_platform.rs +++ b/legacy/deno/platform/src/deno_platform.rs @@ -252,7 +252,6 @@ impl Platform for DenoPlatform { deno.create_command(&())? .args(args) .cwd(working_dir) - .create_async() .exec_stream_output() .await }) diff --git a/legacy/deno/tool/src/deno_tool.rs b/legacy/deno/tool/src/deno_tool.rs index 39fb6759740..d30c20de3de 100644 --- a/legacy/deno/tool/src/deno_tool.rs +++ b/legacy/deno/tool/src/deno_tool.rs @@ -217,8 +217,6 @@ impl DependencyManager<()> for DenoTool { .cwd(working_dir) .set_print_command(log); - let mut cmd = cmd.create_async(); - if env::var("MOON_TEST_HIDE_INSTALL_OUTPUT").is_ok() { cmd.exec_capture_output().await?; } else { diff --git a/legacy/node/tool/src/bun_tool.rs b/legacy/node/tool/src/bun_tool.rs index 6b6f8226045..468c52b2abc 100644 --- a/legacy/node/tool/src/bun_tool.rs +++ b/legacy/node/tool/src/bun_tool.rs @@ -89,7 +89,7 @@ impl BunTool { cmd.arg("bun.lockb"); cmd.cwd(cwd); - let output = cmd.create_async().exec_capture_output().await?; + let output = cmd.exec_capture_output().await?; Arc::new(output_to_string(&output.stdout)) }; @@ -251,8 +251,6 @@ impl DependencyManager for BunTool { .cwd(working_dir) .set_print_command(log); - let mut cmd = cmd.create_async(); - if env::var("MOON_TEST_HIDE_INSTALL_OUTPUT").is_ok() { cmd.exec_capture_output().await?; } else { @@ -277,7 +275,7 @@ impl DependencyManager for BunTool { // cmd.arg("--production"); // } - cmd.create_async().exec_stream_output().await?; + cmd.exec_stream_output().await?; Ok(()) } diff --git a/legacy/node/tool/src/node_tool.rs b/legacy/node/tool/src/node_tool.rs index 17ce7329f28..215d4e99edd 100644 --- a/legacy/node/tool/src/node_tool.rs +++ b/legacy/node/tool/src/node_tool.rs @@ -154,11 +154,7 @@ impl NodeTool { } }; - cmd.args(args) - .cwd(working_dir) - .create_async() - .exec_stream_output() - .await?; + cmd.args(args).cwd(working_dir).exec_stream_output().await?; Ok(()) } diff --git a/legacy/node/tool/src/npm_tool.rs b/legacy/node/tool/src/npm_tool.rs index e9b4484f233..1e11aa08c63 100644 --- a/legacy/node/tool/src/npm_tool.rs +++ b/legacy/node/tool/src/npm_tool.rs @@ -154,7 +154,6 @@ impl DependencyManager for NpmTool { .args(["dedupe"]) .cwd(working_dir) .set_print_command(log) - .create_async() .exec_capture_output() .await?; @@ -209,8 +208,6 @@ impl DependencyManager for NpmTool { .cwd(working_dir) .set_print_command(log); - let mut cmd = cmd.create_async(); - if env::var("MOON_TEST_HIDE_INSTALL_OUTPUT").is_ok() { cmd.exec_capture_output().await?; } else { @@ -238,7 +235,7 @@ impl DependencyManager for NpmTool { cmd.args(["--workspace", package_name]); } - cmd.create_async().exec_stream_output().await?; + cmd.exec_stream_output().await?; Ok(()) } diff --git a/legacy/node/tool/src/pnpm_tool.rs b/legacy/node/tool/src/pnpm_tool.rs index 3ae225ce14b..02cb763f242 100644 --- a/legacy/node/tool/src/pnpm_tool.rs +++ b/legacy/node/tool/src/pnpm_tool.rs @@ -177,7 +177,6 @@ impl DependencyManager for PnpmTool { .arg("dedupe") .cwd(working_dir) .set_print_command(log) - .create_async() .exec_capture_output() .await?; @@ -227,8 +226,6 @@ impl DependencyManager for PnpmTool { .cwd(working_dir) .set_print_command(log); - let mut cmd = cmd.create_async(); - if env::var("MOON_TEST_HIDE_INSTALL_OUTPUT").is_ok() { cmd.exec_capture_output().await?; } else { @@ -263,7 +260,7 @@ impl DependencyManager for PnpmTool { cmd.arg(format!("{package}...")); } - cmd.create_async().exec_stream_output().await?; + cmd.exec_stream_output().await?; Ok(()) } diff --git a/legacy/node/tool/src/yarn_tool.rs b/legacy/node/tool/src/yarn_tool.rs index 4170bb6d5a8..a049c7a0914 100644 --- a/legacy/node/tool/src/yarn_tool.rs +++ b/legacy/node/tool/src/yarn_tool.rs @@ -70,7 +70,6 @@ impl YarnTool { for plugin in &self.config.plugins { self.create_command(node)? .args(["plugin", "import", plugin]) - .create_async() .exec_capture_output() .await?; } @@ -207,7 +206,6 @@ impl DependencyManager for YarnTool { .arg("dedupe") .cwd(working_dir) .set_print_command(log) - .create_async() .exec_capture_output() .await?; } else { @@ -261,8 +259,6 @@ impl DependencyManager for YarnTool { .cwd(working_dir) .set_print_command(log); - let mut cmd = cmd.create_async(); - if env::var("MOON_TEST_HIDE_INSTALL_OUTPUT").is_ok() { cmd.exec_capture_output().await?; } else { @@ -304,7 +300,7 @@ impl DependencyManager for YarnTool { cmd.arg("--production"); } - cmd.create_async().exec_stream_output().await?; + cmd.exec_stream_output().await?; Ok(()) } diff --git a/legacy/python/tool/src/python_tool.rs b/legacy/python/tool/src/python_tool.rs index 54bee3df22f..37bf2fe21a1 100644 --- a/legacy/python/tool/src/python_tool.rs +++ b/legacy/python/tool/src/python_tool.rs @@ -106,7 +106,7 @@ impl PythonTool { cmd.env("PROTO_PYTHON_VERSION", version); } - cmd.create_async().exec_stream_output().await?; + cmd.exec_stream_output().await?; Ok(()) } diff --git a/legacy/rust/tool/src/rust_tool.rs b/legacy/rust/tool/src/rust_tool.rs index 606eaab089d..cb6c86a7f5f 100644 --- a/legacy/rust/tool/src/rust_tool.rs +++ b/legacy/rust/tool/src/rust_tool.rs @@ -88,7 +88,6 @@ impl RustTool { ) .cwd(working_dir) .with_console(self.console.clone()) - .create_async() .exec_stream_output() .await?; @@ -109,7 +108,6 @@ impl RustTool { ) .cwd(working_dir) .with_console(self.console.clone()) - .create_async() .exec_stream_output() .await?; diff --git a/package.json b/package.json index 7e161b7d7a6..b4ecbc2f2e3 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ }, "workspaces": [ "packages/*", + "scenarios/*", "website" ], "engines": { diff --git a/scenarios/signals/moon.yml b/scenarios/signals/moon.yml new file mode 100644 index 00000000000..007eaf6e096 --- /dev/null +++ b/scenarios/signals/moon.yml @@ -0,0 +1,18 @@ +language: 'javascript' + +tasks: + dev-1: + command: 'node signals.mjs' + preset: 'server' + dev-2: + command: 'node signals.mjs' + preset: 'server' + dev-3: + command: 'node signals.mjs' + preset: 'server' + dev: + deps: ['dev-1', 'dev-2', 'dev-3'] + +toolchain: + typescript: + disabled: true diff --git a/scenarios/signals/signals.mjs b/scenarios/signals/signals.mjs new file mode 100644 index 00000000000..b609267bfc7 --- /dev/null +++ b/scenarios/signals/signals.mjs @@ -0,0 +1,15 @@ +let target = process.env.MOON_TARGET; + +console.log(`[${target}] Running`); + +for (let event of ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGBREAK']) { + process.on(event, (signal, code) => { + console.log(`[${target}] Received ${signal} (${code})!`); + process.exit(128 + code); + }); +} + +// Cause the process to take a while! +await new Promise((resolve) => { + setTimeout(resolve, 30000); +}); diff --git a/tsconfig.json b/tsconfig.json index 8a469614b6b..528eab34904 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -17,6 +17,9 @@ { "path": "packages/visualizer" }, + { + "path": "scenarios/signals" + }, { "path": "website" }