From cf9e76a98c61f6f87273993c6c4055c27fd4985c Mon Sep 17 00:00:00 2001 From: Max Desiatov Date: Thu, 25 Jan 2024 12:11:37 +0000 Subject: [PATCH] Update `AsyncProcess`, use it in `Shell` implementation (#77) These changes make `AsyncProcess` implementation more robust and increase its code coverage. Now we no longer have to rely on `Foundation.Process` directly in `Shell`, as `AsyncProcess` abstracts rough edges away for us and also provides an `async`-friendly API. --------- Co-authored-by: Johannes Weiss Co-authored-by: Euan Harris --- Package.resolved | 25 +- Package.swift | 6 +- Sources/AsyncProcess/FileContentStream.swift | 42 +- Sources/AsyncProcess/NIOAsyncPipeWriter.swift | 12 +- .../ProcessExecutor+Convenience.swift | 244 ++++++++++- Sources/AsyncProcess/ProcessExecutor.swift | 380 ++++++++++++++---- .../SwiftSDKGenerator+Download.swift | 19 +- .../Generator/SwiftSDKGenerator.swift | 16 +- .../Queries/CMakeBuildQuery.swift | 9 +- .../SystemUtils/ByteBuffer+Utils.swift | 17 +- .../SystemUtils/FileOperationError.swift | 3 +- .../SystemUtils/GeneratorError.swift | 12 + .../SwiftSDKGenerator/SystemUtils/Shell.swift | 174 +++----- .../AsyncProcessTests/IntegrationTests.swift | 315 +++++++++++++-- 14 files changed, 972 insertions(+), 302 deletions(-) diff --git a/Package.resolved b/Package.resolved index d2636d7..57181a0 100644 --- a/Package.resolved +++ b/Package.resolved @@ -41,8 +41,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-collections.git", "state" : { - "revision" : "937e904258d22af6e447a0b72c0bc67583ef64a2", - "version" : "1.0.4" + "revision" : "d029d9d39c87bed85b1c50adee7c41795261a192", + "version" : "1.0.6" } }, { @@ -54,6 +54,15 @@ "version" : "3.1.0" } }, + { + "identity" : "swift-http-types", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-http-types", + "state" : { + "revision" : "12358d55a3824bd5fed310b999ea8cf83a9a1a65", + "version" : "1.0.3" + } + }, { "identity" : "swift-log", "kind" : "remoteSourceControl", @@ -68,8 +77,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio.git", "state" : { - "revision" : "3db5c4aeee8100d2db6f1eaf3864afdad5dc68fd", - "version" : "2.59.0" + "revision" : "635b2589494c97e48c62514bc8b37ced762e0a62", + "version" : "2.63.0" } }, { @@ -77,8 +86,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio-extras.git", "state" : { - "revision" : "0e0d0aab665ff1a0659ce75ac003081f2b1c8997", - "version" : "1.19.0" + "revision" : "363da63c1966405764f380c627409b2f9d9e710b", + "version" : "1.21.0" } }, { @@ -86,8 +95,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio-http2.git", "state" : { - "revision" : "860622124b01cc1863b4e65dc449b6b457ca5704", - "version" : "1.25.1" + "revision" : "0904bf0feb5122b7e5c3f15db7df0eabe623dd87", + "version" : "1.30.0" } }, { diff --git a/Package.swift b/Package.swift index 18799c0..2b89ed3 100644 --- a/Package.swift +++ b/Package.swift @@ -21,10 +21,10 @@ let package = Package( .package(url: "https://github.com/apple/swift-argument-parser", from: "1.2.2"), .package(url: "https://github.com/apple/swift-async-algorithms.git", exact: "1.0.0-beta.1"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"), - .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.4"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.5"), .package(url: "https://github.com/apple/swift-crypto.git", from: "3.1.0"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.58.0"), - .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.19.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"), + .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.20.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.3"), .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0"), .package(url: "https://github.com/apple/swift-syntax.git", from: "509.0.2"), diff --git a/Sources/AsyncProcess/FileContentStream.swift b/Sources/AsyncProcess/FileContentStream.swift index 911cc85..304cad7 100644 --- a/Sources/AsyncProcess/FileContentStream.swift +++ b/Sources/AsyncProcess/FileContentStream.swift @@ -67,10 +67,13 @@ struct FileContentStream: AsyncSequence { guard let blockingPool else { throw IOError(errnoValue: EINVAL) } - let fileHandle = NIOFileHandle(descriptor: dupedFD) + let fileHandle = NIOLoopBound( + NIOFileHandle(descriptor: dupedFD), + eventLoop: eventLoop + ) NonBlockingFileIO(threadPool: blockingPool) .readChunked( - fileHandle: fileHandle, + fileHandle: fileHandle.value, byteCount: .max, allocator: ByteBufferAllocator(), eventLoop: eventLoop, @@ -81,7 +84,7 @@ struct FileContentStream: AsyncSequence { } ) .whenComplete { result in - try! fileHandle.close() + try! fileHandle.value.close() switch result { case let .failure(error): asyncChannel.fail(error) @@ -96,20 +99,16 @@ struct FileContentStream: AsyncSequence { } .withConnectedSocket(dupedFD) case S_IFIFO: - let deadPipe = Pipe() NIOPipeBootstrap(group: eventLoop) .channelInitializer { channel in channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) } - .takingOwnershipOfDescriptors( - input: dupedFD, - output: dup(deadPipe.fileHandleForWriting.fileDescriptor) + .takingOwnershipOfDescriptor( + input: dupedFD ) .whenSuccess { channel in channel.close(mode: .output, promise: nil) } - try! deadPipe.fileHandleForReading.close() - try! deadPipe.fileHandleForWriting.close() case S_IFDIR: throw IOError(errnoValue: EISDIR) case S_IFBLK, S_IFCHR, S_IFLNK: @@ -206,25 +205,30 @@ private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { private func sendOneItem(_ data: ReceivedEvent, context: ChannelHandlerContext) { context.eventLoop.assertInEventLoop() assert(self.shouldRead == false, "sendOneItem in unexpected state \(self.state)") - context.eventLoop.makeFutureWithTask { + let eventLoop = context.eventLoop + let sink = self.sink + let `self` = NIOLoopBound(self, eventLoop: context.eventLoop) + let context = NIOLoopBound(context, eventLoop: context.eventLoop) + eventLoop.makeFutureWithTask { + // note: We're _not_ on an EventLoop thread here switch data { case let .chunk(data): - await self.sink.send(data) + await sink.send(data) case .finish: - self.sink.finish() + sink.finish() } }.map { - if let moreToSend = self.state.didSendOne() { - self.sendOneItem(moreToSend, context: context) + if let moreToSend = self.value.state.didSendOne() { + self.value.sendOneItem(moreToSend, context: context.value) } else { - if self.heldUpRead { - context.eventLoop.execute { - context.read() + if self.value.heldUpRead { + eventLoop.execute { + context.value.read() } } } }.whenFailure { error in - self.state.fail(error) + self.value.state.fail(error) } } @@ -268,7 +272,7 @@ extension FileContentStream { } } -public extension AsyncSequence where Element == ByteBuffer { +public extension AsyncSequence where Element == ByteBuffer, Self: Sendable { func splitIntoLines( dropTerminator: Bool = true, maximumAllowableBufferSize: Int = 1024 * 1024, diff --git a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift index 0bbce32..c656dee 100644 --- a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift +++ b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift @@ -17,22 +17,16 @@ import NIOExtras struct NIOAsyncPipeWriter where Chunks.Element == ByteBuffer { static func sinkSequenceInto( _ chunks: Chunks, - fileDescriptor fd: CInt, + takingOwnershipOfFD fd: CInt, eventLoop: EventLoop ) async throws { - // Just so we've got an input. - // (workaround for https://github.com/apple/swift-nio/issues/2444) - let deadPipe = Pipe() let channel = try await NIOPipeBootstrap(group: eventLoop) .channelOption(ChannelOptions.allowRemoteHalfClosure, value: true) .channelOption(ChannelOptions.autoRead, value: false) - .takingOwnershipOfDescriptors( - input: dup(deadPipe.fileHandleForReading.fileDescriptor), - output: dup(fd) + .takingOwnershipOfDescriptor( + output: fd ).get() channel.close(mode: .input, promise: nil) - try! deadPipe.fileHandleForReading.close() - try! deadPipe.fileHandleForWriting.close() defer { channel.close(promise: nil) } diff --git a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift index d9d0382..3fa236b 100644 --- a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift +++ b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift @@ -14,9 +14,9 @@ import AsyncAlgorithms import Logging import NIO -public struct OutputLoggingSettings { +public struct OutputLoggingSettings: Sendable { /// Where should the output line put to? - public enum WhereTo { + public enum WhereTo: Sendable { /// Put the output line into the logMessage itself. case logMessage @@ -55,14 +55,28 @@ public struct OutputLoggingSettings { public extension ProcessExecutor { /// Run child process, discarding all its output. - static func run( - group: EventLoopGroup, + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you + /// don't want to + /// provide input. + /// - logger: Where to log diagnostic messages to (default to no where) + static func run( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], standardInput: StandardInput, environment: [String: String] = [:], - logger: Logger - ) async throws -> ProcessExitReason { + logger: Logger = ProcessExecutor.disableLogging + ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let p = Self( group: group, executable: executable, @@ -76,16 +90,31 @@ public extension ProcessExecutor { return try await p.run() } - /// Run child process, logging all its output line by line. - static func runLogOutput( - group: EventLoopGroup, + /// Run child process, logging all its output. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you + /// don't want to + /// provide input. + /// - logger: Where to log diagnostic and output messages to + /// - logConfiguration: How to log the output lines + static func runLogOutput( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], standardInput: StandardInput, environment: [String: String] = [:], logger: Logger, logConfiguration: OutputLoggingSettings - ) async throws -> ProcessExitReason { + ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( group: group, executable: executable, @@ -124,17 +153,34 @@ public extension ProcessExecutor { } } - /// Run child process, process all its output (`stdout` and `stderr`) using a closure. - static func runProcessingOutput( - group: EventLoopGroup, + /// Run child process, processing all its output (`stdout` and `stderr`) using a closure. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you + /// don't want to + /// provide input. + /// - outputProcessor: The closure that'll be called for every chunk of output + /// - splitOutputIntoLines: Whether to call the closure with full lines (`true`) or arbitrary chunks of output + /// (`false`) + /// - logger: Where to log diagnostic and output messages to + static func runProcessingOutput( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], standardInput: StandardInput, outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> (), splitOutputIntoLines: Bool = false, environment: [String: String] = [:], - logger: Logger - ) async throws -> ProcessExitReason { + logger: Logger = ProcessExecutor.disableLogging + ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( group: group, executable: executable, @@ -195,8 +241,27 @@ public extension ProcessExecutor { case standardError(ByteBuffer?) } - static func runCollectingOutput( - group: EventLoopGroup, + /// Run child process, collecting its output (`stdout` and `stderr`) into memory. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you + /// don't want to + /// provide input. + /// - collectStandardOutput: If `true`, collect all of the child process' standard output into memory, discard if + /// `false` + /// - collectStandardError: If `true`, collect all of the child process' standard error into memory, discard if + /// `false` + /// - logger: Where to log diagnostic and output messages to + static func runCollectingOutput( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], standardInput: StandardInput, @@ -204,8 +269,8 @@ public extension ProcessExecutor { collectStandardError: Bool, perStreamCollectionLimitBytes: Int = 128 * 1024, environment: [String: String] = [:], - logger: Logger - ) async throws -> ProcessExitReasonAndOutput { + logger: Logger = ProcessExecutor.disableLogging + ) async throws -> ProcessExitReasonAndOutput where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( group: group, executable: executable, @@ -220,7 +285,7 @@ public extension ProcessExecutor { return try await withThrowingTaskGroup(of: ProcessExitInformationPiece.self) { group in group.addTask { if collectStandardOutput { - var output: ByteBuffer? + var output: ByteBuffer? = nil for try await chunk in await exe.standardOutput { guard (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes else { throw TooMuchProcessOutputError(stream: .standardOutput) @@ -235,7 +300,7 @@ public extension ProcessExecutor { group.addTask { if collectStandardError { - var output: ByteBuffer? + var output: ByteBuffer? = nil for try await chunk in await exe.standardError { guard (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes else { throw TooMuchProcessOutputError(stream: .standardError) @@ -267,3 +332,140 @@ public extension ProcessExecutor { } } } + +public extension ProcessExecutor { + /// Run child process, discarding all its output. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - logger: Where to log diagnostic messages to (default to no where) + static func run( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, + executable: String, + _ arguments: [String], + environment: [String: String] = [:], + logger: Logger = ProcessExecutor.disableLogging + ) async throws -> ProcessExitReason { + try await self.run( + group: group, + executable: executable, + arguments, + standardInput: EOFSequence(), + environment: environment, + logger: logger + ) + } + + /// Run child process, logging all its output. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - logger: Where to log diagnostic and output messages to + /// - logConfiguration: How to log the output lines + static func runLogOutput( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, + executable: String, + _ arguments: [String], + environment: [String: String] = [:], + logger: Logger, + logConfiguration: OutputLoggingSettings + ) async throws -> ProcessExitReason { + try await self.runLogOutput( + group: group, + executable: executable, + arguments, + standardInput: EOFSequence(), + environment: environment, + logger: logger, + logConfiguration: logConfiguration + ) + } + + /// Run child process, processing all its output (`stdout` and `stderr`) using a closure. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - outputProcessor: The closure that'll be called for every chunk of output + /// - splitOutputIntoLines: Whether to call the closure with full lines (`true`) or arbitrary chunks of output + /// (`false`) + /// - logger: Where to log diagnostic and output messages to + static func runProcessingOutput( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, + executable: String, + _ arguments: [String], + outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> (), + splitOutputIntoLines: Bool = false, + environment: [String: String] = [:], + logger: Logger = ProcessExecutor.disableLogging + ) async throws -> ProcessExitReason { + try await self.runProcessingOutput( + group: group, + executable: executable, + arguments, + standardInput: EOFSequence(), + outputProcessor: outputProcessor, + splitOutputIntoLines: splitOutputIntoLines, + environment: environment, + logger: logger + ) + } + + /// Run child process, collecting its output (`stdout` and `stderr`) into memory. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - collectStandardOutput: If `true`, collect all of the child process' standard output into memory, discard if + /// `false` + /// - collectStandardError: If `true`, collect all of the child process' standard error into memory, discard if + /// `false` + /// - logger: Where to log diagnostic and output messages to + static func runCollectingOutput( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, + executable: String, + _ arguments: [String], + collectStandardOutput: Bool, + collectStandardError: Bool, + perStreamCollectionLimitBytes: Int = 128 * 1024, + environment: [String: String] = [:], + logger: Logger = ProcessExecutor.disableLogging + ) async throws -> ProcessExitReasonAndOutput { + try await self.runCollectingOutput( + group: group, + executable: executable, + arguments, standardInput: EOFSequence(), + collectStandardOutput: collectStandardOutput, + collectStandardError: collectStandardError, + perStreamCollectionLimitBytes: perStreamCollectionLimitBytes, + environment: environment, + logger: logger + ) + } +} diff --git a/Sources/AsyncProcess/ProcessExecutor.swift b/Sources/AsyncProcess/ProcessExecutor.swift index f0cce75..ac7f070 100644 --- a/Sources/AsyncProcess/ProcessExecutor.swift +++ b/Sources/AsyncProcess/ProcessExecutor.swift @@ -17,6 +17,8 @@ import NIO @_exported import struct SystemPackage.FileDescriptor +import Foundation + public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible { enum Backing { case standardOutput @@ -40,11 +42,12 @@ public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible } /// What to do with a given stream (`stdout`/`stderr`) in the spawned child process. -public struct ProcessOutput { +public struct ProcessOutput: Sendable { enum Backing { case discard case inherit - case fileDescriptor(FileDescriptor) + case fileDescriptorOwned(FileDescriptor) + case fileDescriptorShared(FileDescriptor) case stream } @@ -60,9 +63,22 @@ public struct ProcessOutput { /// Take ownership of `fd` and install that as the child process' file descriptor. /// + /// You may use the same `fd` with `.fileDescriptor(takingOwnershipOf: fd)` and `.fileDescriptor(sharing: fd)` at + /// the same time. For example to redirect standard output and standard error into the same file. + /// /// - warning: After passing a `FileDescriptor` to this method you _must not_ perform _any_ other operations on it. public static func fileDescriptor(takingOwnershipOf fd: FileDescriptor) -> Self { - .init(backing: .fileDescriptor(fd)) + .init(backing: .fileDescriptorOwned(fd)) + } + + /// Install `fd` as the child process' file descriptor, leaving the fd ownership with the user. + /// + /// You may use the same `fd` with `.fileDescriptor(takingOwnershipOf: fd)` and `.fileDescriptor(sharing: fd)` at + /// the same time. For example to redirect standard output and standard error into the same file. + /// + /// - note: `fd` is required to be closed by the user after the process has started running (and _not_ before). + public static func fileDescriptor(sharing fd: FileDescriptor) -> Self { + .init(backing: .fileDescriptorShared(fd)) } /// Stream this using the ``ProcessExecutor.standardOutput`` / ``ProcessExecutor.standardError`` ``AsyncStream``s. @@ -103,18 +119,42 @@ private struct OutputConsumptionState: OptionSet { } } +/// Type-erasing type analogous to `AnySequence` from the Swift standard library. +private struct AnyAsyncSequence: AsyncSequence { + private let iteratorFactory: () -> AsyncIterator + + init(_ asyncSequence: S) where S.Element == Element { + self.iteratorFactory = { + var iterator = asyncSequence.makeAsyncIterator() + return AsyncIterator { try await iterator.next() } + } + } + + struct AsyncIterator: AsyncIteratorProtocol { + let underlying: () async throws -> Element? + + func next() async throws -> Element? { + try await self.underlying() + } + } + + func makeAsyncIterator() -> AsyncIterator { + self.iteratorFactory() + } +} + /// Execute a sub-process. /// /// - warning: Currently, the default for `standardOutput` & `standardError` is ``ProcessOutput.stream`` which means /// you _must_ consume ``ProcessExecutor.standardOutput`` & ``ProcessExecutor.standardError``. If you prefer /// to not consume it, please set them to ``ProcessOutput.discard`` explicitly. -public actor ProcessExecutor where StandardInput.Element == ByteBuffer { +public final actor ProcessExecutor { private let logger: Logger private let group: EventLoopGroup private let executable: String private let arguments: [String] - private let environment: [String: String]? - private let standardInput: StandardInput + private let environment: [String: String] + private let standardInput: AnyAsyncSequence private let standardInputPipe: Pipe? private let standardOutputWriteHandle: FileHandle? private let standardErrorWriteHandle: FileHandle? @@ -122,6 +162,8 @@ public actor ProcessExecutor where Stan private let _standardError: ChunkSequence private let processIsRunningApproximation = ManagedAtomic(RunningStateApproximation.neverStarted.rawValue) private let processOutputConsumptionApproximation = ManagedAtomic(UInt8(0)) + private let ownsStandardOutputWriteHandle: Bool + private let ownsStandardErrorWriteHandle: Bool public var standardOutput: ChunkSequence { let afterValue = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( @@ -153,21 +195,42 @@ public actor ProcessExecutor where Stan case finishedExecuting = 3 } - public init( - group: EventLoopGroup, + /// Create a ``ProcessExecutor`` to spawn a single child process. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - standardInput: An `AsyncSequence` providing the standard input, pass `EOFSequence(of: ByteBuffer.self)` if you + /// don't want to + /// provide input. + /// - standardOutput: A description of what to do with the standard output of the child process (defaults to + /// ``ProcessOutput/stream`` + /// which requires to consume it via ``ProcessExecutor/standardOutput``. + /// - standardError: A description of what to do with the standard output of the child process (defaults to + /// ``ProcessOutput/stream`` + /// which requires to consume it via ``ProcessExecutor/standardError``. + /// - logger: Where to log diagnostic messages to (default to no where) + public init( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, executable: String, _ arguments: [String], - environment: [String: String]? = nil, + environment: [String: String] = [:], standardInput: StandardInput, standardOutput: ProcessOutput = .stream, standardError: ProcessOutput = .stream, - logger: Logger - ) { + logger: Logger = ProcessExecutor.disableLogging + ) where StandardInput.Element == ByteBuffer { self.group = group self.executable = executable self.environment = environment self.arguments = arguments - self.standardInput = standardInput + self.standardInput = AnyAsyncSequence(standardInput) self.logger = logger self.standardInputPipe = StandardInput.self == EOFSequence.self ? nil : Pipe() @@ -178,13 +241,23 @@ public actor ProcessExecutor where Stan with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) + self.ownsStandardOutputWriteHandle = true self.standardOutputWriteHandle = FileHandle(forWritingAtPath: "/dev/null") self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) - case let .fileDescriptor(fd): + case let .fileDescriptorOwned(fd): + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stdoutNotStreamed.rawValue, + ordering: .relaxed + ) + self.ownsStandardOutputWriteHandle = true + self.standardOutputWriteHandle = FileHandle(fileDescriptor: fd.rawValue) + self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case let .fileDescriptorShared(fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) + self.ownsStandardOutputWriteHandle = false self.standardOutputWriteHandle = FileHandle(fileDescriptor: fd.rawValue) self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) case .inherit: @@ -192,10 +265,12 @@ public actor ProcessExecutor where Stan with: OutputConsumptionState.stdoutNotStreamed.rawValue, ordering: .relaxed ) + self.ownsStandardOutputWriteHandle = true self.standardOutputWriteHandle = nil self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) case .stream: let (stdoutSequence, stdoutWriteHandle) = Self.makeWriteStream(group: group) + self.ownsStandardOutputWriteHandle = true self._standardOutput = stdoutSequence self.standardOutputWriteHandle = stdoutWriteHandle } @@ -206,13 +281,23 @@ public actor ProcessExecutor where Stan with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) + self.ownsStandardErrorWriteHandle = true self.standardErrorWriteHandle = FileHandle(forWritingAtPath: "/dev/null") self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) - case let .fileDescriptor(fd): + case let .fileDescriptorOwned(fd): + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stderrNotStreamed.rawValue, + ordering: .relaxed + ) + self.ownsStandardErrorWriteHandle = true + self.standardErrorWriteHandle = FileHandle(fileDescriptor: fd.rawValue) + self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case let .fileDescriptorShared(fd): _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) + self.ownsStandardErrorWriteHandle = false self.standardErrorWriteHandle = FileHandle(fileDescriptor: fd.rawValue) self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) case .inherit: @@ -220,10 +305,12 @@ public actor ProcessExecutor where Stan with: OutputConsumptionState.stderrNotStreamed.rawValue, ordering: .relaxed ) + self.ownsStandardErrorWriteHandle = true self.standardErrorWriteHandle = nil self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) case .stream: let (stdoutSequence, stdoutWriteHandle) = Self.makeWriteStream(group: group) + self.ownsStandardErrorWriteHandle = true self._standardError = stdoutSequence self.standardErrorWriteHandle = stdoutWriteHandle } @@ -245,7 +332,7 @@ public actor ProcessExecutor where Stan runningState == RunningStateApproximation.finishedExecuting.rawValue, """ Did you create a ProcessExecutor without run()ning it? \ - That's currently illegal: \ + That's currently illegal: illegal running state \(runningState) in deinit """ ) @@ -279,6 +366,17 @@ public actor ProcessExecutor where Stan ) } + /// Run the process. + /// + /// Calling `run()` will run the (sub-)process and return its ``ProcessExitReason`` when the execution completes. + /// Unless `standardOutput` and `standardError` were both set to ``ProcessOutput/discard``, + /// ``ProcessOutput/fileDescriptor(takingOwnershipOf:)`` or ``ProcessOutput/inherit`` you must consume the + /// `AsyncSequence`s + /// ``ProcessExecutor/standardOutput`` and ``ProcessExecutor/standardError`` concurrently to ``run()``ing the process. + /// + /// If you prefer to get the standard output and error in one (non-stremed) piece upon exit, consider the `static` + /// methods such as + /// ``ProcessExecutor/runCollectingOutput(group:executable:_:standardInput:collectStandardOutput:collectStandardError:perStreamCollectionLimitBytes:environment:logger:)``. public func run() async throws -> ProcessExitReason { let p = Process() #if canImport(Darwin) @@ -291,7 +389,7 @@ public actor ProcessExecutor where Stan p.executableURL = URL(fileURLWithPath: self.executable) #endif p.arguments = self.arguments - p.environment = self.environment ?? [:] + p.environment = self.environment p.standardInput = nil if let standardOutputWriteHandle = self.standardOutputWriteHandle { @@ -304,92 +402,135 @@ public actor ProcessExecutor where Stan } p.standardInput = self.standardInputPipe - @Sendable - func go() async throws -> ProcessExitReason { - try await withCheckedThrowingContinuation { - (continuation: CheckedContinuation) in - p.terminationHandler = { p in - self.logger.debug( - "finished running command", - metadata: [ - "termination-reason": p.terminationReason == .uncaughtSignal ? "signal" : "exit", - "termination-status": "\(p.terminationStatus)", - "pid": "\(p.processIdentifier)", - ] - ) - let (worked, original) = self.processIsRunningApproximation.compareExchange( - expected: RunningStateApproximation.running.rawValue, - desired: RunningStateApproximation.finishedExecuting.rawValue, - ordering: .relaxed - ) - precondition(worked, "illegal running state \(original)") + let (terminationStreamConsumer, terminationStreamProducer) = AsyncStream.justMakeIt( + elementType: ProcessExitReason.self + ) - if p.terminationReason == .uncaughtSignal { - continuation.resume(returning: .signal(p.terminationStatus)) - } else { - continuation.resume(returning: .exit(p.terminationStatus)) - } - } - do { - let (worked, original) = self.processIsRunningApproximation.compareExchange( - expected: RunningStateApproximation.neverStarted.rawValue, - desired: RunningStateApproximation.running.rawValue, - ordering: .relaxed - ) - precondition(worked, "illegal running state \(original)") - try p.run() - self.logger.debug( - "running command", - metadata: [ - "executable": "\(self.executable)", - "arguments": "\(self.arguments)", - "pid": "\(p.processIdentifier)", - ] - ) - } catch { - continuation.resume(throwing: error) - } + p.terminationHandler = { p in + self.logger.debug( + "finished running command", + metadata: [ + "executable": "\(self.executable)", + "arguments": .array(self.arguments.map { .string($0) }), + "termination-reason": p.terminationReason == .uncaughtSignal ? "signal" : "exit", + "termination-status": "\(p.terminationStatus)", + "pid": "\(p.processIdentifier)", + ] + ) + let (worked, original) = self.processIsRunningApproximation.compareExchange( + expected: RunningStateApproximation.running.rawValue, + desired: RunningStateApproximation.finishedExecuting.rawValue, + ordering: .relaxed + ) + precondition(worked, "illegal running state \(original)") - try! self.standardInputPipe?.fileHandleForReading.close() // Must work. - try! self.standardOutputWriteHandle?.close() // Must work. - try! self.standardErrorWriteHandle?.close() // Must work. + if p.terminationReason == .uncaughtSignal { + terminationStreamProducer.yield(.signal(p.terminationStatus)) + } else { + terminationStreamProducer.yield(.exit(p.terminationStatus)) } + terminationStreamProducer.finish() + } + + let (worked, original) = self.processIsRunningApproximation.compareExchange( + expected: RunningStateApproximation.neverStarted.rawValue, + desired: RunningStateApproximation.running.rawValue, + ordering: .relaxed + ) + precondition( + worked, + "Did you run() twice? That's currently not allowed: illegal running state \(original)" + ) + do { + try p.run() + } catch { + let (worked, original) = self.processIsRunningApproximation.compareExchange( + expected: RunningStateApproximation.running.rawValue, + desired: RunningStateApproximation.finishedExecuting.rawValue, + ordering: .relaxed + ) + terminationStreamProducer.finish() // The termination handler will never have fired. + assert(worked) // We just set it to running above, shouldn't be able to race (no `await`). + assert(original == RunningStateApproximation.running.rawValue) // We compare-and-exchange it. + throw error + } + + // At this point, the process is running, we should therefore have a process ID. + assert(p.processIdentifier != 0) + self.logger.debug( + "running command", + metadata: [ + "executable": "\(self.executable)", + "arguments": "\(self.arguments)", + "pid": "\(p.processIdentifier)", + ] + ) + + try! self.standardInputPipe?.fileHandleForReading.close() // Must work. + if self.ownsStandardOutputWriteHandle { + try! self.standardOutputWriteHandle?.close() // Must work. + } + if self.ownsStandardErrorWriteHandle { + try! self.standardErrorWriteHandle?.close() // Must work. } @Sendable func cancel() { - guard p.processIdentifier != 0 else { - self.logger.warning("leaking Process \(p) because it hasn't been started yet") + let childPid = p.processIdentifier + guard childPid != 0 else { + self.logger.warning( + "leaking Process because it hasn't got a process identifier (likely a Foundation.Process bug)", + metadata: ["process": "\(p)"] + ) return } - self.logger.info("terminating process", metadata: ["pid": "\(p.processIdentifier)"]) - #if os(Linux) - // workaround: https://github.com/apple/swift-corelibs-foundation/issues/4772 if p.isRunning { - kill(p.processIdentifier, SIGKILL) + self.logger.info("terminating process", metadata: ["pid": "\(childPid)"]) + kill(childPid, SIGKILL) + } else { + self.logger.debug("child process already dead", metadata: ["pid-if-available": "\(childPid)"]) + } + } + + @Sendable + func waitForChildToExit() async -> ProcessExitReason { + // We do need for the child to exit (and it will, we SIGKILL'd it) + await withUncancelledTask(returning: ProcessExitReason.self) { + var iterator = terminationStreamConsumer.makeAsyncIterator() + + // Let's wait for the process to finish (it will) + guard let terminationStatus = await iterator.next() else { + fatalError("terminationStream finished without giving us a result") + } + + // Just double check that `finish()` has immediately been called too. + let thisMustBeNil = await iterator.next() + precondition(thisMustBeNil == nil) + return terminationStatus } - #else - p.terminate() - #endif } return try await withThrowingTaskGroup(of: ProcessExitReason?.self, returning: ProcessExitReason.self) { group in group.addTask { - try await withTaskCancellationHandler(operation: go, onCancel: cancel) + await withTaskCancellationHandler(operation: waitForChildToExit, onCancel: cancel) } group.addTask { if let stdinPipe = self.standardInputPipe { - try await NIOAsyncPipeWriter.sinkSequenceInto( + let fdForNIO = dup(stdinPipe.fileHandleForWriting.fileDescriptor) + try! stdinPipe.fileHandleForWriting.close() + + try await NIOAsyncPipeWriter>.sinkSequenceInto( self.standardInput, - fileDescriptor: stdinPipe.fileHandleForWriting.fileDescriptor, + takingOwnershipOfFD: fdForNIO, eventLoop: self.group.any() ) } return nil } - var exitReason: ProcessExitReason? + var exitReason: ProcessExitReason? = nil + // cannot fix this warning yet (rdar://113844171) while let result = try await group.next() { if let result { exitReason = result @@ -399,3 +540,90 @@ public actor ProcessExecutor where Stan } } } + +public extension ProcessExecutor { + /// A globally shared, singleton `EventLoopGroup` that's suitable for ``ProcessExecutor``. + /// + /// At present this is always `MultiThreadedEventLoopGroup.singleton`. + static var defaultEventLoopGroup: any EventLoopGroup { + globalDefaultEventLoopGroup + } + + /// The default `Logger` for ``ProcessExecutor`` that's used if you do not override it. It won't log anything. + static var disableLogging: Logger { + globalDisableLoggingLogger + } +} + +public extension ProcessExecutor { + /// Create a ``ProcessExecutor`` to spawn a single child process. + /// + /// - note: The `environment` defaults to the empty environment. + /// + /// - Parameters: + /// - group: The `EventLoopGroup` to run the I/O on + /// - executable: The full path to the executable to spawn + /// - arguments: The arguments to the executable (not including `argv[0]`) + /// - environment: The environment variables to pass to the child process. + /// If you want to inherit the calling process' environment into the child, specify + /// `ProcessInfo.processInfo.environment` + /// - standardOutput: A description of what to do with the standard output of the child process (defaults to + /// ``ProcessOutput/stream`` + /// which requires to consume it via ``ProcessExecutor/standardOutput``. + /// - standardError: A description of what to do with the standard output of the child process (defaults to + /// ``ProcessOutput/stream`` + /// which requires to consume it via ``ProcessExecutor/standardError``. + /// - logger: Where to log diagnostic messages to (default to no where) + init( + group: EventLoopGroup = ProcessExecutor.defaultEventLoopGroup, + executable: String, + _ arguments: [String], + environment: [String: String] = [:], + standardOutput: ProcessOutput = .stream, + standardError: ProcessOutput = .stream, + logger: Logger = ProcessExecutor.disableLogging + ) { + self.init( + group: group, + executable: executable, + arguments, + environment: environment, + standardInput: EOFSequence(), + standardOutput: standardOutput, + standardError: standardError, + logger: logger + ) + } +} + +private let globalDefaultEventLoopGroup: MultiThreadedEventLoopGroup = .singleton +private let globalDisableLoggingLogger: Logger = .init( + label: "swift-async-process -- never logs", + factory: { _ in SwiftLogNoOpLogHandler() } +) + +extension AsyncStream { + static func justMakeIt(elementType: Element.Type = Element.self) -> ( + consumer: AsyncStream, producer: AsyncStream.Continuation + ) { + var _producer: AsyncStream.Continuation? + let stream = AsyncStream { producer in + _producer = producer + } + + return (stream, _producer!) + } +} + +func withUncancelledTask( + returning: R.Type = R.self, + _ body: @Sendable @escaping () async -> R +) async -> R { + // This looks unstructured but it isn't, please note that we `await` `.value` of this task. + // The reason we need this separate `Task` is that in general, we cannot assume that code performs to our + // expectations if the task we run it on is already cancelled. However, in some cases we need the code to + // run regardless -- even if our task is already cancelled. Therefore, we create a new, uncancelled task here. + await Task { + await body() + }.value +} diff --git a/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator+Download.swift b/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator+Download.swift index a21b075..0306d05 100644 --- a/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator+Download.swift +++ b/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator+Download.swift @@ -157,17 +157,12 @@ extension HTTPClient { private func downloadUbuntuPackagesList( from url: String, isVerbose: Bool - ) async throws -> String { - guard let packages = try await get(url: url).get().body else { - throw FileOperationError.downloadFailed(URL(string: url)!) + ) async throws -> String? { + guard let packages = try await get(url: url).get().body?.unzip(isVerbose: isVerbose) else { + throw FileOperationError.downloadFailed(url) } - var result = "" - for try await chunk in try packages.unzip(isVerbose: isVerbose) { - result.append(String(data: chunk, encoding: .utf8)!) - } - - return result + return String(buffer: packages) } func parseUbuntuPackagesList( @@ -190,10 +185,12 @@ extension HTTPClient { )/Packages.gz """ - let packages = try await downloadUbuntuPackagesList( + guard let packages = try await downloadUbuntuPackagesList( from: packagesListURL, isVerbose: isVerbose - ) + ) else { + throw GeneratorError.ubuntuPackagesDecompressionFailure + } let packageRef = Reference(Substring.self) let pathRef = Reference(Substring.self) diff --git a/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator.swift b/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator.swift index 70df257..2d2300a 100644 --- a/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator.swift +++ b/Sources/SwiftSDKGenerator/Generator/SwiftSDKGenerator.swift @@ -222,7 +222,7 @@ public actor SwiftSDKGenerator { } func gunzip(file: FilePath, into directoryPath: FilePath) async throws { - try await Shell.run("gzip -d \(file)", currentDirectory: directoryPath, shouldLogCommands: self.isVerbose) + try await Shell.run(#"cd "\#(directoryPath)" && gzip -d "\#(file)""#, shouldLogCommands: self.isVerbose) } func untar( @@ -236,8 +236,7 @@ public actor SwiftSDKGenerator { "" } try await Shell.run( - "tar \(stripComponentsOption) -xzf \(file)", - currentDirectory: directoryPath, + #"tar -C "\#(directoryPath)" \#(stripComponentsOption) -xzf \#(file)"#, shouldLogCommands: self.isVerbose ) } @@ -245,11 +244,11 @@ public actor SwiftSDKGenerator { func unpack(debFile: FilePath, into directoryPath: FilePath) async throws { let isVerbose = self.isVerbose try await self.inTemporaryDirectory { _, tmp in - try await Shell.run("ar -x \(debFile)", currentDirectory: tmp, shouldLogCommands: isVerbose) + try await Shell.run(#"cd "\#(tmp)" && ar -x "\#(debFile)""#, shouldLogCommands: isVerbose) + try await print(Shell.readStdout("ls \(tmp)")) try await Shell.run( - "tar -xf \(tmp)/data.tar.*", - currentDirectory: directoryPath, + #"tar -C "\#(directoryPath)" -xf "\#(tmp)"/data.tar.*"#, shouldLogCommands: isVerbose ) } @@ -258,10 +257,9 @@ public actor SwiftSDKGenerator { func unpack(pkgFile: FilePath, into directoryPath: FilePath) async throws { let isVerbose = self.isVerbose try await self.inTemporaryDirectory { _, tmp in - try await Shell.run("xar -xf \(pkgFile)", currentDirectory: tmp, shouldLogCommands: isVerbose) + try await Shell.run(#"xar -C "\#(tmp)" -xf "\#(pkgFile)""#, shouldLogCommands: isVerbose) try await Shell.run( - "cat \(tmp)/*.pkg/Payload | gunzip -cd | cpio -i", - currentDirectory: directoryPath, + #"cat "\#(tmp)"/*.pkg/Payload | gunzip -cd | (cd "\#(directoryPath)" && cpio -i)"#, shouldLogCommands: isVerbose ) } diff --git a/Sources/SwiftSDKGenerator/Queries/CMakeBuildQuery.swift b/Sources/SwiftSDKGenerator/Queries/CMakeBuildQuery.swift index b100955..643ac80 100644 --- a/Sources/SwiftSDKGenerator/Queries/CMakeBuildQuery.swift +++ b/Sources/SwiftSDKGenerator/Queries/CMakeBuildQuery.swift @@ -23,13 +23,16 @@ struct CMakeBuildQuery { func run(engine: Engine) async throws -> FilePath { try await Shell.run( """ - cmake -B build -G Ninja -S llvm -DCMAKE_BUILD_TYPE=Release \(self.options) + cmake -S "\(self.sourcesDirectory)"/llvm -B "\( + self + .sourcesDirectory + )"/build -G Ninja -DCMAKE_BUILD_TYPE=Release \(self.options) """, - currentDirectory: self.sourcesDirectory + logStdout: true ) let buildDirectory = self.sourcesDirectory.appending("build") - try await Shell.run("ninja \(FilePath(".").appending(self.outputBinarySubpath))", currentDirectory: buildDirectory) + try await Shell.run(#"ninja -C "\#(buildDirectory)""#, logStdout: true) return self.outputBinarySubpath.reduce(into: buildDirectory) { $0.append($1) } } diff --git a/Sources/SwiftSDKGenerator/SystemUtils/ByteBuffer+Utils.swift b/Sources/SwiftSDKGenerator/SystemUtils/ByteBuffer+Utils.swift index fed24c4..693808b 100644 --- a/Sources/SwiftSDKGenerator/SystemUtils/ByteBuffer+Utils.swift +++ b/Sources/SwiftSDKGenerator/SystemUtils/ByteBuffer+Utils.swift @@ -10,16 +10,23 @@ // //===----------------------------------------------------------------------===// +import AsyncProcess import Foundation import NIOCore import NIOFoundationCompat public extension ByteBuffer { - func unzip(isVerbose: Bool) throws -> AsyncThrowingStream { - let gzip = try Shell("gzip -cd", shouldLogCommands: isVerbose) - gzip.stdin.write(Data(buffer: self)) - try gzip.stdin.close() + func unzip(isVerbose: Bool) async throws -> ByteBuffer? { + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/usr/bin/gzip", ["-cd"], + standardInput: [self].async, + collectStandardOutput: true, + collectStandardError: false, + perStreamCollectionLimitBytes: 10 * 1024 * 1024 + ) - return gzip.stdout + try result.exitReason.throwIfNonZero() + + return result.standardOutput } } diff --git a/Sources/SwiftSDKGenerator/SystemUtils/FileOperationError.swift b/Sources/SwiftSDKGenerator/SystemUtils/FileOperationError.swift index 3a69729..305239d 100644 --- a/Sources/SwiftSDKGenerator/SystemUtils/FileOperationError.swift +++ b/Sources/SwiftSDKGenerator/SystemUtils/FileOperationError.swift @@ -17,8 +17,7 @@ import struct SystemPackage.FilePath public enum FileOperationError: Error { case downloadFailed(URL, HTTPResponseStatus) case directoryCreationFailed(FilePath) - case downloadFailed(URL) + case downloadFailed(String) case unknownArchiveFormat(String?) - case nonZeroExitCode(Int32, CommandInfo) case symlinkFixupFailed(source: FilePath, destination: FilePath) } diff --git a/Sources/SwiftSDKGenerator/SystemUtils/GeneratorError.swift b/Sources/SwiftSDKGenerator/SystemUtils/GeneratorError.swift index bbf532c..0705d0b 100644 --- a/Sources/SwiftSDKGenerator/SystemUtils/GeneratorError.swift +++ b/Sources/SwiftSDKGenerator/SystemUtils/GeneratorError.swift @@ -15,6 +15,9 @@ import enum NIOHTTP1.HTTPResponseStatus import struct SystemPackage.FilePath enum GeneratorError: Error { + case noProcessOutput(String) + case unhandledChildProcessSignal(CInt, CommandInfo) + case nonZeroExitCode(CInt, CommandInfo) case unknownLinuxDistribution(name: String, version: String?) case unknownMacOSVersion(String) case unknownCPUArchitecture(String) @@ -22,12 +25,19 @@ enum GeneratorError: Error { case distributionSupportsOnlyDockerGenerator(LinuxDistribution) case fileDoesNotExist(FilePath) case fileDownloadFailed(URL, HTTPResponseStatus) + case ubuntuPackagesDecompressionFailure case ubuntuPackagesParsingFailure(expectedPackages: Int, actual: Int) } extension GeneratorError: CustomStringConvertible { var description: String { switch self { + case let .noProcessOutput(process): + "Failed to read standard output of a launched process: \(process)" + case let .unhandledChildProcessSignal(signal, commandInfo): + "Process launched with \(commandInfo) finished due to signal \(signal)" + case let .nonZeroExitCode(exitCode, commandInfo): + "Process launched with \(commandInfo) failed with exit code \(exitCode)" case let .unknownLinuxDistribution(name, version): "Linux distribution `\(name)`\(version.map { " with version \($0)" } ?? "")` is not supported by this generator." case let .unknownMacOSVersion(version): @@ -45,6 +55,8 @@ extension GeneratorError: CustomStringConvertible { "Expected to find a file at path `\(filePath)`." case let .fileDownloadFailed(url, status): "File could not be downloaded from a URL `\(url)`, the server returned status `\(status)`." + case .ubuntuPackagesDecompressionFailure: + "Failed to decompress the list of Ubuntu packages" case let .ubuntuPackagesParsingFailure(expected, actual): "Failed to parse Ubuntu packages manifest, expected \(expected), found \(actual) packages." } diff --git a/Sources/SwiftSDKGenerator/SystemUtils/Shell.swift b/Sources/SwiftSDKGenerator/SystemUtils/Shell.swift index f9bd654..de62dfb 100644 --- a/Sources/SwiftSDKGenerator/SystemUtils/Shell.swift +++ b/Sources/SwiftSDKGenerator/SystemUtils/Shell.swift @@ -10,99 +10,74 @@ // //===----------------------------------------------------------------------===// -import Foundation +import AsyncProcess +import class Foundation.ProcessInfo import struct SystemPackage.FilePath public struct CommandInfo: Sendable { let command: String - let currentDirectory: FilePath? let file: String let line: Int } -final class Shell { - // FIXME: using Foundation's `Process` under the hood might not work on Linux due to these bugs: - // https://github.com/apple/swift-corelibs-foundation/issues/3275 - // https://github.com/apple/swift-corelibs-foundation/issues/3276 - private let process: Process +struct Shell { + private let process: ProcessExecutor private let commandInfo: CommandInfo + private let logStdout: Bool + private let logStderr: Bool - /// Writable handle to the standard input of the command. - let stdin: FileHandle - - /// Readable stream of data chunks that the running command writes to the standard output I/O - /// handle. - let stdout: AsyncThrowingStream - - /// Readable stream of data chunks that the running command writes to the standard error I/O - /// handle. - let stderr: AsyncThrowingStream - - init( + private init( _ command: String, - currentDirectory: FilePath? = nil, - shouldDisableIOStreams: Bool = false, shouldLogCommands: Bool, + logStdout: Bool = false, + logStderr: Bool = true, file: String = #file, line: Int = #line ) throws { + self.process = ProcessExecutor( + executable: "/bin/sh", + ["-c", command], + environment: ProcessInfo.processInfo.environment, + standardOutput: logStdout ? .stream : .discard, + standardError: logStderr ? .stream : .discard + ) self.commandInfo = CommandInfo( command: command, - currentDirectory: currentDirectory, file: file, line: line ) - let process = Process() - - if let currentDirectory { - process.currentDirectoryURL = URL(fileURLWithPath: currentDirectory.string) - } - process.executableURL = URL(string: "file:///bin/sh") - process.arguments = ["-c", command] - - let stdinPipe = Pipe() - self.stdin = stdinPipe.fileHandleForWriting - process.standardInput = stdinPipe - - if shouldDisableIOStreams { - self.stdout = .init { $0.finish() } - self.stderr = .init { $0.finish() } - } else { - self.stdout = .init(process, pipeKeyPath: \.standardOutput, commandInfo: self.commandInfo) - self.stderr = .init(process, pipeKeyPath: \.standardError, commandInfo: self.commandInfo) - } - - self.process = process + self.logStdout = logStdout + self.logStderr = logStderr if shouldLogCommands { print(command) } - - try process.run() - } - - private func check(exitCode: Int32) throws { - guard exitCode == 0 else { - throw FileOperationError.nonZeroExitCode(exitCode, self.commandInfo) - } } /// Wait for the process to exit in a non-blocking way. - func waitUntilExit() async throws { - guard self.process.isRunning else { - return try self.check(exitCode: self.process.terminationStatus) - } - - try await withTaskCancellationHandler { - let exitCode = await withCheckedContinuation { continuation in - self.process.terminationHandler = { - continuation.resume(returning: $0.terminationStatus) + private func waitUntilExit() async throws { + let result = try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + if self.logStdout { + try await self.process.standardOutput.printChunksAsStrings() } } - - try check(exitCode: exitCode) - } onCancel: { - self.process.interrupt() + group.addTask { + if self.logStderr { + try await self.process.standardError.printChunksAsStrings() + } + } + return try await self.process.run() + } + do { + try result.throwIfNonZero() + } catch { + switch result { + case let .exit(code): + throw GeneratorError.nonZeroExitCode(code, self.commandInfo) + case .signal: + fatalError() + } } } @@ -112,16 +87,17 @@ final class Shell { /// - currentDirectory: current working directory for the command. static func run( _ command: String, - currentDirectory: FilePath? = nil, shouldLogCommands: Bool = false, + logStdout: Bool = false, + logStderr: Bool = true, file: String = #file, line: Int = #line ) async throws { try await Shell( command, - currentDirectory: currentDirectory, - shouldDisableIOStreams: true, shouldLogCommands: shouldLogCommands, + logStdout: logStdout, + logStderr: logStderr, file: file, line: line ) @@ -130,61 +106,35 @@ final class Shell { static func readStdout( _ command: String, - currentDirectory: FilePath? = nil, shouldLogCommands: Bool = false, file: String = #file, line: Int = #line ) async throws -> String { - let process = try Shell( - command, - currentDirectory: currentDirectory, - shouldDisableIOStreams: false, - shouldLogCommands: shouldLogCommands, - file: file, - line: line + if shouldLogCommands { + print(command) + } + + let result = try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + ["-c", command], + collectStandardOutput: true, + collectStandardError: false, + perStreamCollectionLimitBytes: 10 * 1024 * 1024, + environment: ProcessInfo.processInfo.environment ) - try await process.waitUntilExit() + try result.exitReason.throwIfNonZero() - var output = "" - for try await chunk in process.stdout { - output.append(String(data: chunk, encoding: .utf8)!) - } - return output + guard let stdOutBuffer = result.standardOutput else { throw GeneratorError.noProcessOutput(command) } + + return String(buffer: stdOutBuffer) } } -@available(*, unavailable) -extension Shell: Sendable {} - -private extension AsyncThrowingStream where Element == Data, Failure == any Error { - init( - _ process: Process, - pipeKeyPath: ReferenceWritableKeyPath, - commandInfo: CommandInfo - ) { - self.init { continuation in - let pipe = Pipe() - pipe.fileHandleForReading.readabilityHandler = { [unowned pipe] fileHandle in - let data = fileHandle.availableData - if !data.isEmpty { - continuation.yield(data) - } else { - if !process.isRunning && process.terminationStatus != 0 { - continuation.finish( - throwing: FileOperationError.nonZeroExitCode(process.terminationStatus, commandInfo) - ) - } else { - continuation.finish() - } - - // Clean up the handler to prevent repeated calls and continuation finishes for the same - // process. - pipe.fileHandleForReading.readabilityHandler = nil - } - } - - process[keyPath: pipeKeyPath] = pipe +extension ChunkSequence { + func printChunksAsStrings() async throws { + for try await line in self.splitIntoLines(dropTerminator: true) { + print(String(buffer: line)) } } } diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index 9db35c0..2effc69 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -105,7 +105,7 @@ final class IntegrationTests: XCTestCase { ) try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in group.addTask { - var lastLine: String? + var lastLine: String? = nil for try await line in await exe.standardOutput.splitIntoLines(dropTerminator: false) { if line.readableBytes > 72 { lastLine = String(buffer: line) @@ -145,13 +145,7 @@ final class IntegrationTests: XCTestCase { switch furtherReturn { case let .some(result): // the `exe.run()` task - #if os(Linux) - // Because of the workaround for https://github.com/apple/swift-corelibs-foundation/issues/4772 , - // we can't use `Process.terminate()` on Linux... XCTAssertEqual(.signal(SIGKILL), result) - #else - XCTAssertEqual(.signal(SIGTERM), result) - #endif case .none: // stderr task () @@ -269,6 +263,38 @@ final class IntegrationTests: XCTestCase { XCTAssertEqual("", String(buffer: all.standardError)) } + func testSimplePipe() async throws { + self.logger.logLevel = .debug + let echo = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "echo foo;"], + standardInput: EOFSequence(), + standardError: .discard, + logger: self.logger + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await echo.run().throwIfNonZero() + } + group.addTask { + let echoOutput = await echo.standardOutput + + let sed = ProcessExecutor( + group: self.group, + executable: "/usr/bin/tr", + ["[:lower:]", "[:upper:]"], + standardInput: echoOutput, + logger: self.logger + ) + let output = try await sed.runGetAllOutput() + XCTAssertEqual(String(buffer: output.standardOutput), "FOO\n") + } + try await group.waitForAll() + } + } + func testStressTestVeryLittleOutput() async throws { for _ in 0..<128 { let exe = ProcessExecutor( @@ -498,7 +524,7 @@ final class IntegrationTests: XCTestCase { recordedLogger.logLevel = .info // don't give us the normal messages recordedLogger[metadataKey: "yo"] = "hey" - try await ProcessExecutor.runLogOutput( + try await ProcessExecutor.runLogOutput( group: self.group, executable: "/bin/sh", ["-c", "echo 1; echo >&2 2; echo 3; echo >&2 4; echo 5; echo >&2 6; echo 7; echo >&2 8;"], @@ -520,7 +546,7 @@ final class IntegrationTests: XCTestCase { recordedLogger.logLevel = .info // don't give us the normal messages recordedLogger[metadataKey: "yo"] = "hey" - try await ProcessExecutor.runLogOutput( + try await ProcessExecutor.runLogOutput( group: self.group, executable: "/bin/sh", ["-c", "echo 1; echo >&2 2; echo 3; echo >&2 4; echo 5; echo >&2 6; echo 7; echo >&2 8;"], @@ -537,7 +563,7 @@ final class IntegrationTests: XCTestCase { func testProcessOutputByLine() async throws { let collectedLines: NIOLockedValueBox<[(String, String)]> = NIOLockedValueBox([]) - try await ProcessExecutor.runProcessingOutput( + try await ProcessExecutor.runProcessingOutput( group: self.group, executable: "/bin/sh", [ @@ -564,7 +590,7 @@ final class IntegrationTests: XCTestCase { func testProcessOutputInChunks() async throws { let collectedBytes = ManagedAtomic(0) - try await ProcessExecutor.runProcessingOutput( + try await ProcessExecutor.runProcessingOutput( group: self.group, executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=20", "status=none"], @@ -581,7 +607,7 @@ final class IntegrationTests: XCTestCase { } func testBasicRunMethodWorks() async throws { - try await ProcessExecutor.run( + try await ProcessExecutor.run( group: self.group, executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=100"], standardInput: EOFSequence(), @@ -590,7 +616,7 @@ final class IntegrationTests: XCTestCase { } func testCollectJustStandardOutput() async throws { - let allInfo = try await ProcessExecutor.runCollectingOutput( + let allInfo = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=1"], standardInput: EOFSequence(), @@ -605,7 +631,7 @@ final class IntegrationTests: XCTestCase { } func testCollectJustStandardError() async throws { - let allInfo = try await ProcessExecutor.runCollectingOutput( + let allInfo = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/sh", ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1 status=none"], standardInput: EOFSequence(), @@ -620,7 +646,7 @@ final class IntegrationTests: XCTestCase { } func testCollectNothing() async throws { - let allInfo = try await ProcessExecutor.runCollectingOutput( + let allInfo = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/sh", ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=100 status=none"], standardInput: EOFSequence(), @@ -635,7 +661,7 @@ final class IntegrationTests: XCTestCase { } func testCollectStdOutAndErr() async throws { - let allInfo = try await ProcessExecutor.runCollectingOutput( + let allInfo = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/sh", [ @@ -658,7 +684,7 @@ final class IntegrationTests: XCTestCase { func testTooMuchToCollectStdout() async throws { do { - let result = try await ProcessExecutor.runCollectingOutput( + let result = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=1"], standardInput: EOFSequence(), @@ -669,17 +695,17 @@ final class IntegrationTests: XCTestCase { ) XCTFail("should've thrown but got result: \(result)") } catch { - XCTAssertTrue(error is ProcessExecutor>.TooMuchProcessOutputError) + XCTAssertTrue(error is ProcessExecutor.TooMuchProcessOutputError) XCTAssertEqual( ProcessOutputStream.standardOutput, - (error as? ProcessExecutor.TooMuchProcessOutputError)?.stream + (error as? ProcessExecutor.TooMuchProcessOutputError)?.stream ) } } func testTooMuchToCollectStderr() async throws { do { - let result = try await ProcessExecutor.runCollectingOutput( + let result = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "of=/dev/stderr", "count=1", "status=none"], @@ -691,16 +717,16 @@ final class IntegrationTests: XCTestCase { ) XCTFail("should've thrown but got result: \(result)") } catch { - XCTAssertTrue(error is ProcessExecutor>.TooMuchProcessOutputError) + XCTAssertTrue(error is ProcessExecutor.TooMuchProcessOutputError) XCTAssertEqual( ProcessOutputStream.standardError, - (error as? ProcessExecutor.TooMuchProcessOutputError)?.stream + (error as? ProcessExecutor.TooMuchProcessOutputError)?.stream ) } } func testCollectEmptyStringFromStdoutAndErr() async throws { - let allInfo = try await ProcessExecutor.runCollectingOutput( + let allInfo = try await ProcessExecutor.runCollectingOutput( group: self.group, executable: "/bin/sh", ["-c", ""], @@ -715,6 +741,247 @@ final class IntegrationTests: XCTestCase { XCTAssertEqual(ByteBuffer(), allInfo.standardOutput) } + func testExecutableDoesNotExist() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/dev/null/does/not/exist", + [], + standardInput: EOFSequence(), + standardOutput: .discard, + standardError: .discard, + logger: self.logger + ) + do { + let result = try await exe.run() + XCTFail("got result for bad executable: \(result)") + } catch { + XCTAssertEqual(NSCocoaErrorDomain, (error as NSError).domain) + #if canImport(Darwin) + // https://github.com/apple/swift-corelibs-foundation/issues/4810 + XCTAssertEqual(NSFileNoSuchFileError, (error as NSError).code) + #endif + } + } + + func testAPIsWithoutELGOrLoggerArguments() async throws { + let exe = ProcessExecutor( + executable: "/bin/sh", ["-c", "true"], + standardInput: EOFSequence(), + standardOutput: .discard, + standardError: .discard + ) + try await exe.run().throwIfNonZero() + + try await ProcessExecutor.run( + executable: "/bin/sh", ["-c", "true"], + standardInput: EOFSequence() + ).throwIfNonZero() + + try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + ["-c", "true"], + standardInput: EOFSequence(), + collectStandardOutput: false, + collectStandardError: false + ).exitReason.throwIfNonZero() + + try await ProcessExecutor.runProcessingOutput( + executable: "/bin/sh", + ["-c", "true"], + standardInput: EOFSequence() + ) { _, _ in + }.throwIfNonZero() + + try await ProcessExecutor.runLogOutput( + executable: "/bin/sh", + ["-c", "true"], + standardInput: EOFSequence(), + logger: self.logger, + logConfiguration: .init(logLevel: .critical, to: .logMessage) + ).throwIfNonZero() + } + + func testAPIsWithoutELGStandardInputOrLoggerArguments() async throws { + let exe = ProcessExecutor( + executable: "/bin/sh", ["-c", "true"], + standardOutput: .discard, + standardError: .discard + ) + try await exe.run().throwIfNonZero() + + let exeStream = ProcessExecutor(executable: "/bin/sh", ["-c", "true"]) + #if compiler(>=5.8) + async let stdout = Array(exeStream.standardOutput) + async let stderr = Array(exeStream.standardError) + #else + async let stdout = { + var chunks: [ByteBuffer] = [] + for try await chunk in await exeStream.standardOutput { + chunks.append(chunk) + } + return chunks + }() + async let stderr = { + var chunks: [ByteBuffer] = [] + for try await chunk in await exeStream.standardError { + chunks.append(chunk) + } + return chunks + }() + #endif + try await exeStream.run().throwIfNonZero() + let out = try await stdout + let err = try await stderr + XCTAssertEqual([], out) + XCTAssertEqual([], err) + + try await ProcessExecutor.run(executable: "/bin/sh", ["-c", "true"]).throwIfNonZero() + + try await ProcessExecutor.runCollectingOutput( + executable: "/bin/sh", + ["-c", "true"], + collectStandardOutput: false, + collectStandardError: false + ).exitReason.throwIfNonZero() + + try await ProcessExecutor.runProcessingOutput(executable: "/bin/sh", ["-c", "true"]) { _, _ in + }.throwIfNonZero() + + try await ProcessExecutor.runLogOutput( + executable: "/bin/sh", + ["-c", "true"], + logger: self.logger, + logConfiguration: .init(logLevel: .critical, to: .logMessage) + ).throwIfNonZero() + } + + func testStdoutAndStderrToSameFileWorks() async throws { + let tempDir = URL(fileURLWithPath: NSTemporaryDirectory()) + .appendingPathComponent("AsyncProcessTests-\(getpid())-\(UUID())") + try FileManager.default.createDirectory(at: tempDir, withIntermediateDirectories: false) + defer { + XCTAssertNoThrow(try FileManager.default.removeItem(at: tempDir)) + } + + for (stdoutMode, stderrMode) in [("shared", "shared"), ("shared", "owned"), ("owned", "shared")] { + let filePath = tempDir.appendingPathComponent("file-\(stdoutMode)-\(stderrMode)") + let fd = try FileDescriptor.open( + .init(filePath.path.removingPercentEncoding!), + .writeOnly, + options: .create, + permissions: [.ownerRead, .ownerWrite] + ) + defer { + if stdoutMode == "shared" && stderrMode == "shared" { + XCTAssertNoThrow(try fd.close()) + } + } + + let stdout: ProcessOutput + let stderr: ProcessOutput + + if stdoutMode == "owned" { + stdout = .fileDescriptor(takingOwnershipOf: fd) + } else { + stdout = .fileDescriptor(sharing: fd) + } + if stderrMode == "owned" { + stderr = .fileDescriptor(takingOwnershipOf: fd) + } else { + stderr = .fileDescriptor(sharing: fd) + } + + #if canImport(Darwin) + let command = + "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; wait" + #else + // workaround for + // https://github.com/apple/swift-corelibs-foundation/issues/4772 + // which causes `SIGCHLD` being blocked in the shell so it can't wait for its children :| + let command = + "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; sleep 10" + #endif + + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", command], + standardInput: EOFSequence(), + standardOutput: stdout, + standardError: stderr, + logger: self.logger + ) + try await exe.run().throwIfNonZero() + let actualOutput = try Data(contentsOf: filePath) + XCTAssertEqual(4000, actualOutput.count, "\(stdoutMode)-\(stderrMode)") + + var expectedOutput = Data() + expectedOutput.append(Data(repeating: UInt8(ascii: "\n"), count: 2000)) + expectedOutput.append(Data(repeating: UInt8(ascii: "1"), count: 1000)) + expectedOutput.append(Data(repeating: UInt8(ascii: "2"), count: 1000)) + XCTAssertEqual(expectedOutput, Data(actualOutput.sorted()), "\(stdoutMode)-\(stderrMode)") + } + } + + func testCanReliablyKillProcessesEvenWithSigmask() async throws { + let exitReason = try await withThrowingTaskGroup( + of: ProcessExitReason?.self, + returning: ProcessExitReason.self + ) { group in + group.addTask { + try await ProcessExecutor.run( + executable: "/bin/sh", + ["-c", "trap 'echo no' TERM; while true; do sleep 1; done"] + ) + } + group.addTask { + try? await Task.sleep(nanoseconds: 10_000_000) + return nil + } + + while let result = try await group.next() { + group.cancelAll() + if let result { + return result + } + } + preconditionFailure("this should be impossible, task should've returned a result") + } + XCTAssertEqual(.signal(SIGKILL), exitReason) + } + + func testCancelProcessVeryEarlyOnStressTest() async throws { + for i in 0..<1000 { + self.logger.debug("iteration go", metadata: ["iteration-number": "\(i)"]) + let exitReason = try await withThrowingTaskGroup( + of: ProcessExitReason?.self, + returning: ProcessExitReason.self + ) { group in + group.addTask { + try await ProcessExecutor.run( + executable: "/bin/sleep", ["100000"], + logger: self.logger + ) + } + group.addTask { + let waitNS = UInt64.random(in: 0..<10_000_000) + self.logger.info("waiting", metadata: ["wait-ns": "\(waitNS)"]) + try? await Task.sleep(nanoseconds: waitNS) + return nil + } + + while let result = try await group.next() { + group.cancelAll() + if let result { + return result + } + } + preconditionFailure("this should be impossible, task should've returned a result") + } + XCTAssertEqual(.signal(SIGKILL), exitReason, "iteration \(i)") + } + } + // MARK: - Setup/teardown override func setUp() async throws { @@ -745,7 +1012,7 @@ extension AsyncStream { extension AsyncSequence where Element == ByteBuffer { func pullAllOfIt() async throws -> ByteBuffer { - var buffer: ByteBuffer? + var buffer: ByteBuffer? = nil for try await chunk in self { buffer.setOrWriteImmutableBuffer(chunk) }