diff --git a/Package.swift b/Package.swift index 8094ef8..68b7b0b 100644 --- a/Package.swift +++ b/Package.swift @@ -76,9 +76,22 @@ let package = Package( ] ), .systemLibrary(name: "SystemSQLite", pkgConfig: "sqlite3"), + + // `AsyncProcess` modules and dependencies + + .target(name: "CProcessSpawnSync"), + .target( + name: "ProcessSpawnSync", + dependencies: [ + "CProcessSpawnSync", + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + ] + ), .target( name: "AsyncProcess", dependencies: [ + "ProcessSpawnSync", .product(name: "Atomics", package: "swift-atomics"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "Logging", package: "swift-log"), diff --git a/Sources/AsyncProcess/FileContentStream.swift b/Sources/AsyncProcess/FileContentStream.swift index c704ea9..01a45de 100644 --- a/Sources/AsyncProcess/FileContentStream.swift +++ b/Sources/AsyncProcess/FileContentStream.swift @@ -19,7 +19,8 @@ import NIO // - Known issues: // - no tests // - most configurations have never run -struct FileContentStream: AsyncSequence { +typealias FileContentStream = _FileContentStream +public struct _FileContentStream: AsyncSequence & Sendable { public typealias Element = ByteBuffer typealias Underlying = AsyncThrowingChannel @@ -47,7 +48,17 @@ struct FileContentStream: AsyncSequence { private let asyncChannel: AsyncThrowingChannel - public init( + public static func makeReader( + fileDescriptor: CInt, + eventLoop: EventLoop = MultiThreadedEventLoopGroup.singleton.any(), + blockingPool: NIOThreadPool = .singleton + ) async throws -> _FileContentStream { + try await eventLoop.submit { + try FileContentStream(fileDescriptor: fileDescriptor, eventLoop: eventLoop, blockingPool: blockingPool) + }.get() + } + + init( fileDescriptor: CInt, eventLoop: EventLoop, blockingPool: NIOThreadPool? = nil diff --git a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift index b7815cb..c0fc396 100644 --- a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift +++ b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift @@ -75,6 +75,7 @@ public extension ProcessExecutor { _ arguments: [String], standardInput: StandardInput, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let p = Self( @@ -85,6 +86,7 @@ public extension ProcessExecutor { standardInput: standardInput, standardOutput: .discard, standardError: .discard, + teardownSequence: teardownSequence, logger: logger ) return try await p.run() @@ -112,6 +114,7 @@ public extension ProcessExecutor { _ arguments: [String], standardInput: StandardInput, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger, logConfiguration: OutputLoggingSettings ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { @@ -123,6 +126,7 @@ public extension ProcessExecutor { standardInput: standardInput, standardOutput: .stream, standardError: .stream, + teardownSequence: teardownSequence, logger: logger ) return try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in @@ -179,6 +183,7 @@ public extension ProcessExecutor { outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> (), splitOutputIntoLines: Bool = false, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( @@ -189,6 +194,7 @@ public extension ProcessExecutor { standardInput: standardInput, standardOutput: .stream, standardError: .stream, + teardownSequence: teardownSequence, logger: logger ) return try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in @@ -269,6 +275,7 @@ public extension ProcessExecutor { collectStandardError: Bool, perStreamCollectionLimitBytes: Int = 128 * 1024, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReasonAndOutput where StandardInput.Element == ByteBuffer { let exe = ProcessExecutor( @@ -279,6 +286,7 @@ public extension ProcessExecutor { standardInput: standardInput, standardOutput: collectStandardOutput ? .stream : .discard, standardError: collectStandardError ? .stream : .discard, + teardownSequence: teardownSequence, logger: logger ) @@ -351,6 +359,7 @@ public extension ProcessExecutor { executable: String, _ arguments: [String], environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason { try await self.run( @@ -359,6 +368,7 @@ public extension ProcessExecutor { arguments, standardInput: EOFSequence(), environment: environment, + teardownSequence: teardownSequence, logger: logger ) } @@ -381,6 +391,7 @@ public extension ProcessExecutor { executable: String, _ arguments: [String], environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger, logConfiguration: OutputLoggingSettings ) async throws -> ProcessExitReason { @@ -390,6 +401,7 @@ public extension ProcessExecutor { arguments, standardInput: EOFSequence(), environment: environment, + teardownSequence: teardownSequence, logger: logger, logConfiguration: logConfiguration ) @@ -417,6 +429,7 @@ public extension ProcessExecutor { outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> (), splitOutputIntoLines: Bool = false, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReason { try await self.runProcessingOutput( @@ -427,6 +440,7 @@ public extension ProcessExecutor { outputProcessor: outputProcessor, splitOutputIntoLines: splitOutputIntoLines, environment: environment, + teardownSequence: teardownSequence, logger: logger ) } @@ -455,6 +469,7 @@ public extension ProcessExecutor { collectStandardError: Bool, perStreamCollectionLimitBytes: Int = 128 * 1024, environment: [String: String] = [:], + teardownSequence: TeardownSequence = TeardownSequence(), logger: Logger = ProcessExecutor.disableLogging ) async throws -> ProcessExitReasonAndOutput { try await self.runCollectingOutput( @@ -465,6 +480,7 @@ public extension ProcessExecutor { collectStandardError: collectStandardError, perStreamCollectionLimitBytes: perStreamCollectionLimitBytes, environment: environment, + teardownSequence: teardownSequence, logger: logger ) } diff --git a/Sources/AsyncProcess/ProcessExecutor.swift b/Sources/AsyncProcess/ProcessExecutor.swift index 59f1d0a..946ae92 100644 --- a/Sources/AsyncProcess/ProcessExecutor.swift +++ b/Sources/AsyncProcess/ProcessExecutor.swift @@ -10,13 +10,32 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import Atomics import Logging import NIO +import ProcessSpawnSync @_exported import struct SystemPackage.FileDescriptor +#if os(Linux) || ASYNC_PROCESS_FORCE_PS_PROCESS +// Foundation.Process is too buggy on Linux +// +// - Foundation.Process on Linux throws error Error Domain=NSCocoaErrorDomain Code=256 "(null)" if executable not found +// https://github.com/swiftlang/swift-corelibs-foundation/issues/4810 +// - Foundation.Process on Linux doesn't correctly detect when child process dies (creating zombie processes) +// https://github.com/swiftlang/swift-corelibs-foundation/issues/4795 +// - Foundation.Process on Linux seems to inherit the Process.run()-calling thread's signal mask, even SIGTERM blocked +// https://github.com/swiftlang/swift-corelibs-foundation/issues/4772 +typealias Process = PSProcess +#endif + +#if os(iOS) || os(tvOS) || os(watchOS) +// Note: Process() in iOS/tvOS/watchOS is available in internal builds only under Foundation Private/headers +import Foundation_Private.NSTask +#else import Foundation +#endif public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible { enum Backing { @@ -509,12 +528,22 @@ public final actor ProcessExecutor { ) p.terminationHandler = { p in - let pidExchangeWorked = self.processPid.compareExchange( - expected: p.processIdentifier, - desired: -1, - ordering: .sequentiallyConsistent - ).exchanged - assert(pidExchangeWorked) + let pProcessID = p.processIdentifier + var terminationPidExchange: (exchanged: Bool, original: pid_t) = (false, -1) + while !terminationPidExchange.exchanged { + terminationPidExchange = self.processPid.compareExchange( + expected: pProcessID, + desired: -1, + ordering: .sequentiallyConsistent + ) + if !terminationPidExchange.exchanged { + precondition( + terminationPidExchange.original == 0, + "termination pid exchange failed: \(terminationPidExchange)" + ) + Thread.sleep(forTimeInterval: 0.01) + } + } self.logger.debug( "finished running command", metadata: [ @@ -560,6 +589,8 @@ public final actor ProcessExecutor { ordering: .relaxed ) terminationStreamProducer.finish() // The termination handler will never have fired. + try! self.standardOutputWriteHandle?.close() + try! self.standardErrorWriteHandle?.close() assert(worked) // We just set it to running above, shouldn't be able to race (no `await`). assert(original == RunningStateApproximation.running.rawValue) // We compare-and-exchange it. throw error @@ -567,7 +598,12 @@ public final actor ProcessExecutor { // At this point, the process is running, we should therefore have a process ID (unless we're already dead). let childPid = p.processIdentifier - _ = self.processPid.compareExchange(expected: 0, desired: childPid, ordering: .sequentiallyConsistent) + let runPidExchange = self.processPid.compareExchange( + expected: 0, + desired: childPid, + ordering: .sequentiallyConsistent + ) + precondition(runPidExchange.exchanged, "run pid exchange failed: \(runPidExchange)") assert(childPid != 0 || !p.isRunning) self.logger.debug( "running command", @@ -658,6 +694,7 @@ public final actor ProcessExecutor { } var exitReason: ProcessExitReason? = nil + // cannot fix this warning yet (rdar://113844171) while let result = try await runProcessGroup.next() { if let result { exitReason = result diff --git a/Sources/CProcessSpawnSync/include/CProcessSpawnSync.h b/Sources/CProcessSpawnSync/include/CProcessSpawnSync.h new file mode 100644 index 0000000..0e88e3f --- /dev/null +++ b/Sources/CProcessSpawnSync/include/CProcessSpawnSync.h @@ -0,0 +1,13 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#include "ps-api.h" diff --git a/Sources/CProcessSpawnSync/include/ps-api.h b/Sources/CProcessSpawnSync/include/ps-api.h new file mode 100644 index 0000000..1b6fa8a --- /dev/null +++ b/Sources/CProcessSpawnSync/include/ps-api.h @@ -0,0 +1,73 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#ifndef PS_API_H +#define PS_API_H + +#include +#include + +typedef enum ps_error_kind_s { + PS_ERROR_KIND_EXECVE = 1, + PS_ERROR_KIND_PIPE = 2, + PS_ERROR_KIND_FCNTL = 3, + PS_ERROR_KIND_SIGNAL = 4, + PS_ERROR_KIND_SIGPROC_MASK = 5, + PS_ERROR_KIND_CHDIR = 6, + PS_ERROR_KIND_SETSID = 7, + PS_ERROR_KIND_DUP2 = 8, + PS_ERROR_KIND_READ_FROM_CHILD = 9, + PS_ERROR_KIND_DUP = 10, + PS_ERROR_KIND_SIGMASK_THREAD = 11, +} ps_error_kind; + +typedef struct ps_error_s { + ps_error_kind pse_kind; + int pse_code; + const char *pse_file; + int pse_line; + int pse_extra_info; +} ps_error; + +typedef enum ps_fd_setup_kind_s { + PS_MAP_FD = 1, + PS_CLOSE_FD = 2, +} ps_fd_setup_kind; + +typedef struct ps_fd_setup_s { + ps_fd_setup_kind psfd_kind; + int psfd_parent_fd; +} ps_fd_setup; + +typedef struct ps_process_configuration_s { + const char *psc_path; + + // including argv[0] + char **psc_argv; + + char **psc_env; + + const char *psc_cwd; + + + int psc_fd_setup_count; + const ps_fd_setup *psc_fd_setup_instructions; + + bool psc_new_session; + bool psc_close_other_fds; +} ps_process_configuration; + +pid_t ps_spawn_process(ps_process_configuration *config, ps_error *out_error); + +void ps_convert_exit_status(int in_status, bool *out_has_exited, bool *out_is_exit_code, int *out_code); + +#endif diff --git a/Sources/CProcessSpawnSync/internal-helpers.h b/Sources/CProcessSpawnSync/internal-helpers.h new file mode 100644 index 0000000..9d5e0ca --- /dev/null +++ b/Sources/CProcessSpawnSync/internal-helpers.h @@ -0,0 +1,87 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#ifndef INTERNAL_HELPERS_H +#define INTERNAL_HELPERS_H +#include + +static int positive_int_parse(const char *str) { + int out = 0; + char c = 0; + + while ((c = *str++) != 0) { + out *= 10; + if (c >= '0' && c <= '9') { + out += c - '0'; + } else { + return -1; + } + } + return out; +} + +static int highest_possibly_open_fd_dir(const char *fd_dir) { + int highest_fd_so_far = 0; + DIR *dir_ptr = opendir(fd_dir); + if (dir_ptr == NULL) { + return -1; + } + + struct dirent *dir_entry = NULL; + while ((dir_entry = readdir(dir_ptr)) != NULL) { + char *entry_name = dir_entry->d_name; + int number = positive_int_parse(entry_name); + if (number > (long)highest_fd_so_far) { + highest_fd_so_far = number; + } + } + + closedir(dir_ptr); + return highest_fd_so_far; +} + +static int highest_possibly_open_fd(void) { +#if defined(__APPLE__) + int hi = highest_possibly_open_fd_dir("/dev/fd"); + if (hi < 0) { + hi = getdtablesize(); + } +#elif defined(__linux__) + int hi = highest_possibly_open_fd_dir("/proc/self/fd"); + if (hi < 0) { + hi = getdtablesize(); + } +#else + int hi = 1024; +#endif + + return hi; +} + +static int block_everything_but_something_went_seriously_wrong_signals(sigset_t *old_mask) { + sigset_t mask; + int r = 0; + r |= sigfillset(&mask); + r |= sigdelset(&mask, SIGABRT); + r |= sigdelset(&mask, SIGBUS); + r |= sigdelset(&mask, SIGFPE); + r |= sigdelset(&mask, SIGILL); + r |= sigdelset(&mask, SIGKILL); + r |= sigdelset(&mask, SIGSEGV); + r |= sigdelset(&mask, SIGSTOP); + r |= sigdelset(&mask, SIGSYS); + r |= sigdelset(&mask, SIGTRAP); + + r |= pthread_sigmask(SIG_BLOCK, &mask, old_mask); + return r; +} +#endif diff --git a/Sources/CProcessSpawnSync/spawner.c b/Sources/CProcessSpawnSync/spawner.c new file mode 100644 index 0000000..e65910f --- /dev/null +++ b/Sources/CProcessSpawnSync/spawner.c @@ -0,0 +1,284 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "internal-helpers.h" + +#define MAKE_PS_ERROR_FROM_ERRNO(__kind) \ +(ps_error){ \ + .pse_kind = (__kind), \ + .pse_code = errno, \ + .pse_file = __FILE__, \ + .pse_line = __LINE__ \ +} + +#if __apple__ +# define PS_SIG_MAX __DARWIN_NSIG +#else +# define PS_SIG_MAX 32 +#endif + +#define ps_precondition(__cond) do { \ + int eval = (__cond); \ + if (!eval) { \ + __builtin_trap(); \ + } \ +} while(0) + +struct child_scratch { + int duplicated_fd; +}; + +static void setup_and_execve_child(ps_process_configuration *config, int error_pipe, struct child_scratch *scratch) { + ps_error error = { 0 }; + sigset_t sigset = { 0 }; + int err = -1; + + /* reset signal handlers */ + for (int signo = 1; signo < PS_SIG_MAX; signo++) { + if (signo == SIGKILL || signo == SIGSTOP) { + continue; + } + void (*err_ptr)(int) = signal(signo, SIG_DFL); + if (err_ptr != SIG_ERR) { + continue; + } + + if (errno == EINVAL) { + break; // probably too high of a signal + } + + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGNAL); + error.pse_extra_info = signo; + goto write_fail; + } + + /* reset signal mask */ + sigemptyset(&sigset); + err = sigprocmask(SIG_SETMASK, &sigset, NULL) != 0; + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGPROC_MASK); + goto write_fail; + } + + if (config->psc_new_session) { + err = setsid(); + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SETSID); + goto write_fail; + } + } + + for (int child_fd=0; child_fdpsc_fd_setup_count; child_fd++) { + ps_fd_setup setup = config->psc_fd_setup_instructions[child_fd]; + + switch (setup.psfd_kind) { + case PS_MAP_FD: + scratch[child_fd].duplicated_fd = fcntl(setup.psfd_parent_fd, F_DUPFD_CLOEXEC, config->psc_fd_setup_count); + if (scratch[child_fd].duplicated_fd == -1) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_DUP); + error.pse_extra_info = child_fd; + goto write_fail; + } + break; + case PS_CLOSE_FD: + scratch[child_fd].duplicated_fd = -1; + break; + default: + ps_precondition(0); + } + } + + for (int child_fd=0; child_fdpsc_fd_setup_count; child_fd++) { + ps_fd_setup setup = config->psc_fd_setup_instructions[child_fd]; + switch (setup.psfd_kind) { + case PS_MAP_FD: + ps_precondition(scratch[child_fd].duplicated_fd > child_fd); + err = dup2(scratch[child_fd].duplicated_fd, child_fd); + if (err == -1) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_DUP2); + error.pse_extra_info = child_fd; + goto write_fail; + } + break; + case PS_CLOSE_FD: + ps_precondition(scratch[child_fd].duplicated_fd == -1); + close(child_fd); + break; + default: + ps_precondition(0); + } + } + + if (config->psc_close_other_fds) { + for (int i=config->psc_fd_setup_count; ipsc_cwd) { + err = chdir(config->psc_cwd); + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_CHDIR); + goto write_fail; + } + } + + /* finally, exec */ + err = execve(config->psc_path, config->psc_argv, config->psc_env); + if (err) { + error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_EXECVE); + goto write_fail; + } + + __builtin_unreachable(); + + write_fail: + write(error_pipe, &error, sizeof(error)); + close(error_pipe); +} + +pid_t ps_spawn_process(ps_process_configuration *config, ps_error *out_error) { + pid_t pid = -1; + sigset_t old_sigmask; + struct child_scratch *scratch = NULL; + int error_pid_fd[2] = { -1, -1 }; + int err = pipe(error_pid_fd); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_PIPE); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + err = fcntl(error_pid_fd[1], F_SETFD, FD_CLOEXEC); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_FCNTL); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + scratch = calloc(config->psc_fd_setup_count, sizeof(*scratch)); + + /* block all signals on this thread, don't want things to go wrong post-fork, pre-execve */ + err = block_everything_but_something_went_seriously_wrong_signals(&old_sigmask); + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGMASK_THREAD); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + if ((pid = fork()) == 0) { + /* child */ + close(error_pid_fd[0]); + error_pid_fd[0] = -1; + setup_and_execve_child(config, error_pid_fd[1], scratch); + abort(); + } else { + /* parent */ + err = pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL); /* restore old sigmask */ + if (err) { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_SIGMASK_THREAD); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + + if (pid > 0) { + ps_error child_error = { 0 }; + close(error_pid_fd[1]); + error_pid_fd[1] = -1; + + free(scratch); + scratch = NULL; + + while (true) { + ssize_t read_res = read(error_pid_fd[0], &child_error, sizeof(child_error)); + if (read_res == 0) { + /* EOF, that's good, execve worked. */ + close(error_pid_fd[0]); + error_pid_fd[0] = -1; + return pid; + } else if (read_res > 0) { + ps_precondition(read_res == sizeof(child_error)); + if (out_error) { + *out_error = child_error; + } + goto error_cleanup; + } else { + if (errno == EINTR) { + continue; + } else { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_READ_FROM_CHILD); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + } + } + } else { + ps_error error = MAKE_PS_ERROR_FROM_ERRNO(PS_ERROR_KIND_FCNTL); + if (out_error) { + *out_error = error; + } + goto error_cleanup; + } + } + +error_cleanup: + if (error_pid_fd[0] != -1) { + close(error_pid_fd[0]); + } + if (error_pid_fd[1] != -1) { + close(error_pid_fd[1]); + } + free(scratch); + ps_precondition((!out_error) || (out_error->pse_kind != 0)); + return 0; +} + +void ps_convert_exit_status(int in_status, bool *out_has_exited, bool *out_is_exit_code, int *out_code) { + if (WIFEXITED(in_status)) { + *out_has_exited = true; + *out_is_exit_code = true; + *out_code = WEXITSTATUS(in_status); + } else if (WIFSIGNALED(in_status)) { + *out_has_exited = true; + *out_is_exit_code = false; + *out_code = WTERMSIG(in_status); + } else { + *out_has_exited = false; + *out_is_exit_code = false; + *out_code = -1; + } +} diff --git a/Sources/ProcessSpawnSync/ProcessSpawner.swift b/Sources/ProcessSpawnSync/ProcessSpawner.swift new file mode 100644 index 0000000..a1f5e1f --- /dev/null +++ b/Sources/ProcessSpawnSync/ProcessSpawner.swift @@ -0,0 +1,355 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Atomics +import CProcessSpawnSync +import Foundation +import NIOConcurrencyHelpers + +extension ps_error_s { + private func makeDescription() -> String { + """ + PSError(\ + kind: \(self.pse_kind.rawValue), \ + errno: \(self.pse_code), \ + file: \(String(cString: self.pse_file)), \ + line: \(self.pse_line)\ + \( + self.pse_extra_info != 0 ? ", extra: \(self.pse_extra_info)" : "" + ) + """ + } +} + +#if compiler(>=6.0) +extension ps_error_s: @retroactive CustomStringConvertible { + public var description: String { + self.makeDescription() + } +} +#else +extension ps_error_s: CustomStringConvertible { + public var description: String { + self.makeDescription() + } +} +#endif + +public struct PSProcessUnknownError: Error & CustomStringConvertible { + var reason: String + + public var description: String { + self.reason + } +} + +public final class PSProcess: Sendable { + struct State: Sendable { + var executableURL: URL? = nil + var arguments: [String] = [] + var environment: [String: String] = [:] + private(set) var pidWhenRunning: pid_t? = nil + var standardInput: Pipe? = nil + var standardOutput: FileHandle? = nil + var standardError: FileHandle? = nil + var terminationHandler: (@Sendable (PSProcess) -> ())? = nil + private(set) var procecesIdentifier: pid_t? = nil + private(set) var terminationStatus: (Process.TerminationReason, CInt)? = nil + + mutating func setRunning(pid: pid_t, isRunningApproximation: ManagedAtomic) { + assert(self.pidWhenRunning == nil) + self.pidWhenRunning = pid + self.procecesIdentifier = pid + isRunningApproximation.store(true, ordering: .relaxed) + } + + mutating func setNotRunning( + terminationStaus: (Process.TerminationReason, CInt), + isRunningApproximation: ManagedAtomic + ) -> @Sendable (PSProcess) -> () { + assert(self.pidWhenRunning != nil) + isRunningApproximation.store(false, ordering: .relaxed) + self.pidWhenRunning = nil + self.terminationStatus = terminationStaus + let terminationHandler = self.terminationHandler ?? { _ in } + self.terminationHandler = nil + return terminationHandler + } + } + + let state = NIOLockedValueBox(State()) + let isRunningApproximation = ManagedAtomic(false) + + public init() {} + + public func run() throws { + let state = self.state.withLockedValue { $0 } + + guard let pathString = state.executableURL?.path.removingPercentEncoding else { + throw PSProcessUnknownError(reason: "executableURL is nil") + } + let path = copyOwnedCTypedString(pathString) + defer { + path.deallocate() + } + let args = copyOwnedCTypedStringArray([pathString] + state.arguments) + defer { + var index = 0 + var arg = args[index] + while arg != nil { + arg!.deallocate() + index += 1 + arg = args[index] + } + } + let envs = copyOwnedCTypedStringArray((state.environment.map { k, v in "\(k)=\(v)" })) + defer { + var index = 0 + var env = envs[index] + while env != nil { + env!.deallocate() + index += 1 + env = envs[index] + } + } + + let psSetup: [ps_fd_setup] = [ + ps_fd_setup( + psfd_kind: PS_MAP_FD, + psfd_parent_fd: state.standardInput?.fileHandleForReading.fileDescriptor ?? STDIN_FILENO + ), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: state.standardOutput?.fileDescriptor ?? STDOUT_FILENO), + ps_fd_setup(psfd_kind: PS_MAP_FD, psfd_parent_fd: state.standardError?.fileDescriptor ?? STDERR_FILENO), + ] + let (pid, error) = psSetup.withUnsafeBufferPointer { psSetupPtr -> (pid_t, ps_error) in + var config = ps_process_configuration_s( + psc_path: path, + psc_argv: args, + psc_env: envs, + psc_cwd: nil, + psc_fd_setup_count: CInt(psSetupPtr.count), + psc_fd_setup_instructions: psSetupPtr.baseAddress!, + psc_new_session: false, + psc_close_other_fds: true + ) + var error = ps_error() + let pid = ps_spawn_process(&config, &error) + return (pid, error) + } + try! state.standardInput?.fileHandleForReading.close() + guard pid > 0 else { + switch (error.pse_kind, error.pse_code) { + case (PS_ERROR_KIND_EXECVE, ENOENT), (PS_ERROR_KIND_EXECVE, ENOTDIR): + throw NSError(domain: NSCocoaErrorDomain, code: NSFileNoSuchFileError) + default: + throw PSProcessUnknownError(reason: "\(error)") + } + } + self.state.withLockedValue { state in + state.setRunning(pid: pid, isRunningApproximation: self.isRunningApproximation) + } + + let q = DispatchQueue(label: "q") + let source = DispatchSource.makeSignalSource(signal: SIGCHLD, queue: q) + source.setEventHandler { + if let terminationHandler = self.terminationHandlerFinishedRunning() { + source.cancel() + terminationHandler(self) + } + } + source.setRegistrationHandler { + if let terminationHandler = self.terminationHandlerFinishedRunning() { + source.cancel() + q.async { + terminationHandler(self) + } + } + } + source.resume() + } + + public var processIdentifier: pid_t { + self.state.withLockedValue { state in + state.procecesIdentifier! + } + } + + public var terminationReason: Process.TerminationReason { + self.state.withLockedValue { state in + state.terminationStatus!.0 + } + } + + public var terminationStatus: CInt { + self.state.withLockedValue { state in + state.terminationStatus!.1 + } + } + + public var isRunning: Bool { + self.isRunningApproximation.load(ordering: .relaxed) + } + + func terminationHandlerFinishedRunning() -> (@Sendable (PSProcess) -> ())? { + self.state.withLockedValue { state -> (@Sendable (PSProcess) -> ())? in + guard let pid = state.pidWhenRunning else { + return nil + } + var status: CInt = -1 + while true { + let err = waitpid(pid, &status, WNOHANG) + if err == -1 { + if errno == EINTR { + continue + } else { + preconditionFailure("waitpid failed with \(errno)") + } + } else { + var hasExited = false + var isExitCode = false + var code: CInt = 0 + ps_convert_exit_status(status, &hasExited, &isExitCode, &code) + if hasExited { + return state.setNotRunning( + terminationStaus: (isExitCode ? .exit : .uncaughtSignal, code), + isRunningApproximation: self.isRunningApproximation + ) + } else { + return nil + } + } + } + } + } + + public var executableURL: URL? { + get { + self.state.withLockedValue { state in + state.executableURL + } + } + set { + self.state.withLockedValue { state in + state.executableURL = newValue + } + } + } + + public var launchPath: String? { + get { + self.state.withLockedValue { state in + state.executableURL?.absoluteString + } + } + set { + self.state.withLockedValue { state in + state.executableURL = newValue.map { URL(fileURLWithPath: $0) } + } + } + } + + public var arguments: [String] { + get { + self.state.withLockedValue { state in + state.arguments + } + } + set { + self.state.withLockedValue { state in + state.arguments = newValue + } + } + } + + public var environment: [String: String] { + get { + self.state.withLockedValue { state in + state.environment + } + } + set { + self.state.withLockedValue { state in + state.environment = newValue + } + } + } + + public var standardOutput: FileHandle? { + get { + self.state.withLockedValue { state in + state.standardOutput + } + } + set { + self.state.withLockedValue { state in + state.standardOutput = newValue + } + } + } + + public var standardError: FileHandle? { + get { + self.state.withLockedValue { state in + state.standardError + } + } + set { + self.state.withLockedValue { state in + state.standardError = newValue + } + } + } + + public var standardInput: Pipe? { + get { + self.state.withLockedValue { state in + state.standardInput + } + } + set { + self.state.withLockedValue { state in + state.standardInput = newValue + } + } + } + + public var terminationHandler: (@Sendable (PSProcess) -> ())? { + get { + self.state.withLockedValue { state in + state.terminationHandler + } + } + set { + self.state.withLockedValue { state in + state.terminationHandler = newValue + } + } + } +} + +func copyOwnedCTypedString(_ string: String) -> UnsafeMutablePointer { + let out = UnsafeMutableBufferPointer.allocate(capacity: string.utf8.count + 1) + _ = out.initialize(from: string.utf8.map { CChar(bitPattern: $0) }) + out[out.endIndex - 1] = 0 + + return out.baseAddress! +} + +func copyOwnedCTypedStringArray(_ array: [String]) -> UnsafeMutablePointer?> { + let out = UnsafeMutableBufferPointer?>.allocate(capacity: array.count + 1) + for (index, string) in array.enumerated() { + out[index] = copyOwnedCTypedString(string) + } + out[out.endIndex - 1] = nil + + return out.baseAddress! +} diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift index 7b5a4ee..2cc7e86 100644 --- a/Tests/AsyncProcessTests/IntegrationTests.swift +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -10,6 +10,22 @@ // //===----------------------------------------------------------------------===// +#if canImport(Darwin) +import Darwin +#elseif canImport(Musl) +@preconcurrency import Musl +#elseif canImport(Glibc) +@preconcurrency import Glibc +#elseif canImport(WASILibc) +@preconcurrency import WASILibc +#elseif canImport(Bionic) +@preconcurrency import Bionic +#elseif canImport(Android) +@preconcurrency import Android +#else +#error("unknown libc, please fix") +#endif + import AsyncAlgorithms import AsyncProcess import Atomics @@ -18,15 +34,10 @@ import NIO import NIOConcurrencyHelpers import XCTest -#if canImport(Darwin) -import Darwin -#else -import Glibc -#endif - final class IntegrationTests: XCTestCase { private var group: EventLoopGroup! private var logger: Logger! + private var highestFD: CInt? func testTheBasicsWork() async throws { let exe = ProcessExecutor( @@ -68,12 +79,7 @@ final class IntegrationTests: XCTestCase { } func testSignalsWork() async throws { - #if os(Linux) - // workaround for https://github.com/apple/swift-corelibs-foundation/issues/4772 - let signalsToTest: [CInt] = [SIGKILL] - #else let signalsToTest: [CInt] = [SIGKILL, SIGTERM, SIGINT] - #endif for signal in signalsToTest { let exe = ProcessExecutor( group: self.group, @@ -312,6 +318,8 @@ final class IntegrationTests: XCTestCase { } func testOutputWithoutNewlinesThatIsSplitIntoLines() async throws { + self.logger = Logger(label: "x") + self.logger.logLevel = .trace let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", @@ -755,11 +763,9 @@ final class IntegrationTests: XCTestCase { let result = try await exe.run() XCTFail("got result for bad executable: \(result)") } catch { - XCTAssertEqual(NSCocoaErrorDomain, (error as NSError).domain) - #if canImport(Darwin) + XCTAssertEqual(NSCocoaErrorDomain, (error as NSError).domain, "\(error)") // https://github.com/apple/swift-corelibs-foundation/issues/4810 - XCTAssertEqual(NSFileNoSuchFileError, (error as NSError).code) - #endif + XCTAssertEqual(NSFileNoSuchFileError, (error as NSError).code, "\(error)") } } @@ -891,17 +897,8 @@ final class IntegrationTests: XCTestCase { stderr = .fileDescriptor(sharing: fd) } - #if canImport(Darwin) let command = "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; wait" - #else - // workaround for - // https://github.com/apple/swift-corelibs-foundation/issues/4772 - // which causes `SIGCHLD` being blocked in the shell so it can't wait for its children :| - let command = - "for o in 1 2; do i=1000; while [ $i -gt 0 ]; do echo $o >&$o; i=$(( $i - 1 )); done & done; sleep 10" - #endif - let exe = ProcessExecutor( group: self.group, executable: "/bin/sh", @@ -951,7 +948,7 @@ final class IntegrationTests: XCTestCase { } func testCancelProcessVeryEarlyOnStressTest() async throws { - for i in 0..<1000 { + for i in 0..<100 { self.logger.debug("iteration go", metadata: ["iteration-number": "\(i)"]) let exitReason = try await withThrowingTaskGroup( of: ProcessExitReason?.self, @@ -1017,12 +1014,9 @@ final class IntegrationTests: XCTestCase { try await p.sendSignal(SIGKILL) let finalResult = try await result XCTAssertEqual(.signal(SIGKILL), finalResult) + while try await outputIterator.next() != nil {} } - #if os(macOS) - // This test will hang on anything that uses swift-corelibs-foundation because of - // https://github.com/apple/swift-corelibs-foundation/issues/4795 - // Foundation.Process on Linux doesn't correctly detect when child process dies (creating zombie processes) func testCanDealWithRunawayChildProcesses() async throws { self.logger = Logger(label: "x") self.logger.logLevel = .info @@ -1032,7 +1026,7 @@ final class IntegrationTests: XCTestCase { "-c", """ set -e - /usr/bin/yes "Runaway process from \(#function), please file a swift-sdk-generator bug." > /dev/null & + /usr/bin/yes "Runaway process from \(#function), please file a swift-async-process bug." > /dev/null & child_pid=$! trap "echo >&2 killing $child_pid; kill -KILL $child_pid" INT echo "$child_pid" # communicate the child pid to our parent @@ -1075,17 +1069,16 @@ final class IntegrationTests: XCTestCase { let killRet = kill(pid, 0) let errnoCode = errno guard killRet == -1 || attempt > 5 else { - logger.error("kill didn't fail on attempt \(attempt), trying again...") + self.logger.error("kill didn't fail on attempt \(attempt), trying again...") usleep(100_000) continue } XCTAssertEqual(-1, killRet) XCTAssertEqual(ESRCH, errnoCode) break - } + } } } - #endif func testShutdownSequenceWorks() async throws { let p = ProcessExecutor( @@ -1104,8 +1097,8 @@ final class IntegrationTests: XCTestCase { ], standardError: .discard, teardownSequence: [ - .sendSignal(SIGQUIT, allowedTimeToExitNS: 10_000_000), - .sendSignal(SIGTERM, allowedTimeToExitNS: 10_000_000), + .sendSignal(SIGQUIT, allowedTimeToExitNS: 200_000_000), + .sendSignal(SIGTERM, allowedTimeToExitNS: 200_000_000), .sendSignal(SIGINT, allowedTimeToExitNS: 1_000_000_000), ], logger: self.logger @@ -1114,10 +1107,7 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { let result = try await p.run() - #if os(macOS) - // won't work on SCLF: https://github.com/apple/swift-corelibs-foundation/issues/4772 XCTAssertEqual(.exit(3), result) - #endif } var allLines: [String] = [] for try await line in await p.standardOutput.splitIntoLines().strings { @@ -1127,25 +1117,33 @@ final class IntegrationTests: XCTestCase { allLines.append(line) } try await group.waitForAll() - #if os(macOS) - // won't work on SCLF: https://github.com/apple/swift-corelibs-foundation/issues/4772 XCTAssertEqual(["OK", "saw SIGQUIT", "saw SIGTERM", "saw SIGINT"], allLines) - #endif } } // MARK: - Setup/teardown override func setUp() async throws { + fflush(stdout) + fflush(stderr) self.group = MultiThreadedEventLoopGroup(numberOfThreads: 3) self.logger = Logger(label: "test", factory: { _ in SwiftLogNoOpLogHandler() }) + + // Make sure the singleton threads have booted (because they use file descriptors) + try await MultiThreadedEventLoopGroup.singleton.next().submit {}.get() + self.highestFD = highestOpenFD() } override func tearDown() { + let highestFD = highestOpenFD() + XCTAssertEqual(self.highestFD, highestFD, "\(blockingLSOFMyself())") + self.highestFD = nil self.logger = nil XCTAssertNoThrow(try self.group.syncShutdownGracefully()) self.group = nil + fflush(stdout) + fflush(stderr) } } @@ -1216,3 +1214,71 @@ extension ProcessExecutor { } } } + +private func highestOpenFD() -> CInt? { + #if os(macOS) + guard let dirPtr = opendir("/dev/fd") else { + return nil + } + #elseif os(Linux) + guard let dirPtr = opendir("/proc/self/fd") else { + return nil + } + #else + return nil + #endif + defer { + closedir(dirPtr) + } + var highestFDSoFar = CInt(0) + + while let dirEntPtr = readdir(dirPtr) { + var entryName = dirEntPtr.pointee.d_name + let thisFD = withUnsafeBytes(of: &entryName) { entryNamePtr -> CInt? in + + CInt(String(decoding: entryNamePtr.prefix(while: { $0 != 0 }), as: Unicode.UTF8.self)) + } + highestFDSoFar = max(thisFD ?? -1, highestFDSoFar) + } + + return highestFDSoFar +} + +private func blockingLSOFMyself() -> String { + let box = NIOLockedValueBox("n/a") + let sem = DispatchSemaphore(value: 0) + Task { + defer { + sem.signal() + } + do { + #if canImport(Darwin) + let lsofPath = "/usr/sbin/lsof" + #else + let lsofPath = "/usr/bin/lsof" + #endif + let result = try await ProcessExecutor.runCollectingOutput( + executable: lsofPath, + ["-Pnp", "\(getpid())"], + collectStandardOutput: true, + collectStandardError: true + ) + let outString = """ + exit code: \(result.exitReason)\n + ## stdout + \(String(buffer: result.standardOutput!)) + + ## stderr + \(String(buffer: result.standardError!)) + + """ + box.withLockedValue { $0 = outString } + } catch { + box.withLockedValue { debugString in + debugString = "ERROR: \(error)" + } + } + } + _ = sem.wait(timeout: .now() + 3) + return box.withLockedValue { $0 } +}