From 3450b8393fb8b1085ce047f46641b33421ddf41b Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Thu, 26 Sep 2024 18:18:03 +1000 Subject: [PATCH 1/7] Add cmake-uba-run and cmake-uba-server commands for doing CMake work on UBA --- UET/Redpoint.Uba/DefaultUbaServer.cs | 59 +- UET/Redpoint.Uba/DefaultUbaServerFactory.cs | 8 +- UET/Redpoint.Uba/IUbaServer.cs | 20 +- UET/Redpoint.Uba/UbaLoggerForwarder.cs | 54 +- .../CMakeUbaRun/CMakeUbaRunCommand.cs | 197 +++++ .../Internal/CMakeUbaServer/CMakeUba.proto | 31 + .../CMakeUbaServer/CMakeUbaServerCommand.cs | 204 +++++ .../CMakeUbaServer/KubernetesConstants.cs | 7 + .../CMakeUbaServer/KubernetesNodeState.cs | 106 +++ .../CMakeUbaServer/KubernetesNodeWorker.cs | 13 + .../UbaCoordinatorKubernetes.cs | 725 ++++++++++++++++++ .../UbaCoordinatorKubernetesConfig.cs | 30 + UET/uet/Commands/Internal/InternalCommand.cs | 4 + UET/uet/uet.csproj | 4 + 14 files changed, 1407 insertions(+), 55 deletions(-) create mode 100644 UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs create mode 100644 UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs diff --git a/UET/Redpoint.Uba/DefaultUbaServer.cs b/UET/Redpoint.Uba/DefaultUbaServer.cs index d21e414d..832f7864 100644 --- a/UET/Redpoint.Uba/DefaultUbaServer.cs +++ b/UET/Redpoint.Uba/DefaultUbaServer.cs @@ -18,7 +18,7 @@ internal partial class DefaultUbaServer : IUbaServer private readonly ILogger _logger; private readonly IProcessArgumentParser _processArgumentParser; private readonly IProcessExecutor _localProcessExecutor; - private readonly UbaLoggerForwarder _ubaLoggerForwarder; + private readonly nint _ubaLogger; private readonly CancellationTokenSource _localWorkerCancellationTokenSource; private nint _server; private nint _storageServer; @@ -27,6 +27,11 @@ internal partial class DefaultUbaServer : IUbaServer private readonly ConcurrentDictionary _returnedProcesses; private readonly TerminableAwaitableConcurrentQueue _processQueue; private readonly Task[] _localWorkerTasks; + private readonly SessionServer_RemoteProcessAvailableCallback _onRemoteProcessAvailableDelegate; + private readonly SessionServer_RemoteProcessReturnedCallback _onRemoteProcessReturnedDelegate; + private long _processesPendingInQueue; + private long _processesExecutingLocally; + private long _processesExecutingRemotely; #region Library Imports @@ -47,7 +52,7 @@ private static partial bool Server_AddClient( nint server, string ip, int port, - string crypto); + nint crypto); [LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))] [DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)] @@ -169,7 +174,7 @@ public DefaultUbaServer( ILogger logger, IProcessArgumentParser processArgumentParser, IProcessExecutor localProcessExecutor, - UbaLoggerForwarder ubaLoggerForwarder, + nint ubaLogger, nint server, string rootStorageDirectoryPath, string ubaTraceFilePath) @@ -177,7 +182,7 @@ public DefaultUbaServer( _logger = logger; _processArgumentParser = processArgumentParser; _localProcessExecutor = localProcessExecutor; - _ubaLoggerForwarder = ubaLoggerForwarder; + _ubaLogger = ubaLogger; _localWorkerCancellationTokenSource = new CancellationTokenSource(); _server = server; @@ -187,7 +192,7 @@ public DefaultUbaServer( rootStorageDirectoryPath, 40uL * 1000 * 1000 * 1000, false, - _ubaLoggerForwarder.Logger, + ubaLogger, string.Empty); if (_storageServer == nint.Zero) { @@ -198,7 +203,7 @@ public DefaultUbaServer( _sessionServerCreateInfo = CreateSessionServerCreateInfo( _storageServer, _server, - _ubaLoggerForwarder.Logger, + ubaLogger, rootStorageDirectoryPath, ubaTraceFilePath, disableCustomAllocator: false, @@ -227,24 +232,43 @@ public DefaultUbaServer( .Select(x => Task.Run(LocalProcessWorkerLoop)) .ToArray(); - SessionServer_SetRemoteProcessAvailable(_sessionServer, OnRemoteProcessAvailable, nint.Zero); - SessionServer_SetRemoteProcessReturned(_sessionServer, OnRemoteProcessReturned, nint.Zero); + _onRemoteProcessAvailableDelegate = OnRemoteProcessAvailable; + _onRemoteProcessReturnedDelegate = OnRemoteProcessReturned; + SessionServer_SetRemoteProcessAvailable(_sessionServer, _onRemoteProcessAvailableDelegate, nint.Zero); + SessionServer_SetRemoteProcessReturned(_sessionServer, _onRemoteProcessReturnedDelegate, nint.Zero); } - public bool AddRemoteAgent(string ip, int port, string crypto) + public bool AddRemoteAgent(string ip, int port) { - return Server_AddClient(_server, ip, port, crypto); + return Server_AddClient(_server, ip, port, nint.Zero); } + public long ProcessesPendingInQueue => Interlocked.Read(ref _processesPendingInQueue); + public long ProcessesExecutingLocally => Interlocked.Read(ref _processesExecutingLocally); + public long ProcessesExecutingRemotely => Interlocked.Read(ref _processesExecutingRemotely); + private void OnRemoteProcessAvailable(nint userData) { // Start a background task to queue a remote process (since this function can't be async). _ = Task.Run(async () => { + // Before we attempt to actually dequeue, check that there is something to dequeue, since UBA + // frequently calls OnRemoteProcessAvailable whileever slots are free (and not just when a slot + // opens up). + if (Interlocked.Read(ref _processesPendingInQueue) == 0) + { + return; + } + + _logger.LogInformation("Received OnRemoteProcessAvailable, pulling next available process to run..."); + // Grab the next process to run. var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); + Interlocked.Decrement(ref _processesPendingInQueue); + Interlocked.Increment(ref _processesExecutingRemotely); // Run the process remotely. + _logger.LogInformation($"Got process to run '{descriptor.ProcessSpecification.FilePath}'..."); var isRequeued = false; try { @@ -363,6 +387,8 @@ private void OnRemoteProcessAvailable(nint userData) } processHash = ProcessHandle_GetHash(process); + _logger.LogInformation($"Remote process '{descriptor.ProcessSpecification.FilePath}' is now running..."); + // While we wait for the exit gate to open, poll for log lines. while (!exitedGate.Opened && !_returnedProcesses.ContainsKey(processHash) && @@ -390,6 +416,8 @@ private void OnRemoteProcessAvailable(nint userData) if (!exitCode.HasValue /* Returned by remote agent */ || (exitCode.HasValue && exitCode == 9006) /* Known retry code when running cmd.exe via remote agent */) { + _logger.LogInformation($"Remote process '{descriptor.ProcessSpecification.FilePath}' was returned to the queue, scheduling for local execution..."); + // Prefer to run this command locally now. descriptor.PreferRemote = false; @@ -427,6 +455,11 @@ private void OnRemoteProcessAvailable(nint userData) { descriptor.CompletionGate.Open(); } + else + { + Interlocked.Increment(ref _processesPendingInQueue); + } + Interlocked.Decrement(ref _processesExecutingRemotely); } }); } @@ -443,7 +476,7 @@ private async Task LocalProcessWorkerLoop() // Grab the next process to run. var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); if (descriptor.PreferRemote && - (descriptor.DateQueuedUtc - DateTimeOffset.UtcNow).TotalSeconds < 30) + (DateTimeOffset.UtcNow - descriptor.DateQueuedUtc).TotalSeconds < 30) { // If this process prefers remote execution, and it hasn't been sitting in the queue for // at least 30 seconds, requeue it and try again. @@ -451,6 +484,8 @@ private async Task LocalProcessWorkerLoop() await Task.Delay(200, _localWorkerCancellationTokenSource.Token).ConfigureAwait(false); continue; } + Interlocked.Decrement(ref _processesPendingInQueue); + Interlocked.Increment(ref _processesExecutingLocally); // Run the process locally. try @@ -467,6 +502,7 @@ private async Task LocalProcessWorkerLoop() finally { descriptor.CompletionGate.Open(); + Interlocked.Decrement(ref _processesExecutingLocally); } } while (true); @@ -495,6 +531,7 @@ public async Task ExecuteAsync( CompletionGate = new Gate(), }; _processQueue.Enqueue(descriptor); + Interlocked.Increment(ref _processesPendingInQueue); // Wait for the gate to be opened. await descriptor.CompletionGate.WaitAsync(cancellationToken).ConfigureAwait(false); diff --git a/UET/Redpoint.Uba/DefaultUbaServerFactory.cs b/UET/Redpoint.Uba/DefaultUbaServerFactory.cs index 68339e05..5debf9d3 100644 --- a/UET/Redpoint.Uba/DefaultUbaServerFactory.cs +++ b/UET/Redpoint.Uba/DefaultUbaServerFactory.cs @@ -46,9 +46,11 @@ public IUbaServer CreateServer( Directory.CreateDirectory(rootStorageDirectoryPath); Directory.CreateDirectory(Path.GetDirectoryName(ubaTraceFilePath)!); - var loggingForwarder = new UbaLoggerForwarder(_ubaLoggerForwarderLogger); + // @note: UbaLoggerForwarder must be static and hold a singleton log. Otherwise we can run into a crash if + // DefaultUbaServer is finalized after UbaLoggerForwarder. + var ubaLogger = UbaLoggerForwarder.GetUbaLogger(_ubaLoggerForwarderLogger); var server = UbaServerDelayedImports.CreateServer( - loggingForwarder.Logger, + ubaLogger, maxWorkers, sendSize, receiveTimeoutSeconds, @@ -57,7 +59,7 @@ public IUbaServer CreateServer( _defaultUbaServerLogger, _processArgumentParser, _localProcessExecutor, - loggingForwarder, + ubaLogger, server, rootStorageDirectoryPath, ubaTraceFilePath); diff --git a/UET/Redpoint.Uba/IUbaServer.cs b/UET/Redpoint.Uba/IUbaServer.cs index f166240a..fca59900 100644 --- a/UET/Redpoint.Uba/IUbaServer.cs +++ b/UET/Redpoint.Uba/IUbaServer.cs @@ -10,7 +10,7 @@ /// /// We don't support the UBA mode whereby the server listens on a port, and remote agents (clients) initiated a connection back to the server. That's because most developer machines are behind a firewall that remote agents won't be able to initiate a connection through. /// - /// Instead, we only support the model by which the remote agents (clients) listen on a port, and the server actively connects to them using . + /// Instead, we only support the model by which the remote agents (clients) listen on a port, and the server actively connects to them using . /// public interface IUbaServer : IProcessExecutor, IAsyncDisposable { @@ -19,8 +19,22 @@ public interface IUbaServer : IProcessExecutor, IAsyncDisposable /// /// The IP address of the remote agent. /// The port of the remote agent. - /// If the remote agent has the '-crypto' parameter set, this should be the 32 character hexadecimal value representing the symmetric cryptographic key. /// True if the agent was successfully added. - bool AddRemoteAgent(string ip, int port, string crypto = ""); + bool AddRemoteAgent(string ip, int port); + + /// + /// The number of processes currently in the queue to start execution. + /// + long ProcessesPendingInQueue { get; } + + /// + /// The number of processes executing locally. + /// + long ProcessesExecutingLocally { get; } + + /// + /// The number of processes executing on remote agents. + /// + long ProcessesExecutingRemotely { get; } } } diff --git a/UET/Redpoint.Uba/UbaLoggerForwarder.cs b/UET/Redpoint.Uba/UbaLoggerForwarder.cs index 3102432b..a523f581 100644 --- a/UET/Redpoint.Uba/UbaLoggerForwarder.cs +++ b/UET/Redpoint.Uba/UbaLoggerForwarder.cs @@ -5,11 +5,10 @@ using System; using System.Runtime.InteropServices; - internal partial class UbaLoggerForwarder : IDisposable + internal partial class UbaLoggerForwarder { - private readonly ILogger _logger; - private readonly nint _ubaLogger; - private bool _hasDisposed; + private static ILogger? _logger; + private static nint? _ubaLogger; #region Library Imports @@ -36,8 +35,13 @@ private static partial void DestroyCallbackLogWriter( #endregion - public UbaLoggerForwarder(ILogger logger) + public static nint GetUbaLogger(ILogger logger) { + if (_ubaLogger.HasValue) + { + return _ubaLogger.Value; + } + _logger = logger; _ubaLogger = CreateCallbackLogWriter( BeginScope, @@ -47,9 +51,9 @@ public UbaLoggerForwarder(ILogger logger) { throw new InvalidOperationException("Unable to create UBA logger!"); } - } - public nint Logger => _ubaLogger; + return _ubaLogger.Value; + } private static void BeginScope() { @@ -59,7 +63,7 @@ private static void EndScope() { } - private void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint prefixLen) + private static void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint prefixLen) { string message; if (OperatingSystem.IsWindows()) @@ -74,46 +78,20 @@ private void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint pre switch (logEntryType) { case 0: - _logger.LogError(message); + _logger!.LogError(message); break; case 1: - _logger.LogWarning(message); + _logger!.LogWarning(message); break; case 2: - _logger.LogInformation(message); + _logger!.LogInformation(message); break; case 3: case 4: default: - _logger.LogDebug(message); + _logger!.LogDebug(message); break; } } - - protected virtual void Dispose(bool disposing) - { - if (!_hasDisposed) - { - if (_ubaLogger != nint.Zero) - { - DestroyCallbackLogWriter(_ubaLogger); - } - - _hasDisposed = true; - } - } - - ~UbaLoggerForwarder() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: false); - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } } } diff --git a/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs new file mode 100644 index 00000000..c0b7cfc3 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs @@ -0,0 +1,197 @@ +namespace UET.Commands.Internal.CMakeUbaRun +{ + using CMakeUba; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using Redpoint.Concurrency; + using Redpoint.GrpcPipes; + using Redpoint.ProcessExecution; + using System; + using System.CommandLine; + using System.CommandLine.Invocation; + using System.Threading.Tasks; + using UET.Services; + using static CMakeUba.CMakeUbaService; + using Semaphore = System.Threading.Semaphore; + + internal class CMakeUbaRunCommand + { + internal sealed class Options + { + public Option PreferRemote = new Option( + "--prefer-remote", + "If true, the CMake UBA server will prefer to run this command remotely."); + } + + public static Command CreateCMakeUbaRunCommand() + { + var options = new Options(); + var commandArguments = new Argument("command-and-arguments", "The command to run, followed by any arguments to pass to it."); + commandArguments.Arity = ArgumentArity.OneOrMore; + var command = new Command("cmake-uba-run"); + command.AddAllOptions(options); + command.AddArgument(commandArguments); + command.AddCommonHandler(options, services => + { + services.AddSingleton(commandArguments); + }); + return command; + } + + private sealed class CMakeUbaRunCommandInstance : ICommandInstance + { + private readonly IGrpcPipeFactory _grpcPipeFactory; + private readonly ILogger _logger; + private readonly ISelfLocation _selfLocation; + private readonly IProcessExecutor _processExecutor; + private readonly Options _options; + private readonly Argument _commandArguments; + + public CMakeUbaRunCommandInstance( + IGrpcPipeFactory grpcPipeFactory, + ILogger logger, + ISelfLocation selfLocation, + IProcessExecutor processExecutor, + Options options, + Argument commandArguments) + { + _grpcPipeFactory = grpcPipeFactory; + _logger = logger; + _selfLocation = selfLocation; + _processExecutor = processExecutor; + _options = options; + _commandArguments = commandArguments; + } + + private async Task IsServerRunningAsync() + { + var client = _grpcPipeFactory.CreateClient( + "cmake-uba-server", + GrpcPipeNamespace.User, + channel => new CMakeUbaServiceClient(channel)); + try + { + await client.PingServerAsync(new CMakeUba.EmptyMessage()).ConfigureAwait(false); + return true; + } + catch + { + return false; + } + } + + private async Task WaitForServerToStartAsync(CancellationToken serverUnexpectedExit) + { + while (true) + { + serverUnexpectedExit.ThrowIfCancellationRequested(); + + var client = _grpcPipeFactory.CreateClient( + "cmake-uba-server", + GrpcPipeNamespace.User, + channel => new CMakeUbaServiceClient(channel)); + try + { + await client.PingServerAsync(new CMakeUba.EmptyMessage(), cancellationToken: serverUnexpectedExit).ConfigureAwait(false); + return; + } + catch + { + await Task.Delay(1000, serverUnexpectedExit).ConfigureAwait(false); + } + } + } + + public async Task ExecuteAsync(InvocationContext context) + { + // Start the CMake UBA server on-demand if it's not already running. + if (!await IsServerRunningAsync().ConfigureAwait(false)) + { + using var globalServerSemaphore = new Semaphore(1, 1, "cmake-uba-grpc-server"); + globalServerSemaphore.WaitOne(); + try + { + if (!await IsServerRunningAsync().ConfigureAwait(false)) + { + var cts = new CancellationTokenSource(); + var ctsDisposed = new Gate(); + try + { + _ = Task.Run(async () => + { + try + { + _logger.LogInformation("Starting CMake UBA server. You will not see output or errors from it. To see diagnostic information, run 'uet internal cmake-uba-server' in a separate terminal."); + await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = _selfLocation.GetUetLocalLocation(), + Arguments = ["internal", "cmake-uba-server"] + }, + CaptureSpecification.Silence, + CancellationToken.None).ConfigureAwait(false); + } + finally + { + if (!ctsDisposed.Opened) + { + cts.Cancel(); + } + } + }); + + await WaitForServerToStartAsync(cts.Token).ConfigureAwait(false); + } + finally + { + ctsDisposed.Open(); + cts.Dispose(); + } + } + } + finally + { + globalServerSemaphore.Release(); + } + } + + // Create our gRPC client. + var client = _grpcPipeFactory.CreateClient( + "cmake-uba-server", + GrpcPipeNamespace.User, + channel => new CMakeUbaServiceClient(channel)); + + // Run the process. + var arguments = context.ParseResult.GetValueForArgument(_commandArguments)!; + var request = new CMakeUba.ProcessRequest + { + Path = arguments[0], + WorkingDirectory = Environment.CurrentDirectory, + PreferRemote = context.ParseResult.GetValueForOption(_options.PreferRemote), + }; + for (int i = 1; i < arguments.Length; i++) + { + request.Arguments.Add(new ProcessArgument { LogicalValue = arguments[i] }); + } + var response = client.ExecuteProcess(request, cancellationToken: context.GetCancellationToken()); + + // Stream the response values. + while (await response.ResponseStream.MoveNext(cancellationToken: context.GetCancellationToken()).ConfigureAwait(false)) + { + switch (response.ResponseStream.Current.DataCase) + { + case ProcessResponse.DataOneofCase.StandardOutputLine: + Console.WriteLine(response.ResponseStream.Current.StandardOutputLine); + break; + case ProcessResponse.DataOneofCase.ExitCode: + // @note: This is the last response; return the exit code. + return response.ResponseStream.Current.ExitCode; + } + } + + _logger.LogError("Did not receive exit code for process from CMake UBA server before the response stream ended."); + return 1; + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto new file mode 100644 index 00000000..04677361 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto @@ -0,0 +1,31 @@ +syntax = 'proto3'; + +package CMakeUba; + +message ProcessArgument { + string logicalValue = 1; + string originalValue = 2; +} + +message ProcessRequest { + string path = 1; + repeated ProcessArgument arguments = 2; + string workingDirectory = 3; + bool preferRemote = 4; +} + +message ProcessResponse { + oneof data { + string standardOutputLine = 1; + // This will always be the last message from the response stream. + int32 exitCode = 2; + } +} + +message EmptyMessage { +} + +service CMakeUbaService { + rpc PingServer(EmptyMessage) returns (EmptyMessage) {} + rpc ExecuteProcess(ProcessRequest) returns (stream ProcessResponse) {} +} \ No newline at end of file diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs new file mode 100644 index 00000000..c973a3e3 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs @@ -0,0 +1,204 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using CMakeUba; + using Grpc.Core; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using Redpoint.Concurrency; + using Redpoint.GrpcPipes; + using Redpoint.ProcessExecution; + using Redpoint.ProcessExecution.Enumerable; + using Redpoint.Uba; + using Redpoint.Uba.Native; + using System; + using System.CommandLine; + using System.CommandLine.Invocation; + using System.Linq; + using System.Runtime.InteropServices; + using System.Threading.Tasks; + using UET.Commands.Config; + using static CMakeUba.CMakeUbaService; + + internal class CMakeUbaServerCommand + { + internal sealed class Options + { + } + + public static Command CreateCMakeUbaServerCommand() + { + var options = new Options(); + var command = new Command("cmake-uba-server"); + command.AddAllOptions(options); + command.AddCommonHandler(options, services => + { + services.AddSingleton(); + }); + return command; + } + + private sealed class CMakeUbaServerCommandInstance : CMakeUbaServiceBase, ICommandInstance + { + private readonly IUbaServerFactory _ubaServerFactory; + private readonly IGrpcPipeFactory _grpcPipeFactory; + private readonly IXmlConfigHelper _xmlConfigHelper; + private readonly ILogger _logger; + private CancellationToken? _commandCancellationToken; + private IUbaServer? _ubaServer; + + public CMakeUbaServerCommandInstance( + IUbaServerFactory ubaServerFactory, + IGrpcPipeFactory grpcPipeFactory, + IXmlConfigHelper xmlConfigHelper, + ILogger logger) + { + _ubaServerFactory = ubaServerFactory; + _grpcPipeFactory = grpcPipeFactory; + _xmlConfigHelper = xmlConfigHelper; + _logger = logger; + } + + public async Task ExecuteAsync(InvocationContext context) + { + // @todo: Make this configurable. + UbaNative.Init(@"C:\Program Files\Epic Games\UE_5.4\Engine\Binaries\Win64\UnrealBuildAccelerator"); + + // Track the timestamp that the server should automatically shut down. This gets moved + // forward into the future when we have work in the queue. + var shutdownTime = DateTimeOffset.UtcNow.AddMinutes(10); + + // Create the UBA server. + _logger.LogInformation("CMake UBA server is starting up..."); + var ubaRoot = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData)!, "Redpoint", "CMakeUBA"); + await using (_ubaServerFactory + .CreateServer( + ubaRoot, + Path.Combine(ubaRoot, "UbaTrace.log")) + .AsAsyncDisposable(out var ubaServer) + .ConfigureAwait(false)) + { + _ubaServer = ubaServer; + _commandCancellationToken = context.GetCancellationToken(); + + // Create the gRPC server. + var pipeName = $"cmake-uba-server"; + await using (_grpcPipeFactory + .CreateServer( + pipeName, + GrpcPipeNamespace.User, + this) + .AsAsyncDisposable(out var grpcServer) + .ConfigureAwait(false)) + { + // Start the gRPC server. + await grpcServer.StartAsync().ConfigureAwait(false); + _logger.LogInformation("Waiting for incoming requests over gRPC..."); + + // Start the Kubernetes coordinator. + using var coordinator = new UbaCoordinatorKubernetes( + Path.Combine(@"C:\Program Files\Epic Games\UE_5.4\Engine\Binaries\Win64\UnrealBuildAccelerator", RuntimeInformation.ProcessArchitecture.ToString().ToLowerInvariant()), + _logger, + UbaCoordinatorKubernetesConfig.ReadFromBuildConfigurationXml(_xmlConfigHelper)); + await coordinator.InitAsync(ubaServer).ConfigureAwait(false); + try + { + coordinator.Start(); + + // Every second, evaluate how many processes are in queue / executing locally / executing remotely, + // and use this information to provision agents on Kubernetes as needed. + try + { + while (true) + { + _logger.LogDebug($"Pending: {ubaServer.ProcessesPendingInQueue} Executing Locally: {ubaServer.ProcessesExecutingLocally} Executing Remotely: {ubaServer.ProcessesExecutingRemotely}"); + if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) + { + shutdownTime = DateTimeOffset.UtcNow.AddMinutes(10); + } + + if (shutdownTime < DateTimeOffset.UtcNow) + { + _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); + return 0; + } + + await Task.Delay(1000, context.GetCancellationToken()).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + + // Stop the gRPC server. + _logger.LogInformation("CMake UBA server is shutting down..."); + await grpcServer.StopAsync().ConfigureAwait(false); + } + finally + { + await coordinator.CloseAsync().ConfigureAwait(false); + } + } + } + return 0; + } + + public override async Task ExecuteProcess(ProcessRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + if (!_commandCancellationToken.HasValue || + _ubaServer == null) + { + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + StandardOutputLine = "(The CMake UBA server is not in a valid state!)", + }).ConfigureAwait(false); + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + ExitCode = 99999, + }).ConfigureAwait(false); + return; + } + + var specification = new UbaProcessSpecification + { + FilePath = request.Path, + Arguments = request.Arguments.Select(x => new LogicalProcessArgument(x.LogicalValue)).ToArray(), + WorkingDirectory = request.WorkingDirectory, + PreferRemote = request.PreferRemote, + }; + + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_commandCancellationToken.Value, context.CancellationToken); + + await foreach (var response in _ubaServer.ExecuteAsync(specification, context.CancellationToken)) + { + switch (response) + { + case StandardOutputResponse stdout: + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + StandardOutputLine = stdout.Data, + }).ConfigureAwait(false); + break; + case StandardErrorResponse stderr: + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + StandardOutputLine = stderr.Data, + }).ConfigureAwait(false); + break; + case ExitCodeResponse exitCode: + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + ExitCode = exitCode.ExitCode, + }).ConfigureAwait(false); + // @note: This is the last message; we're done. + return; + } + } + } + + public override Task PingServer(EmptyMessage request, ServerCallContext context) + { + return Task.FromResult(new EmptyMessage()); + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs new file mode 100644 index 00000000..df5e49f0 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs @@ -0,0 +1,7 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + internal static class KubernetesConstants + { + public const ulong MemoryBytesPerCore = 1610612736uL; + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs new file mode 100644 index 00000000..90d33f27 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs @@ -0,0 +1,106 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using k8s.Models; + using System; + using System.Collections.Generic; + using System.Linq; + + internal class KubernetesNodeState + { + public string? NodeId; + public V1Node? KubernetesNode; + public List KubernetesPods = new List(); + public readonly List AllocatedBlocks = new List(); + + public ulong MemoryTotal + { + get + { + return KubernetesNode!.Status.Capacity["memory"].ToUInt64(); + } + } + + public ulong MemoryNonUba + { + get + { + return KubernetesPods + .Where(x => x.GetLabel("uba") != "true") + .SelectMany(x => x.Spec.Containers) + .Select(x => x?.Resources?.Requests != null && x.Resources.Requests.TryGetValue("memory", out var quantity) ? quantity.ToUInt64() : 0) + .Aggregate((a, b) => a + b); + } + } + + public ulong MemoryAllocated + { + get + { + return AllocatedBlocks + .Select(x => (ulong)x.AllocatedCores * KubernetesConstants.MemoryBytesPerCore) + .DefaultIfEmpty((ulong)0) + .Aggregate((a, b) => a + b); + } + } + + public ulong MemoryAvailable + { + get + { + var memory = MemoryTotal; + memory -= MemoryNonUba; + memory -= MemoryAllocated; + return Math.Max(0, memory); + } + } + + public double CoresTotal + { + get + { + return Math.Floor(KubernetesNode!.Status.Capacity["cpu"].ToDouble()); + } + } + + public double CoresNonUba + { + get + { + return KubernetesPods + .Where(x => x.GetLabel("uba") != "true") + .SelectMany(x => x.Spec.Containers) + .Select(x => x?.Resources?.Requests != null && x.Resources.Requests.TryGetValue("cpu", out var quantity) ? quantity.ToDouble() : 0) + .Sum(); + } + } + + public double CoresAllocated + { + get + { + return AllocatedBlocks.Sum(x => x.AllocatedCores); + } + } + + public int CoresAvailable + { + get + { + var cores = CoresTotal; + cores -= CoresNonUba; + cores -= CoresAllocated; + return (int)Math.Max(0, Math.Floor(cores)); + } + } + + public int CoresAllocatable + { + get + { + var realCoresAvailable = CoresAvailable; + var memoryConstrainedCoresAvailable = MemoryAvailable / KubernetesConstants.MemoryBytesPerCore; + return Math.Min(realCoresAvailable, (int)memoryConstrainedCoresAvailable); + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs new file mode 100644 index 00000000..197621b8 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs @@ -0,0 +1,13 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using k8s.Models; + + internal class KubernetesNodeWorker + { + public V1Pod? KubernetesPod; + public V1Service? KubernetesService; + public int AllocatedCores; + public string? UbaHost; + public int? UbaPort; + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs new file mode 100644 index 00000000..68b2185b --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs @@ -0,0 +1,725 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using k8s; + using k8s.Autorest; + using k8s.Models; + using System.Net.Sockets; + using System.Net; + using System.Runtime.CompilerServices; + using Microsoft.Extensions.Logging; + using Redpoint.Uba; + using Redpoint.Hashing; + using System.Globalization; + using UET.Commands.Config; + + internal class UbaCoordinatorKubernetes : IDisposable + { + private readonly string _ubaRootPath; + private readonly ILogger _logger; + private readonly UbaCoordinatorKubernetesConfig _ubaKubeConfig; + private CancellationTokenSource? _cancellationSource; + private string? _id; + private string? _ubaAgentRemotePath; + private string? _ubaAgentHash; + private Kubernetes? _client; + private Timer? _timer; + private const int _timerPeriod = 1000; + private Dictionary _kubernetesNodes; + private IUbaServer? _ubaServer; + + private string KubernetesNamespace + { + get + { + return _ubaKubeConfig?.Namespace ?? "default"; + } + } + + public UbaCoordinatorKubernetes( + string ubaRootPath, + ILogger logger, + UbaCoordinatorKubernetesConfig ubaKubeConfig) + { + _ubaRootPath = ubaRootPath; + _logger = logger; + _ubaKubeConfig = ubaKubeConfig; + _cancellationSource = new CancellationTokenSource(); + _kubernetesNodes = new Dictionary(); + } + + public async Task InitAsync(IUbaServer ubaServer) + { + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.Namespace)) + { + // If this is not set, then the developer probably wants to use Horde instead. + return; + } + + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.Context)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> Context in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbServer)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbServer in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbShare)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbShare in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbUsername)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbUsername in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbPassword)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbPassword in BuildConfiguration.xml."); + return; + } + + _logger.LogInformation("Kubernetes UBA: InitAsync"); + + try + { + // Set the cancellation source for cancelling the pod. + _cancellationSource?.Cancel(); + _cancellationSource?.Dispose(); + _cancellationSource = new CancellationTokenSource(); + + // Clear out current nodes state. + _kubernetesNodes.Clear(); + + // Generate an ID to identify the jobs we're running. + _id = Guid.NewGuid().ToString(); + + // Copy the UbaAgent file to the UBA root directory. + var ubaFile = new FileInfo(Path.Combine(_ubaRootPath, "UbaAgent.exe")); + var agentHash = (await Hash.XxHash64OfFileAsync(ubaFile.FullName, CancellationToken.None).ConfigureAwait(false)).Hash.ToString(CultureInfo.InvariantCulture); + var ubaRemoteDir = new DirectoryInfo(Path.Combine(@$"\\{_ubaKubeConfig.SmbServer}\{_ubaKubeConfig.SmbShare}\Uba", $"cmake{agentHash}"))!; + var ubaRemoteFile = new FileInfo(Path.Combine(ubaRemoteDir.FullName, "UbaAgent.exe")); + try + { + Directory.CreateDirectory(ubaRemoteDir.FullName); + File.Copy(ubaFile.FullName, ubaRemoteFile.FullName); + } + catch + { + // File already copied. + } + _ubaAgentRemotePath = ubaRemoteFile.FullName; + _ubaAgentHash = agentHash.ToString(); + + // Set up Kubernetes client and ensure we can connect to the cluster. + var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(currentContext: _ubaKubeConfig.Context); + _client = new Kubernetes(config); + await _client.ListNamespacedPodAsync(KubernetesNamespace, cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + + // Track the executor so we can add clients to it. + _ubaServer = ubaServer; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Kubernetes UBA: InitAsync failed: {ex.Message}"); + } + } + + private async Task DeleteServiceAndPodAsync(V1Pod pod, CancellationToken? cancellationToken) + { + try + { + await _client.DeleteNamespacedServiceAsync( + pod.Name(), + pod.Namespace(), + cancellationToken: cancellationToken ?? _cancellationSource?.Token ?? CancellationToken.None) + .ConfigureAwait(false); + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound) + { + } + try + { + await _client.DeleteNamespacedPodAsync( + pod.Name(), + pod.Namespace(), + cancellationToken: cancellationToken ?? _cancellationSource?.Token ?? CancellationToken.None) + .ConfigureAwait(false); + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound) + { + } + if (_kubernetesNodes.TryGetValue(pod.Spec.NodeName, out var node)) + { + node.AllocatedBlocks.RemoveAll(x => x.KubernetesPod.Name() == pod.Name()); + } + } + + private async IAsyncEnumerable EnumerateNodePodAsync(string nodeName, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListPodForAllNamespacesAsync( + fieldSelector: $"spec.nodeName={nodeName}", + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + private async IAsyncEnumerable EnumerateNamespacedPodAsync(string labelSelector, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListNamespacedPodAsync( + KubernetesNamespace, + labelSelector: labelSelector, + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + private async IAsyncEnumerable EnumerateNamespacedServiceAsync(string labelSelector, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListNamespacedServiceAsync( + KubernetesNamespace, + labelSelector: labelSelector, + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + private async IAsyncEnumerable EnumerateNodesAsync(string labelSelector, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListNodeAsync( + labelSelector: labelSelector, + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + if (item.Spec.Taints != null && ( + item.Spec.Taints.Any(x => x.Effect == "NoSchedule") || + item.Spec.Taints.Any(x => x.Effect == "NoExecute"))) + { + continue; + } + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "This call is not used for cryptography.")] + public void Start() + { + _logger.LogInformation("Kubernetes UBA: Start"); + + _timer = new Timer(async _ => + { + _timer?.Change(Timeout.Infinite, Timeout.Infinite); + + var stopping = false; + try + { + // If we're cancelled, stop. + if (_cancellationSource == null || _cancellationSource.IsCancellationRequested) + { + _logger.LogInformation("Kubernetes UBA: Loop: Cancelled"); + stopping = true; + return; + } + + // If the client is unavailable, keep looping unless we're done. + if (_client == null) + { + _logger.LogInformation("Kubernetes UBA: Loop: Not configured to use Kubernetes."); + return; + } + + // Remove any Kubernetes pods that are complete, or failed to start. + await foreach (var pod in EnumerateNamespacedPodAsync("uba=true", _cancellationSource.Token)) + { + if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") + { + _logger.LogInformation($"Removing Kubernetes block: {pod.Name()} (cleanup)"); + await DeleteServiceAndPodAsync(pod, _cancellationSource.Token).ConfigureAwait(false); + + foreach (var kv in _kubernetesNodes) + { + kv.Value.AllocatedBlocks.RemoveAll(candidatePod => + candidatePod.KubernetesPod != null && + candidatePod.KubernetesPod.Name() == pod.Name()); + } + } + } + + // Synchronise Kubernetes nodes with our known node list. + var knownNodeNames = new HashSet(); + await foreach (var node in EnumerateNodesAsync("kubernetes.io/os=windows", _cancellationSource.Token)) + { + knownNodeNames.Add(node.Name()); + if (_kubernetesNodes.TryGetValue(node.Name(), out var existingEntry)) + { + existingEntry.KubernetesNode = node; + } + else + { + existingEntry = new KubernetesNodeState + { + NodeId = node.Name(), + KubernetesNode = node, + }; + _kubernetesNodes.Add(node.Name(), existingEntry); + } + var newPodsList = new List(); + await foreach (var pod in EnumerateNodePodAsync(node.Name(), _cancellationSource.Token)) + { + newPodsList.Add(pod); + } + existingEntry.KubernetesPods = newPodsList; + } + + // Determine the threshold over local allocation. + double desiredCpusThreshold = 0; + + // Allocate cores from Kubernetes until we're satisified. + while (true) + { + // Check how many additional cores we need to allocate from the cluster. + double desiredCpus = _ubaServer!.ProcessesPendingInQueue; + desiredCpus -= _kubernetesNodes.SelectMany(x => x.Value.AllocatedBlocks).Sum(y => y.AllocatedCores); + if (desiredCpus <= desiredCpusThreshold) + { + _logger.LogDebug($"Kubernetes UBA: Loop: Skipping (desired CPU {desiredCpus} <= {desiredCpusThreshold})"); + break; + } + + // Remove any Kubernetes pods that are complete, or failed to start. + await foreach (var pod in EnumerateNamespacedPodAsync($"uba=true,uba.queueId={_id}", _cancellationSource.Token)) + { + if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") + { + _logger.LogInformation($"Removing Kubernetes block: {pod.Name()}"); + await DeleteServiceAndPodAsync(pod, _cancellationSource.Token).ConfigureAwait(false); + } + } + + // Compute the biggest size we can allocate. + var blockSize = _kubernetesNodes + .Select(x => x.Value.CoresAllocatable) + .Where(x => x != 0) + .DefaultIfEmpty(0) + .Max(); + blockSize = Math.Min(blockSize, (int)Math.Floor(desiredCpus)); + if (blockSize == 0) + { + // No more cores to allocate from. + break; + } + + // Pick an available node, weighted by the available cores. + var selectedNode = _kubernetesNodes + .Select(x => x.Value) + .SelectMany(x => + { + var blocks = new List(); + _logger.LogInformation($"Kubernetes UBA: Loop: Node '{x.NodeId}' has\nCPU: {x.CoresTotal} total, {x.CoresNonUba} non-UBA, {x.CoresAllocated} allocated, {x.CoresAvailable} available, {x.CoresAllocatable} allocatable.\nMemory: {x.MemoryTotal} total, {x.MemoryNonUba} non-UBA, {x.MemoryAllocated} allocated, {x.MemoryAvailable} available.\nBlocks: {x.AllocatedBlocks.Count} allocated."); + for (var c = 0; c < x.CoresAllocatable; c += blockSize) + { + if (c + blockSize <= x.CoresAllocatable) + { + blocks.Add(x); + } + } + return blocks; + }) + .OrderBy(_ => Random.Shared.NextInt64()) + .FirstOrDefault(); + + // If we don't have any available node, break out of the loop node. + if (selectedNode == null) + { + break; + } + + // Generate the next block ID for this node. + int highestBlockId = 0; + await foreach (var pod in EnumerateNamespacedPodAsync($"uba.nodeId={selectedNode.NodeId}", _cancellationSource.Token)) + { + var thisBlockId = int.Parse(pod.GetLabel("uba.blockId"), CultureInfo.InvariantCulture); + highestBlockId = Math.Max(highestBlockId, thisBlockId); + } + var nextBlockId = highestBlockId + 1; + + // Create the pod and service. + var name = $"uba-{selectedNode.NodeId}-{nextBlockId}"; + _logger.LogInformation($"Allocating Kubernetes block: {name}"); + var labels = new Dictionary + { + { "uba", "true" }, + { "uba.nodeId", selectedNode.NodeId! }, + { "uba.blockId", nextBlockId.ToString(CultureInfo.InvariantCulture) }, + { "uba.queueId", _id! }, + }; + var kubernetesPod = await _client.CreateNamespacedPodAsync( + new V1Pod + { + Metadata = new V1ObjectMeta + { + Name = name, + Labels = labels, + }, + Spec = new V1PodSpec + { + AutomountServiceAccountToken = false, + NodeSelector = new Dictionary + { + { "kubernetes.io/os", "windows" }, + }, + RestartPolicy = "Never", + TerminationGracePeriodSeconds = 0, + Volumes = new List + { + new V1Volume + { + Name = "uba-storage", + HostPath = new V1HostPathVolumeSource + { + Path = @$"C:\Uba\{name}", + } + } + }, + Containers = new List + { + new V1Container + { + Image = "mcr.microsoft.com/powershell:lts-windowsservercore-ltsc2022", + ImagePullPolicy = "IfNotPresent", + Name = "uba-agent", + Resources = new V1ResourceRequirements + { + Requests = new Dictionary + { + { "cpu", new ResourceQuantity(blockSize.ToString(CultureInfo.InvariantCulture)) }, + // @note: We don't set 'memory' here because it can be finicky to get the container to get allocated. + }, + Limits = new Dictionary + { + { "cpu", new ResourceQuantity(blockSize.ToString(CultureInfo.InvariantCulture)) }, + // @note: We don't set 'memory' here because it can be finicky to get the container to get allocated. + }, + }, + SecurityContext = new V1SecurityContext + { + WindowsOptions = new V1WindowsSecurityContextOptions + { + RunAsUserName = "ContainerAdministrator", + } + }, + Command = new List + { + @"C:\Program Files\PowerShell\latest\pwsh.exe", + }, + Args = new List + { + "-Command", + $@"Start-Sleep -Seconds 1; Write-Host ""Mapping network drive...""; C:\Windows\system32\net.exe use Z: \\{_ubaKubeConfig.SmbServer}\{_ubaKubeConfig.SmbShare}\Uba /USER:{_ubaKubeConfig.SmbUsername} {_ubaKubeConfig.SmbPassword}; Write-Host ""Copying UBA agent...""; Copy-Item Z:\cmake{_ubaAgentHash}\UbaAgent.exe C:\UbaAgent.exe; Write-Host ""Running UBA agent...""; C:\UbaAgent.exe -Verbose -Listen=7000 -NoPoll -listenTimeout=120 -ProxyPort=7001 -Dir=C:\UbaData -MaxIdle=15 -MaxCpu={blockSize}; Write-Host ""UBA agent exited with exit code: $LastExitCode""; exit $LastExitCode;", + }, + Ports = new List + { + new V1ContainerPort + { + ContainerPort = 7000, + Protocol = "TCP", + }, + new V1ContainerPort + { + ContainerPort = 7001, + Protocol = "TCP", + } + }, + VolumeMounts = new List + { + new V1VolumeMount + { + Name = "uba-storage", + MountPath = @"C:\UbaData", + } + } + } + } + } + }, + KubernetesNamespace, + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + V1Service kubernetesService; + createService: + try + { + kubernetesService = await _client.CreateNamespacedServiceAsync( + new V1Service + { + Metadata = new V1ObjectMeta + { + Name = name, + Labels = labels, + }, + Spec = new V1ServiceSpec + { + Selector = labels, + Type = "NodePort", + Ports = new List + { + new V1ServicePort + { + Name = "uba", + Port = 7000, + TargetPort = new IntstrIntOrString("7000"), + Protocol = "TCP", + }, + new V1ServicePort + { + Name = "uba-proxy", + Port = 7001, + TargetPort = new IntstrIntOrString("7001"), + Protocol = "TCP", + }, + }, + }, + }, + KubernetesNamespace, + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.Conflict) + { + await _client.DeleteNamespacedServiceAsync(name, KubernetesNamespace, cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + goto createService; + } + + // Track the worker. + var worker = new KubernetesNodeWorker + { + KubernetesPod = kubernetesPod, + KubernetesService = kubernetesService, + AllocatedCores = blockSize, + }; + selectedNode.AllocatedBlocks.Add(worker); + + // In the background, wait for the worker to become ready and allocate it to UBA. + _ = Task.Run(async () => + { + var didRegister = false; + try + { + // Wait for the service to have node ports. + while (worker.UbaHost == null || worker.UbaPort == null) + { + _cancellationSource.Token.ThrowIfCancellationRequested(); + + // Refresh service status. + worker.KubernetesService = await _client.ReadNamespacedServiceAsync( + worker.KubernetesService.Name(), + worker.KubernetesService.Namespace(), + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + + // If a port doesn't have NodePort, it's not allocated yet. + if (worker.KubernetesService.Spec.Ports.Any(x => x.NodePort == null)) + { + await Task.Delay(1000).ConfigureAwait(false); + continue; + } + + // We should have the node port now. + worker.UbaHost = selectedNode.KubernetesNode!.Status.Addresses + .Where(x => x.Type == "InternalIP") + .Select(x => x.Address) + .First(); + worker.UbaPort = worker.KubernetesService.Spec.Ports.First(x => x.Name == "uba").NodePort!.Value; + break; + } + + // Wait for the pod to start. + var secondsElapsed = 0; + while (worker.KubernetesPod.Status.Phase == "Pending" && secondsElapsed < 30) + { + await Task.Delay(1000).ConfigureAwait(false); + worker.KubernetesPod = await _client.ReadNamespacedPodAsync( + worker.KubernetesPod.Name(), + worker.KubernetesPod.Namespace(), + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + secondsElapsed++; + } + if (worker.KubernetesPod.Status.Phase == "Pending") + { + // Timed out. + _logger.LogWarning($"Kubernetes timed out while allocating: {name}"); + return; + } + + // Add the worker to UBA. + _logger.LogInformation($"Kubernetes block is ready: {name} ({worker.UbaHost}:{worker.UbaPort.Value})"); + var didAddAgent = false; + for (int i = 0; i < 30; i++) + { + if (_ubaServer.AddRemoteAgent(worker.UbaHost, worker.UbaPort.Value)) + { + didAddAgent = true; + break; + } + await Task.Delay(1000).ConfigureAwait(false); + } + if (!didAddAgent) + { + _logger.LogError("Unable to register Kubernetes UBA agent with UBA library!"); + } + else + { + didRegister = true; + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"Exception in Kubernetes wait loop: {ex}"); + } + finally + { + if (!didRegister) + { + // This pod/service could not be allocated. The main loop can then try to allocate again. + await DeleteServiceAndPodAsync(worker.KubernetesPod, _cancellationSource.Token).ConfigureAwait(false); + } + } + }); + } + } + catch (OperationCanceledException ex) when (_cancellationSource != null && ex.CancellationToken == _cancellationSource.Token) + { + // Expected exception. + stopping = true; + } + catch (TaskCanceledException) + { + // Expected exception. + stopping = true; + } + catch (SocketException) + { + _logger.LogWarning("Unable to reach Kubernetes cluster."); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Exception in Kubernetes loop: {ex}"); + } + finally + { + if (!stopping) + { + _timer?.Change(_timerPeriod, Timeout.Infinite); + } + } + + }, null, 0, _timerPeriod); + } + + public void Stop() + { + _cancellationSource?.Cancel(); + } + + public async Task CloseAsync() + { + _cancellationSource?.Cancel(); + + if (_client != null) + { + try + { + // Remove any Kubernetes pods that are complete. + await foreach (var pod in EnumerateNamespacedPodAsync("uba=true", CancellationToken.None)) + { + if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") + { + _logger.LogInformation($"Removing Kubernetes block: {pod.Name()} (cleanup on close)"); + await DeleteServiceAndPodAsync(pod, CancellationToken.None).ConfigureAwait(false); + } + } + + // Remove any Kubernetes pods owned by us. + if (!string.IsNullOrWhiteSpace(_id)) + { + await foreach (var pod in EnumerateNamespacedPodAsync($"uba.queueId={_id}", CancellationToken.None)) + { + _logger.LogInformation($"Removing Kubernetes block: {pod.Name()} (unconditional)"); + await DeleteServiceAndPodAsync(pod, CancellationToken.None).ConfigureAwait(false); + } + } + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.Forbidden) + { + // Ignore this transient error on shutdown. + } + catch (SocketException) + { + // Ignore this transient error on shutdown. + } + catch (HttpRequestException) + { + // Ignore this transient error on shutdown. + } + } + } + + public void Dispose() + { + Stop(); + CloseAsync().Wait(); + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _cancellationSource?.Dispose(); + _cancellationSource = null; + _timer?.Dispose(); + _timer = null; + _client?.Dispose(); + _client = null; + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs new file mode 100644 index 00000000..eea911fb --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs @@ -0,0 +1,30 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using System.Xml; + using UET.Commands.Config; + + internal class UbaCoordinatorKubernetesConfig + { + public string? Namespace { get; set; } + public string? Context { get; set; } + public string? SmbServer { get; set; } + public string? SmbShare { get; set; } + public string? SmbUsername { get; set; } + public string? SmbPassword { get; set; } + + public static UbaCoordinatorKubernetesConfig ReadFromBuildConfigurationXml(IXmlConfigHelper xmlConfigHelper) + { + var document = new XmlDocument(); + document.Load(Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Unreal Engine", "UnrealBuildTool", "BuildConfiguration.xml")); + + var config = new UbaCoordinatorKubernetesConfig(); + config.Namespace = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "Namespace"]); + config.Context = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "Context"]); + config.SmbServer = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbServer"]); + config.SmbShare = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbShare"]); + config.SmbUsername = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbUsername"]); + config.SmbPassword = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbPassword"]); + return config; + } + } +} diff --git a/UET/uet/Commands/Internal/InternalCommand.cs b/UET/uet/Commands/Internal/InternalCommand.cs index 82f60c0b..67b14b43 100644 --- a/UET/uet/Commands/Internal/InternalCommand.cs +++ b/UET/uet/Commands/Internal/InternalCommand.cs @@ -30,6 +30,8 @@ using UET.Commands.Internal.TestUba; using UET.Commands.Internal.EngineCheckout; using UET.Commands.Internal.InstallPackage; + using UET.Commands.Internal.CMakeUbaServer; + using UET.Commands.Internal.CMakeUbaRun; internal sealed class InternalCommand { @@ -67,6 +69,8 @@ public static Command CreateInternalCommand(HashSet globalCommands) RemoteZfsTestCommand.CreateRemoteZfsTestCommand(), EngineCheckoutCommand.CreateEngineCheckoutCommand(), InstallPackageCommand.CreateInstallPackageCommand(), + CMakeUbaServerCommand.CreateCMakeUbaServerCommand(), + CMakeUbaRunCommand.CreateCMakeUbaRunCommand(), }; var command = new Command("internal", "Internal commands used by UET when it needs to call back into itself."); diff --git a/UET/uet/uet.csproj b/UET/uet/uet.csproj index ff9973bc..a22c3a27 100644 --- a/UET/uet/uet.csproj +++ b/UET/uet/uet.csproj @@ -23,10 +23,13 @@ + + + @@ -99,6 +102,7 @@ + Both From d8183082803000cbe03eaf8eb28efb536ad22027 Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Thu, 26 Sep 2024 19:07:10 +1000 Subject: [PATCH 2/7] Make server auto-close after 60 seconds idle --- .../CMakeUbaRun/CMakeUbaRunCommand.cs | 2 +- .../CMakeUbaServer/CMakeUbaServerCommand.cs | 29 +++++++++++++------ 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs index c0b7cfc3..1e2f5609 100644 --- a/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs +++ b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs @@ -126,7 +126,7 @@ await _processExecutor.ExecuteAsync( new ProcessSpecification { FilePath = _selfLocation.GetUetLocalLocation(), - Arguments = ["internal", "cmake-uba-server"] + Arguments = ["internal", "cmake-uba-server", "--auto-close"] }, CaptureSpecification.Silence, CancellationToken.None).ConfigureAwait(false); diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs index c973a3e3..d1c11531 100644 --- a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs @@ -23,6 +23,9 @@ internal class CMakeUbaServerCommand { internal sealed class Options { + public Option AutoClose = new Option( + "--auto-close", + "If true, the CMake UBA server will automatically shut down after idling for 60 seconds."); } public static Command CreateCMakeUbaServerCommand() @@ -43,6 +46,7 @@ private sealed class CMakeUbaServerCommandInstance : CMakeUbaServiceBase, IComma private readonly IGrpcPipeFactory _grpcPipeFactory; private readonly IXmlConfigHelper _xmlConfigHelper; private readonly ILogger _logger; + private readonly Options _options; private CancellationToken? _commandCancellationToken; private IUbaServer? _ubaServer; @@ -50,22 +54,26 @@ public CMakeUbaServerCommandInstance( IUbaServerFactory ubaServerFactory, IGrpcPipeFactory grpcPipeFactory, IXmlConfigHelper xmlConfigHelper, - ILogger logger) + ILogger logger, + Options options) { _ubaServerFactory = ubaServerFactory; _grpcPipeFactory = grpcPipeFactory; _xmlConfigHelper = xmlConfigHelper; _logger = logger; + _options = options; } public async Task ExecuteAsync(InvocationContext context) { + var autoClose = context.ParseResult.GetValueForOption(_options.AutoClose); + // @todo: Make this configurable. UbaNative.Init(@"C:\Program Files\Epic Games\UE_5.4\Engine\Binaries\Win64\UnrealBuildAccelerator"); // Track the timestamp that the server should automatically shut down. This gets moved // forward into the future when we have work in the queue. - var shutdownTime = DateTimeOffset.UtcNow.AddMinutes(10); + var shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); // Create the UBA server. _logger.LogInformation("CMake UBA server is starting up..."); @@ -111,15 +119,18 @@ public async Task ExecuteAsync(InvocationContext context) while (true) { _logger.LogDebug($"Pending: {ubaServer.ProcessesPendingInQueue} Executing Locally: {ubaServer.ProcessesExecutingLocally} Executing Remotely: {ubaServer.ProcessesExecutingRemotely}"); - if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) - { - shutdownTime = DateTimeOffset.UtcNow.AddMinutes(10); - } - if (shutdownTime < DateTimeOffset.UtcNow) + if (autoClose) { - _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); - return 0; + if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) + { + shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); + } + if (shutdownTime < DateTimeOffset.UtcNow) + { + _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); + return 0; + } } await Task.Delay(1000, context.GetCancellationToken()).ConfigureAwait(false); From c0384c5d4e19f6ca1b57d9cfa811207d1014e236 Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Thu, 26 Sep 2024 21:05:03 +1000 Subject: [PATCH 3/7] CMake UBA now mostly works --- UET/Redpoint.Uba/DefaultUbaServer.cs | 420 ++++++++++-------- UET/Redpoint.Uba/DefaultUbaServerFactory.cs | 6 +- UET/Redpoint.Uba/UbaProcessDescriptor.cs | 2 + .../CMakeUbaRun/CMakeUbaRunCommand.cs | 95 +--- .../CMakeUbaServer/CMakeUbaServerCommand.cs | 30 +- .../UbaCoordinatorKubernetes.cs | 20 +- 6 files changed, 273 insertions(+), 300 deletions(-) diff --git a/UET/Redpoint.Uba/DefaultUbaServer.cs b/UET/Redpoint.Uba/DefaultUbaServer.cs index 832f7864..264c601c 100644 --- a/UET/Redpoint.Uba/DefaultUbaServer.cs +++ b/UET/Redpoint.Uba/DefaultUbaServer.cs @@ -17,7 +17,6 @@ internal partial class DefaultUbaServer : IUbaServer { private readonly ILogger _logger; private readonly IProcessArgumentParser _processArgumentParser; - private readonly IProcessExecutor _localProcessExecutor; private readonly nint _ubaLogger; private readonly CancellationTokenSource _localWorkerCancellationTokenSource; private nint _server; @@ -118,6 +117,14 @@ private static partial nint CreateProcessStartInfo( private static partial void DestroyProcessStartInfo( nint processStartInfo); + [LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))] + [DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)] + private static partial nint SessionServer_RunProcess( + nint sessionServer, + nint processStartInfo, + [MarshalAs(UnmanagedType.I1)] bool async, + [MarshalAs(UnmanagedType.I1)] bool enableDetour); + [LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))] [DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)] private static partial nint SessionServer_RunProcessRemote( @@ -173,7 +180,6 @@ private static partial void SessionServer_SetRemoteProcessReturned( public DefaultUbaServer( ILogger logger, IProcessArgumentParser processArgumentParser, - IProcessExecutor localProcessExecutor, nint ubaLogger, nint server, string rootStorageDirectoryPath, @@ -181,7 +187,6 @@ public DefaultUbaServer( { _logger = logger; _processArgumentParser = processArgumentParser; - _localProcessExecutor = localProcessExecutor; _ubaLogger = ubaLogger; _localWorkerCancellationTokenSource = new CancellationTokenSource(); @@ -247,220 +252,252 @@ public bool AddRemoteAgent(string ip, int port) public long ProcessesExecutingLocally => Interlocked.Read(ref _processesExecutingLocally); public long ProcessesExecutingRemotely => Interlocked.Read(ref _processesExecutingRemotely); - private void OnRemoteProcessAvailable(nint userData) + private async Task RunProcessAsync( + UbaProcessDescriptor descriptor, + Func createProcessOnServer, + bool isRemoteExecution) { - // Start a background task to queue a remote process (since this function can't be async). - _ = Task.Run(async () => + var isRequeued = false; + try { - // Before we attempt to actually dequeue, check that there is something to dequeue, since UBA - // frequently calls OnRemoteProcessAvailable whileever slots are free (and not just when a slot - // opens up). - if (Interlocked.Read(ref _processesPendingInQueue) == 0) + // Create the gate that we can wait on until the process exits. + var exitedGate = new Gate(); + ExitCallback exited = (nint userdata, nint handle) => { - return; + exitedGate.Open(); + }; + + // Try to set the description. + var description = "Unknown"; + description = Path.GetFileName(descriptor.ProcessSpecification.Arguments.Last().LogicalValue); + + // Create the process start info. + var arguments = _processArgumentParser.JoinArguments(descriptor.ProcessSpecification.Arguments); + arguments = arguments.Replace(@"-DCMAKE_CFG_INTDIR=""Release""", @"-DCMAKE_CFG_INTDIR=\""Release\""", StringComparison.Ordinal); + arguments = arguments.Replace(@"-DCMAKE_CFG_INTDIR=""Debug""", @"-DCMAKE_CFG_INTDIR=\""Debug\""", StringComparison.Ordinal); + var processStartInfo = CreateProcessStartInfo( + descriptor.ProcessSpecification.FilePath, + arguments, + descriptor.ProcessSpecification.WorkingDirectory ?? Environment.CurrentDirectory, + description, + (uint)ProcessPriorityClass.Normal, + int.MaxValue, + true, + string.Empty, + exited); + if (processStartInfo == nint.Zero) + { + throw new InvalidOperationException("Unable to create UBA process start info!"); } - _logger.LogInformation("Received OnRemoteProcessAvailable, pulling next available process to run..."); - - // Grab the next process to run. - var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); - Interlocked.Decrement(ref _processesPendingInQueue); - Interlocked.Increment(ref _processesExecutingRemotely); - - // Run the process remotely. - _logger.LogInformation($"Got process to run '{descriptor.ProcessSpecification.FilePath}'..."); - var isRequeued = false; - try + // Process and log tracking that needs to be shared between try and finally blocks. + var process = nint.Zero; + uint logIndex = 0; + var getTargetLogLine = (uint targetLogIndex) => { - // Create the gate that we can wait on until the process exits. - var exitedGate = new Gate(); - ExitCallback exited = (nint userdata, nint handle) => + var ptr = ProcessHandle_GetLogLine(process, targetLogIndex); + if (ptr == nint.Zero) { - exitedGate.Open(); - }; - - // Create the process start info. - var processStartInfo = CreateProcessStartInfo( - descriptor.ProcessSpecification.FilePath, - _processArgumentParser.JoinArguments(descriptor.ProcessSpecification.Arguments), - descriptor.ProcessSpecification.WorkingDirectory ?? Environment.CurrentDirectory, - $"Redpoint.Uba: {descriptor.ProcessSpecification.FilePath} {_processArgumentParser.JoinArguments(descriptor.ProcessSpecification.Arguments)}", - (uint)ProcessPriorityClass.Normal, - int.MaxValue, - true, - string.Empty, - exited); - if (processStartInfo == nint.Zero) + return null; + } + if (OperatingSystem.IsWindows()) { - throw new InvalidOperationException("Unable to create UBA process start info!"); + return Marshal.PtrToStringUni(ptr); } - - // Process and log tracking that needs to be shared between try and finally blocks. - var process = nint.Zero; - uint logIndex = 0; - var getTargetLogLine = (uint targetLogIndex) => + else { - var ptr = ProcessHandle_GetLogLine(process, targetLogIndex); - if (ptr == nint.Zero) - { - return null; - } - if (OperatingSystem.IsWindows()) - { - return Marshal.PtrToStringUni(ptr); - } - else - { - return Marshal.PtrToStringUTF8(ptr); - } - }; - var getExecutingHost = () => + return Marshal.PtrToStringUTF8(ptr); + } + }; + var getExecutingHost = () => + { + var ptr = ProcessHandle_GetExecutingHost(process); + if (ptr == nint.Zero) { - var ptr = ProcessHandle_GetExecutingHost(process); - if (ptr == nint.Zero) - { - return null; - } - if (OperatingSystem.IsWindows()) - { - return Marshal.PtrToStringUni(ptr); - } - else - { - return Marshal.PtrToStringUTF8(ptr); - } - }; - var flushLogLines = () => + return null; + } + if (OperatingSystem.IsWindows()) { - if (process != nint.Zero) + return Marshal.PtrToStringUni(ptr); + } + else + { + return Marshal.PtrToStringUTF8(ptr); + } + }; + var flushLogLines = () => + { + if (process != nint.Zero) + { + var nextLogLine = getTargetLogLine(logIndex); + while (nextLogLine != null) { - var nextLogLine = getTargetLogLine(logIndex); - while (nextLogLine != null) + logIndex++; + if (descriptor.CaptureSpecification.InterceptStandardOutput) { - logIndex++; - if (descriptor.CaptureSpecification.InterceptStandardOutput) - { - descriptor.CaptureSpecification.OnReceiveStandardOutput(nextLogLine); - } - else - { - Console.WriteLine(nextLogLine); - } - nextLogLine = getTargetLogLine(logIndex); + descriptor.CaptureSpecification.OnReceiveStandardOutput(nextLogLine); } - } - }; - - // try/finally to ensure we release native resources when finished. - var isCancelled = false; - var isComplete = false; - ulong processHash = 0; - var releaseProcessResources = (bool forceCancel) => - { - if (process != nint.Zero) - { - if (!isComplete && !isCancelled && (forceCancel || descriptor.CancellationToken.IsCancellationRequested)) + else { - ProcessHandle_Cancel(process, true); - isCancelled = true; + Console.WriteLine(nextLogLine); } - DestroyProcessHandle(process); - _returnedProcesses.Remove(processHash, out _); - process = nint.Zero; + nextLogLine = getTargetLogLine(logIndex); } - }; - try - { - // Check for cancellation. - descriptor.CancellationToken.ThrowIfCancellationRequested(); + } + }; - // Run the process remotely. - process = SessionServer_RunProcessRemote( - _sessionServer, - processStartInfo, - 1.0f, - null, - 0); - if (process == nint.Zero) + // try/finally to ensure we release native resources when finished. + var isCancelled = false; + var isComplete = false; + ulong processHash = 0; + var releaseProcessResources = (bool forceCancel) => + { + if (process != nint.Zero) + { + if (!isComplete && !isCancelled && (forceCancel || descriptor.CancellationToken.IsCancellationRequested)) { - throw new InvalidOperationException("Unable to create UBA remote process!"); + ProcessHandle_Cancel(process, true); + isCancelled = true; } - processHash = ProcessHandle_GetHash(process); - - _logger.LogInformation($"Remote process '{descriptor.ProcessSpecification.FilePath}' is now running..."); - - // While we wait for the exit gate to open, poll for log lines. - while (!exitedGate.Opened && - !_returnedProcesses.ContainsKey(processHash) && - !descriptor.CancellationToken.IsCancellationRequested) - { - // Check for cancellation. - descriptor.CancellationToken.ThrowIfCancellationRequested(); + DestroyProcessHandle(process); + _returnedProcesses.Remove(processHash, out _); + process = nint.Zero; + } + }; + try + { + // Check for cancellation. + descriptor.CancellationToken.ThrowIfCancellationRequested(); - // Flush all available log lines. - flushLogLines(); + // Run the process remotely. + process = createProcessOnServer(processStartInfo); + if (process == nint.Zero) + { + throw new InvalidOperationException("Unable to create UBA remote process!"); + } + processHash = ProcessHandle_GetHash(process); - // Continue waiting for the process to exit. - await Task.Delay(200, descriptor.CancellationToken).ConfigureAwait(false); - } + // While we wait for the exit gate to open, poll for log lines. + while (!exitedGate.Opened && + !_returnedProcesses.ContainsKey(processHash) && + !descriptor.CancellationToken.IsCancellationRequested) + { + // Check for cancellation. + descriptor.CancellationToken.ThrowIfCancellationRequested(); - // Get the exit code and mark as completed if the command finished running. - int? exitCode = null; - if (!_returnedProcesses.ContainsKey(processHash)) - { - exitCode = (int)ProcessHandle_GetExitCode(process); - isComplete = true; - } + // Flush all available log lines. + flushLogLines(); - // Check if we should requeue this process. - if (!exitCode.HasValue /* Returned by remote agent */ || - (exitCode.HasValue && exitCode == 9006) /* Known retry code when running cmd.exe via remote agent */) - { - _logger.LogInformation($"Remote process '{descriptor.ProcessSpecification.FilePath}' was returned to the queue, scheduling for local execution..."); + // Continue waiting for the process to exit. + await Task.Delay(200, descriptor.CancellationToken).ConfigureAwait(false); + } - // Prefer to run this command locally now. - descriptor.PreferRemote = false; + // Get the exit code and mark as completed if the command finished running. + int? exitCode = null; + if (!_returnedProcesses.ContainsKey(processHash)) + { + exitCode = (int)ProcessHandle_GetExitCode(process); + isComplete = true; + } - // Cancel and release remote process. - releaseProcessResources(true); + // Check if we should requeue this process. + if (isRemoteExecution && ( + !exitCode.HasValue /* Returned by remote agent */ || + (exitCode.HasValue && exitCode != 0) /* Allow a compile failure to be retried locally */)) + { + // Prefer to run this command locally now. + descriptor.AllowRemote = false; - // Push this process back into the queue for local execution. - _processQueue.Enqueue(descriptor); - isRequeued = true; - return; - } + // Cancel and release remote process. + releaseProcessResources(true); - // Otherwise, return the exit code. - isComplete = true; - descriptor.ExitCode = exitCode!.Value; + // Push this process back into the queue for local execution. + _processQueue.Enqueue(descriptor); + isRequeued = true; return; } - finally - { - flushLogLines(); - releaseProcessResources(false); - DestroyProcessStartInfo(processStartInfo); - } + + // Otherwise, return the exit code. + isComplete = true; + descriptor.ExitCode = exitCode ?? 99999; + return; } - catch (Exception ex) + finally { - if (!isRequeued) - { - descriptor.ExceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex); - } + flushLogLines(); + releaseProcessResources(false); + DestroyProcessStartInfo(processStartInfo); } - finally + } + catch (Exception ex) + { + if (!isRequeued) { - if (!isRequeued) - { - descriptor.CompletionGate.Open(); - } - else + descriptor.ExceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex); + } + } + finally + { + if (!isRequeued) + { + descriptor.CompletionGate.Open(); + } + else + { + Interlocked.Increment(ref _processesPendingInQueue); + } + if (isRemoteExecution) + { + Interlocked.Decrement(ref _processesExecutingRemotely); + } + else + { + Interlocked.Decrement(ref _processesExecutingLocally); + } + } + } + + private void OnRemoteProcessAvailable(nint userData) + { + // Start a background task to queue a remote process (since this function can't be async). + _ = Task.Run(async () => + { + // Before we attempt to actually dequeue, check that there is something to dequeue, since UBA + // frequently calls OnRemoteProcessAvailable whileever slots are free (and not just when a slot + // opens up). + if (Interlocked.Read(ref _processesPendingInQueue) == 0) + { + return; + } + + // Grab the next process to run. + var seenDescriptors = new HashSet(); + retryFetch: + var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); + if (!descriptor.AllowRemote) + { + if (seenDescriptors.Contains(descriptor)) { - Interlocked.Increment(ref _processesPendingInQueue); + // No remotable processes in queue at the moment. + return; } - Interlocked.Decrement(ref _processesExecutingRemotely); + seenDescriptors.Add(descriptor); + _processQueue.Enqueue(descriptor); + goto retryFetch; } + Interlocked.Decrement(ref _processesPendingInQueue); + Interlocked.Increment(ref _processesExecutingRemotely); + + // Run the process remotely. + await RunProcessAsync( + descriptor, + processStartInfo => SessionServer_RunProcessRemote( + _sessionServer, + processStartInfo, + 1.0f, + null, + 0), + true).ConfigureAwait(false); }); } @@ -488,22 +525,14 @@ private async Task LocalProcessWorkerLoop() Interlocked.Increment(ref _processesExecutingLocally); // Run the process locally. - try - { - descriptor.ExitCode = await _localProcessExecutor.ExecuteAsync( - descriptor.ProcessSpecification, - descriptor.CaptureSpecification, - descriptor.CancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - descriptor.ExceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex); - } - finally - { - descriptor.CompletionGate.Open(); - Interlocked.Decrement(ref _processesExecutingLocally); - } + await RunProcessAsync( + descriptor, + processStartInfo => SessionServer_RunProcess( + _sessionServer, + processStartInfo, + true, + false), + false).ConfigureAwait(false); } while (true); } @@ -528,6 +557,7 @@ public async Task ExecuteAsync( CancellationToken = cancellationToken, DateQueuedUtc = DateTimeOffset.UtcNow, PreferRemote = processSpecification is UbaProcessSpecification ubaProcessSpecification && ubaProcessSpecification.PreferRemote, + AllowRemote = true, CompletionGate = new Gate(), }; _processQueue.Enqueue(descriptor); diff --git a/UET/Redpoint.Uba/DefaultUbaServerFactory.cs b/UET/Redpoint.Uba/DefaultUbaServerFactory.cs index 5debf9d3..03f5048e 100644 --- a/UET/Redpoint.Uba/DefaultUbaServerFactory.cs +++ b/UET/Redpoint.Uba/DefaultUbaServerFactory.cs @@ -13,18 +13,15 @@ internal class DefaultUbaServerFactory : IUbaServerFactory private readonly ILogger _defaultUbaServerLogger; private readonly ILogger _ubaLoggerForwarderLogger; private readonly IProcessArgumentParser _processArgumentParser; - private readonly IProcessExecutor _localProcessExecutor; public DefaultUbaServerFactory( ILogger defaultUbaServerLogger, ILogger ubaLoggerForwarderLogger, - IProcessArgumentParser processArgumentParser, - IProcessExecutor localProcessExecutor) + IProcessArgumentParser processArgumentParser) { _defaultUbaServerLogger = defaultUbaServerLogger; _ubaLoggerForwarderLogger = ubaLoggerForwarderLogger; _processArgumentParser = processArgumentParser; - _localProcessExecutor = localProcessExecutor; } public IUbaServer CreateServer( @@ -58,7 +55,6 @@ public IUbaServer CreateServer( return new DefaultUbaServer( _defaultUbaServerLogger, _processArgumentParser, - _localProcessExecutor, ubaLogger, server, rootStorageDirectoryPath, diff --git a/UET/Redpoint.Uba/UbaProcessDescriptor.cs b/UET/Redpoint.Uba/UbaProcessDescriptor.cs index 37571545..f6c4bb8a 100644 --- a/UET/Redpoint.Uba/UbaProcessDescriptor.cs +++ b/UET/Redpoint.Uba/UbaProcessDescriptor.cs @@ -18,6 +18,8 @@ internal class UbaProcessDescriptor public required bool PreferRemote { get; set; } + public required bool AllowRemote { get; set; } + public required Gate CompletionGate { get; init; } public int ExitCode { get; set; } diff --git a/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs index 1e2f5609..661dd8de 100644 --- a/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs +++ b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs @@ -3,7 +3,6 @@ using CMakeUba; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; - using Redpoint.Concurrency; using Redpoint.GrpcPipes; using Redpoint.ProcessExecution; using System; @@ -12,7 +11,6 @@ using System.Threading.Tasks; using UET.Services; using static CMakeUba.CMakeUbaService; - using Semaphore = System.Threading.Semaphore; internal class CMakeUbaRunCommand { @@ -63,101 +61,47 @@ public CMakeUbaRunCommandInstance( _commandArguments = commandArguments; } - private async Task IsServerRunningAsync() + private async Task IsServerRunningAsync(string pipeName) { - var client = _grpcPipeFactory.CreateClient( - "cmake-uba-server", - GrpcPipeNamespace.User, - channel => new CMakeUbaServiceClient(channel)); - try + for (int i = 0; i < 10; i++) { - await client.PingServerAsync(new CMakeUba.EmptyMessage()).ConfigureAwait(false); - return true; - } - catch - { - return false; - } - } - - private async Task WaitForServerToStartAsync(CancellationToken serverUnexpectedExit) - { - while (true) - { - serverUnexpectedExit.ThrowIfCancellationRequested(); - var client = _grpcPipeFactory.CreateClient( - "cmake-uba-server", + pipeName, GrpcPipeNamespace.User, channel => new CMakeUbaServiceClient(channel)); try { - await client.PingServerAsync(new CMakeUba.EmptyMessage(), cancellationToken: serverUnexpectedExit).ConfigureAwait(false); - return; + await client.PingServerAsync(new CMakeUba.EmptyMessage()).ConfigureAwait(false); + return true; } catch { - await Task.Delay(1000, serverUnexpectedExit).ConfigureAwait(false); + await Task.Delay(1000).ConfigureAwait(false); } } + return false; } public async Task ExecuteAsync(InvocationContext context) { - // Start the CMake UBA server on-demand if it's not already running. - if (!await IsServerRunningAsync().ConfigureAwait(false)) + var sessionId = Environment.GetEnvironmentVariable("CMAKE_UBA_SESSION_ID"); + if (string.IsNullOrWhiteSpace(sessionId)) { - using var globalServerSemaphore = new Semaphore(1, 1, "cmake-uba-grpc-server"); - globalServerSemaphore.WaitOne(); - try - { - if (!await IsServerRunningAsync().ConfigureAwait(false)) - { - var cts = new CancellationTokenSource(); - var ctsDisposed = new Gate(); - try - { - _ = Task.Run(async () => - { - try - { - _logger.LogInformation("Starting CMake UBA server. You will not see output or errors from it. To see diagnostic information, run 'uet internal cmake-uba-server' in a separate terminal."); - await _processExecutor.ExecuteAsync( - new ProcessSpecification - { - FilePath = _selfLocation.GetUetLocalLocation(), - Arguments = ["internal", "cmake-uba-server", "--auto-close"] - }, - CaptureSpecification.Silence, - CancellationToken.None).ConfigureAwait(false); - } - finally - { - if (!ctsDisposed.Opened) - { - cts.Cancel(); - } - } - }); + _logger.LogError($"Expected CMAKE_UBA_SESSION_ID environment variable to be set."); + return 1; + } + var pipeName = $"cmake-uba-{sessionId}"; - await WaitForServerToStartAsync(cts.Token).ConfigureAwait(false); - } - finally - { - ctsDisposed.Open(); - cts.Dispose(); - } - } - } - finally - { - globalServerSemaphore.Release(); - } + // Start the CMake UBA server on-demand if it's not already running. + if (!await IsServerRunningAsync(pipeName).ConfigureAwait(false)) + { + _logger.LogError($"The CMake UBA server isn't running. Start it with '{_selfLocation.GetUetLocalLocation()} internal cmake-uba-server &'"); + return 1; } // Create our gRPC client. var client = _grpcPipeFactory.CreateClient( - "cmake-uba-server", + pipeName, GrpcPipeNamespace.User, channel => new CMakeUbaServiceClient(channel)); @@ -173,6 +117,7 @@ await _processExecutor.ExecuteAsync( { request.Arguments.Add(new ProcessArgument { LogicalValue = arguments[i] }); } + var response = client.ExecuteProcess(request, cancellationToken: context.GetCancellationToken()); // Stream the response values. diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs index d1c11531..badb8a23 100644 --- a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs @@ -23,9 +23,6 @@ internal class CMakeUbaServerCommand { internal sealed class Options { - public Option AutoClose = new Option( - "--auto-close", - "If true, the CMake UBA server will automatically shut down after idling for 60 seconds."); } public static Command CreateCMakeUbaServerCommand() @@ -66,7 +63,14 @@ public CMakeUbaServerCommandInstance( public async Task ExecuteAsync(InvocationContext context) { - var autoClose = context.ParseResult.GetValueForOption(_options.AutoClose); + // Get the session ID / pipe name. + var sessionId = Environment.GetEnvironmentVariable("CMAKE_UBA_SESSION_ID"); + if (string.IsNullOrWhiteSpace(sessionId)) + { + _logger.LogError($"Expected CMAKE_UBA_SESSION_ID environment variable to be set."); + return 1; + } + var pipeName = $"cmake-uba-{sessionId}"; // @todo: Make this configurable. UbaNative.Init(@"C:\Program Files\Epic Games\UE_5.4\Engine\Binaries\Win64\UnrealBuildAccelerator"); @@ -89,7 +93,6 @@ public async Task ExecuteAsync(InvocationContext context) _commandCancellationToken = context.GetCancellationToken(); // Create the gRPC server. - var pipeName = $"cmake-uba-server"; await using (_grpcPipeFactory .CreateServer( pipeName, @@ -120,17 +123,14 @@ public async Task ExecuteAsync(InvocationContext context) { _logger.LogDebug($"Pending: {ubaServer.ProcessesPendingInQueue} Executing Locally: {ubaServer.ProcessesExecutingLocally} Executing Remotely: {ubaServer.ProcessesExecutingRemotely}"); - if (autoClose) + if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) + { + shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); + } + if (shutdownTime < DateTimeOffset.UtcNow) { - if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) - { - shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); - } - if (shutdownTime < DateTimeOffset.UtcNow) - { - _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); - return 0; - } + _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); + return 0; } await Task.Delay(1000, context.GetCancellationToken()).ConfigureAwait(false); diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs index 68b2185b..9109785e 100644 --- a/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs +++ b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs @@ -81,7 +81,7 @@ public async Task InitAsync(IUbaServer ubaServer) return; } - _logger.LogInformation("Kubernetes UBA: InitAsync"); + _logger.LogDebug("Kubernetes UBA: InitAsync"); try { @@ -244,7 +244,7 @@ private async IAsyncEnumerable EnumerateNodesAsync(string labelSelector, [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "This call is not used for cryptography.")] public void Start() { - _logger.LogInformation("Kubernetes UBA: Start"); + _logger.LogDebug("Kubernetes UBA: Start"); _timer = new Timer(async _ => { @@ -256,7 +256,7 @@ public void Start() // If we're cancelled, stop. if (_cancellationSource == null || _cancellationSource.IsCancellationRequested) { - _logger.LogInformation("Kubernetes UBA: Loop: Cancelled"); + _logger.LogDebug("Kubernetes UBA: Loop: Cancelled"); stopping = true; return; } @@ -264,7 +264,7 @@ public void Start() // If the client is unavailable, keep looping unless we're done. if (_client == null) { - _logger.LogInformation("Kubernetes UBA: Loop: Not configured to use Kubernetes."); + _logger.LogInformation("Kubernetes UBA: Not configured to use Kubernetes."); return; } @@ -273,7 +273,7 @@ public void Start() { if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") { - _logger.LogInformation($"Removing Kubernetes block: {pod.Name()} (cleanup)"); + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()} (cleanup)"); await DeleteServiceAndPodAsync(pod, _cancellationSource.Token).ConfigureAwait(false); foreach (var kv in _kubernetesNodes) @@ -331,7 +331,7 @@ public void Start() { if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") { - _logger.LogInformation($"Removing Kubernetes block: {pod.Name()}"); + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()}"); await DeleteServiceAndPodAsync(pod, _cancellationSource.Token).ConfigureAwait(false); } } @@ -355,7 +355,7 @@ public void Start() .SelectMany(x => { var blocks = new List(); - _logger.LogInformation($"Kubernetes UBA: Loop: Node '{x.NodeId}' has\nCPU: {x.CoresTotal} total, {x.CoresNonUba} non-UBA, {x.CoresAllocated} allocated, {x.CoresAvailable} available, {x.CoresAllocatable} allocatable.\nMemory: {x.MemoryTotal} total, {x.MemoryNonUba} non-UBA, {x.MemoryAllocated} allocated, {x.MemoryAvailable} available.\nBlocks: {x.AllocatedBlocks.Count} allocated."); + _logger.LogDebug($"Kubernetes UBA: Loop: Node '{x.NodeId}' has\nCPU: {x.CoresTotal} total, {x.CoresNonUba} non-UBA, {x.CoresAllocated} allocated, {x.CoresAvailable} available, {x.CoresAllocatable} allocatable.\nMemory: {x.MemoryTotal} total, {x.MemoryNonUba} non-UBA, {x.MemoryAllocated} allocated, {x.MemoryAvailable} available.\nBlocks: {x.AllocatedBlocks.Count} allocated."); for (var c = 0; c < x.CoresAllocatable; c += blockSize) { if (c + blockSize <= x.CoresAllocatable) @@ -385,7 +385,7 @@ public void Start() // Create the pod and service. var name = $"uba-{selectedNode.NodeId}-{nextBlockId}"; - _logger.LogInformation($"Allocating Kubernetes block: {name}"); + _logger.LogDebug($"Allocating Kubernetes block: {name}"); var labels = new Dictionary { { "uba", "true" }, @@ -671,7 +671,7 @@ public async Task CloseAsync() { if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") { - _logger.LogInformation($"Removing Kubernetes block: {pod.Name()} (cleanup on close)"); + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()} (cleanup on close)"); await DeleteServiceAndPodAsync(pod, CancellationToken.None).ConfigureAwait(false); } } @@ -681,7 +681,7 @@ public async Task CloseAsync() { await foreach (var pod in EnumerateNamespacedPodAsync($"uba.queueId={_id}", CancellationToken.None)) { - _logger.LogInformation($"Removing Kubernetes block: {pod.Name()} (unconditional)"); + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()} (unconditional)"); await DeleteServiceAndPodAsync(pod, CancellationToken.None).ConfigureAwait(false); } } From 7d17747921dcb59420cf57653828ebc12fbbb159 Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Thu, 26 Sep 2024 22:28:38 +1000 Subject: [PATCH 4/7] Limit CMake UBA to only run clang-cl for now --- UET/Redpoint.Uba/DefaultUbaServer.cs | 5 +++-- UET/Redpoint.Uba/UbaProcessSpecification.cs | 5 +++++ .../Internal/CMakeUbaServer/CMakeUbaServerCommand.cs | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/UET/Redpoint.Uba/DefaultUbaServer.cs b/UET/Redpoint.Uba/DefaultUbaServer.cs index 264c601c..81e1ef04 100644 --- a/UET/Redpoint.Uba/DefaultUbaServer.cs +++ b/UET/Redpoint.Uba/DefaultUbaServer.cs @@ -550,14 +550,15 @@ public async Task ExecuteAsync( } // Push the request into the queue. + var ubaProcessSpecification = processSpecification as UbaProcessSpecification; var descriptor = new UbaProcessDescriptor { ProcessSpecification = processSpecification, CaptureSpecification = captureSpecification, CancellationToken = cancellationToken, DateQueuedUtc = DateTimeOffset.UtcNow, - PreferRemote = processSpecification is UbaProcessSpecification ubaProcessSpecification && ubaProcessSpecification.PreferRemote, - AllowRemote = true, + PreferRemote = ubaProcessSpecification != null && ubaProcessSpecification.PreferRemote, + AllowRemote = ubaProcessSpecification == null || ubaProcessSpecification.AllowRemote, CompletionGate = new Gate(), }; _processQueue.Enqueue(descriptor); diff --git a/UET/Redpoint.Uba/UbaProcessSpecification.cs b/UET/Redpoint.Uba/UbaProcessSpecification.cs index ac71ca83..8aa97387 100644 --- a/UET/Redpoint.Uba/UbaProcessSpecification.cs +++ b/UET/Redpoint.Uba/UbaProcessSpecification.cs @@ -13,5 +13,10 @@ public class UbaProcessSpecification : ProcessSpecification /// remote agent. /// public bool PreferRemote { get; set; } + + /// + /// If true, this command can be run remotely. Defaults to true if not set. + /// + public bool AllowRemote { get; set; } = true; } } diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs index badb8a23..65992679 100644 --- a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs @@ -175,6 +175,7 @@ await responseStream.WriteAsync(new CMakeUba.ProcessResponse Arguments = request.Arguments.Select(x => new LogicalProcessArgument(x.LogicalValue)).ToArray(), WorkingDirectory = request.WorkingDirectory, PreferRemote = request.PreferRemote, + AllowRemote = string.Equals(Path.GetFileName(request.Path), "clang-cl.exe", StringComparison.OrdinalIgnoreCase), }; using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_commandCancellationToken.Value, context.CancellationToken); From c33e4612f3a2df9609e3f0e06bb650ef662c18ad Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Wed, 25 Dec 2024 09:43:09 +1100 Subject: [PATCH 5/7] Use KubernetesClient.Aot to fix trim issues --- UET/uet/uet.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/UET/uet/uet.csproj b/UET/uet/uet.csproj index a22c3a27..40039c92 100644 --- a/UET/uet/uet.csproj +++ b/UET/uet/uet.csproj @@ -29,7 +29,7 @@ - + From 7a682f5b5242da02bbc4c0f4915b4bf179182a79 Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Wed, 25 Dec 2024 11:05:35 +1100 Subject: [PATCH 6/7] Add top-level 'uet cmake' command that puts all the internals together for you --- UET/uet/Commands/CMake/CMakeCommand.cs | 249 ++++++++++++++++++ .../CMakeUbaServer/CMakeUbaServerCommand.cs | 192 +++++++++----- UET/uet/Commands/ParameterSpec/EngineSpec.cs | 34 ++- UET/uet/Program.cs | 13 + 4 files changed, 414 insertions(+), 74 deletions(-) create mode 100644 UET/uet/Commands/CMake/CMakeCommand.cs diff --git a/UET/uet/Commands/CMake/CMakeCommand.cs b/UET/uet/Commands/CMake/CMakeCommand.cs new file mode 100644 index 00000000..b47adbfe --- /dev/null +++ b/UET/uet/Commands/CMake/CMakeCommand.cs @@ -0,0 +1,249 @@ +namespace UET.Commands.CMake +{ + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using Redpoint.PathResolution; + using Redpoint.ProcessExecution; + using System; + using System.Collections.Generic; + using System.CommandLine; + using System.CommandLine.Invocation; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using UET.Commands.EngineSpec; + using UET.Services; + + internal sealed class CMakeCommand + { + internal sealed class Options + { + public Option Engine; + + public Options() + { + Engine = new Option( + "--engine", + description: "The engine to use the Unreal Build Accelerator from.", + parseArgument: EngineSpec.ParseEngineSpecContextless(), + isDefault: true); + Engine.AddAlias("-e"); + Engine.Arity = ArgumentArity.ZeroOrOne; + } + } + + public static Command CreateCMakeCommand() + { + var options = new Options(); + var commandArguments = new Argument("command-and-arguments", "The command to run, followed by any arguments to pass to it."); + commandArguments.Arity = ArgumentArity.ZeroOrMore; + var command = new Command("cmake", "Run a CMake-based build leveraging the Unreal Build Accelerator (UBA) to distribute compilation.") + { + FullDescription = """ + This command runs a CMake-based build, leveraging the Unreal Build Accelerator (UBA) to distribute the build over Kubernetes. + + ------------- + + Before you can distribute builds, you must configure your BuildConfiguration.xml file located at "%appdata%\Unreal Engine\UnrealBuildTool\BuildConfiguration.xml" with the settings to connect to the Kubernetes cluster. The Kubernetes cluster must have Windows nodes in it. You can use RKM (https://src.redpoint.games/redpointgames/rkm) to spin up a Kubernetes cluster with Windows nodes with a single command. + + + + + default + defafult + 10.0.0.100 + ShareName + Domain\Username + Password + + + + The 'Smb' settings specify a network share that all Windows nodes can access as the specified user. The Unreal Build Accelerator will be copied to this share and the containers will copy from this network share. + + Your `kubectl` configuration must be connected to the cluster already, as per the 'Context' setting. You can get the context name by running `kubectl config get-contexts`. The 'Namespace' setting specifies what Kubernetes namespace to launch UBA agents into. + + ------------- + + To distribute builds, you must first generate your CMake project using: + + uet cmake -- ... + + You should omit `-G`; this command will automatically select the Ninja project generator which is required to distribute builds. + + Once you've generated your project, you can distribute the build using: + + uet cmake -e 5.5 -- --build ... + + The presence of `--build` in the CMake arguments is what this tool uses to determine whether CMake is generating project files or running the build. You only need to specify `-e` as an argument to this command when running the build; it is not necessary during generation. + + All arguments past the `--` are forwarded to CMake intact. + """ + }; + command.AddAllOptions(options); + command.AddArgument(commandArguments); + command.AddCommonHandler(options, services => + { + services.AddSingleton(commandArguments); + }); + return command; + } + + private sealed class CMakeCommandInstance : ICommandInstance + { + private readonly ILogger _logger; + private readonly IProcessExecutor _processExecutor; + private readonly ISelfLocation _selfLocation; + private readonly IPathResolver _pathResolver; + private readonly Options _options; + private readonly Argument _commandArguments; + + public CMakeCommandInstance( + ILogger logger, + IProcessExecutor processExecutor, + ISelfLocation selfLocation, + IPathResolver pathResolver, + Options options, + Argument commandArguments) + { + _logger = logger; + _processExecutor = processExecutor; + _selfLocation = selfLocation; + _pathResolver = pathResolver; + _options = options; + _commandArguments = commandArguments; + } + + public async Task ExecuteAsync(InvocationContext context) + { + var extraArguments = context.ParseResult.GetValueForArgument(_commandArguments); + + string? cmake = null; + try + { + cmake = await _pathResolver.ResolveBinaryPath("cmake"); + } + catch (FileNotFoundException) + { + cmake = Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.ProgramFiles), + "Microsoft Visual Studio\\2022\\Community\\Common7\\IDE\\CommonExtensions\\Microsoft\\CMake\\CMake\\bin\\cmake.exe"); + } + if (cmake == null || !File.Exists(cmake)) + { + _logger.LogError($"Unable to find CMake on PATH or at '{cmake}'."); + return 1; + } + + var engineResult = context.ParseResult.CommandResult.FindResultFor(_options.Engine); + string? engineString = null; + if (engineResult != null) + { + // Run the parse of the engine specification so we can error early if it's invalid. + context.ParseResult.GetValueForOption(_options.Engine); + + engineString = engineResult.Tokens.Count > 0 ? engineResult.Tokens[0].Value : null; + } + + if (extraArguments.Contains("--build")) + { + _logger.LogInformation("CMake is building the project (--build detected)."); + + if (string.IsNullOrWhiteSpace(engineString)) + { + _logger.LogError("Missing --engine option, which is necessary for --build. Use this command like 'uet cmake -e 5.5 -- ...'."); + return 1; + } + + // Generate a session ID. + var sessionId = Guid.NewGuid().ToString(); + + // Start the CMake UBA server. + _logger.LogInformation("Starting CMake UBA server..."); + using var backgroundCts = CancellationTokenSource.CreateLinkedTokenSource(context.GetCancellationToken()); + var backgroundTask = Task.Run(async () => await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = _selfLocation.GetUetLocalLocation(), + Arguments = [ + "internal", + "cmake-uba-server", + "-e", + engineString, + ], + EnvironmentVariables = new Dictionary + { + { "CMAKE_UBA_SESSION_ID", sessionId } + } + }, + CaptureSpecification.Passthrough, + backgroundCts.Token).ConfigureAwait(false)); + + var appendArguments = new List(); + if (!(extraArguments.Contains("-j") || extraArguments.Any(x => x.StartsWith("-j", StringComparison.Ordinal)))) + { + // Override core detection because we're distributing builds. + appendArguments.Add("-j256"); + } + + // Run CMake. + try + { + _logger.LogInformation("Running CMake..."); + return await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = cmake, + Arguments = extraArguments.Select(x => new LogicalProcessArgument(x)).Concat(appendArguments), + EnvironmentVariables = new Dictionary + { + { "CMAKE_UBA_SESSION_ID", sessionId }, + { "UET_IMPLICIT_COMMAND", "cmake-uba-run" }, + } + }, + CaptureSpecification.Passthrough, + context.GetCancellationToken()); + } + finally + { + backgroundCts.Cancel(); + try + { + _logger.LogInformation("Stopping CMake UBA server..."); + await backgroundTask.ConfigureAwait(false); + } + catch + { + } + } + } + else + { + _logger.LogInformation("CMake is generating project files (--build not detected)."); + + if (extraArguments.Contains("-G") || extraArguments.Any(x => x.StartsWith("-G", StringComparison.Ordinal))) + { + _logger.LogError("Detected -G argument passed to CMake during project generation. Omit this setting as UET will force use of the Ninja build system."); + return 1; + } + + // Run CMake. + _logger.LogInformation("Running CMake..."); + return await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = cmake, + Arguments = new LogicalProcessArgument[] + { + "-G", + "Ninja", + $"-DCMAKE_C_COMPILER_LAUNCHER={_selfLocation.GetUetLocalLocation()}", + $"-DCMAKE_CXX_COMPILER_LAUNCHER={_selfLocation.GetUetLocalLocation()}", + }.Concat(extraArguments.Select(x => new LogicalProcessArgument(x))), + }, + CaptureSpecification.Passthrough, + context.GetCancellationToken()); + } + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs index 65992679..dd064b6a 100644 --- a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs @@ -10,19 +10,36 @@ using Redpoint.ProcessExecution.Enumerable; using Redpoint.Uba; using Redpoint.Uba.Native; + using Redpoint.Uet.BuildPipeline.Executors.Engine; + using Redpoint.Uet.BuildPipeline.Executors; + using Redpoint.Uet.Workspace; using System; using System.CommandLine; using System.CommandLine.Invocation; using System.Linq; using System.Runtime.InteropServices; + using System.Threading; using System.Threading.Tasks; using UET.Commands.Config; + using UET.Commands.EngineSpec; using static CMakeUba.CMakeUbaService; internal class CMakeUbaServerCommand { internal sealed class Options { + public Option Engine; + + public Options() + { + Engine = new Option( + "--engine", + description: "The engine to use for the build.", + parseArgument: EngineSpec.ParseEngineSpecContextless(), + isDefault: true); + Engine.AddAlias("-e"); + Engine.Arity = ArgumentArity.ExactlyOne; + } } public static Command CreateCMakeUbaServerCommand() @@ -43,6 +60,7 @@ private sealed class CMakeUbaServerCommandInstance : CMakeUbaServiceBase, IComma private readonly IGrpcPipeFactory _grpcPipeFactory; private readonly IXmlConfigHelper _xmlConfigHelper; private readonly ILogger _logger; + private readonly IEngineWorkspaceProvider _engineWorkspaceProvider; private readonly Options _options; private CancellationToken? _commandCancellationToken; private IUbaServer? _ubaServer; @@ -52,104 +70,146 @@ public CMakeUbaServerCommandInstance( IGrpcPipeFactory grpcPipeFactory, IXmlConfigHelper xmlConfigHelper, ILogger logger, + IEngineWorkspaceProvider engineWorkspaceProvider, Options options) { _ubaServerFactory = ubaServerFactory; _grpcPipeFactory = grpcPipeFactory; _xmlConfigHelper = xmlConfigHelper; _logger = logger; + _engineWorkspaceProvider = engineWorkspaceProvider; _options = options; } public async Task ExecuteAsync(InvocationContext context) { - // Get the session ID / pipe name. - var sessionId = Environment.GetEnvironmentVariable("CMAKE_UBA_SESSION_ID"); - if (string.IsNullOrWhiteSpace(sessionId)) + var engine = context.ParseResult.GetValueForOption(_options.Engine)!; + + BuildEngineSpecification engineSpec; + switch (engine.Type) { - _logger.LogError($"Expected CMAKE_UBA_SESSION_ID environment variable to be set."); - return 1; + case EngineSpecType.UEFSPackageTag: + engineSpec = BuildEngineSpecification.ForUEFSPackageTag(engine.UEFSPackageTag!); + break; + case EngineSpecType.SESNetworkShare: + engineSpec = BuildEngineSpecification.ForSESNetworkShare(engine.SESNetworkShare!); + break; + case EngineSpecType.RemoteZfs: + engineSpec = BuildEngineSpecification.ForRemoteZfs(engine.RemoteZfs!); + break; + case EngineSpecType.Version: + engineSpec = BuildEngineSpecification.ForVersionWithPath(engine.Version!, engine.Path!); + break; + case EngineSpecType.Path: + engineSpec = BuildEngineSpecification.ForAbsolutePath(engine.Path!); + break; + case EngineSpecType.GitCommit: + engineSpec = BuildEngineSpecification.ForGitCommitWithZips( + engine.GitUrl!, + engine.GitCommit!, + engine.ZipLayers, + isEngineBuild: false); + break; + default: + throw new NotSupportedException($"The EngineSpecType {engine.Type} is not supported by the 'cmake-uba-server' command."); } - var pipeName = $"cmake-uba-{sessionId}"; - - // @todo: Make this configurable. - UbaNative.Init(@"C:\Program Files\Epic Games\UE_5.4\Engine\Binaries\Win64\UnrealBuildAccelerator"); - - // Track the timestamp that the server should automatically shut down. This gets moved - // forward into the future when we have work in the queue. - var shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); - - // Create the UBA server. - _logger.LogInformation("CMake UBA server is starting up..."); - var ubaRoot = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData)!, "Redpoint", "CMakeUBA"); - await using (_ubaServerFactory - .CreateServer( - ubaRoot, - Path.Combine(ubaRoot, "UbaTrace.log")) - .AsAsyncDisposable(out var ubaServer) - .ConfigureAwait(false)) + + await using ((await _engineWorkspaceProvider.GetEngineWorkspace( + engineSpec, + string.Empty, + context.GetCancellationToken()).ConfigureAwait(false)) + .AsAsyncDisposable(out var engineWorkspace) + .ConfigureAwait(false)) { - _ubaServer = ubaServer; - _commandCancellationToken = context.GetCancellationToken(); + // Get the session ID / pipe name. + var sessionId = Environment.GetEnvironmentVariable("CMAKE_UBA_SESSION_ID"); + if (string.IsNullOrWhiteSpace(sessionId)) + { + _logger.LogError($"Expected CMAKE_UBA_SESSION_ID environment variable to be set."); + return 1; + } + var pipeName = $"cmake-uba-{sessionId}"; + + var ubaDirectory = Path.Combine(engineWorkspace.Path, @"Engine\Binaries\Win64\UnrealBuildAccelerator"); + UbaNative.Init(ubaDirectory); + + // Track the timestamp that the server should automatically shut down. This gets moved + // forward into the future when we have work in the queue. + var shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); - // Create the gRPC server. - await using (_grpcPipeFactory + // Create the UBA server. + _logger.LogInformation("CMake UBA server is starting up..."); + var ubaRoot = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData)!, "Redpoint", "CMakeUBA"); + await using (_ubaServerFactory .CreateServer( - pipeName, - GrpcPipeNamespace.User, - this) - .AsAsyncDisposable(out var grpcServer) + ubaRoot, + Path.Combine(ubaRoot, "UbaTrace.log")) + .AsAsyncDisposable(out var ubaServer) .ConfigureAwait(false)) { - // Start the gRPC server. - await grpcServer.StartAsync().ConfigureAwait(false); - _logger.LogInformation("Waiting for incoming requests over gRPC..."); - - // Start the Kubernetes coordinator. - using var coordinator = new UbaCoordinatorKubernetes( - Path.Combine(@"C:\Program Files\Epic Games\UE_5.4\Engine\Binaries\Win64\UnrealBuildAccelerator", RuntimeInformation.ProcessArchitecture.ToString().ToLowerInvariant()), - _logger, - UbaCoordinatorKubernetesConfig.ReadFromBuildConfigurationXml(_xmlConfigHelper)); - await coordinator.InitAsync(ubaServer).ConfigureAwait(false); - try + _ubaServer = ubaServer; + _commandCancellationToken = context.GetCancellationToken(); + + // Create the gRPC server. + await using (_grpcPipeFactory + .CreateServer( + pipeName, + GrpcPipeNamespace.User, + this) + .AsAsyncDisposable(out var grpcServer) + .ConfigureAwait(false)) { - coordinator.Start(); + // Start the gRPC server. + await grpcServer.StartAsync().ConfigureAwait(false); + _logger.LogInformation("Waiting for incoming requests over gRPC..."); - // Every second, evaluate how many processes are in queue / executing locally / executing remotely, - // and use this information to provision agents on Kubernetes as needed. + // Start the Kubernetes coordinator. + using var coordinator = new UbaCoordinatorKubernetes( + Path.Combine(ubaDirectory, RuntimeInformation.ProcessArchitecture.ToString().ToLowerInvariant()), + _logger, + UbaCoordinatorKubernetesConfig.ReadFromBuildConfigurationXml(_xmlConfigHelper)); + await coordinator.InitAsync(ubaServer).ConfigureAwait(false); try { - while (true) - { - _logger.LogDebug($"Pending: {ubaServer.ProcessesPendingInQueue} Executing Locally: {ubaServer.ProcessesExecutingLocally} Executing Remotely: {ubaServer.ProcessesExecutingRemotely}"); + coordinator.Start(); - if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) - { - shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); - } - if (shutdownTime < DateTimeOffset.UtcNow) + // Every second, evaluate how many processes are in queue / executing locally / executing remotely, + // and use this information to provision agents on Kubernetes as needed. + try + { + while (true) { - _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); - return 0; - } + _logger.LogDebug($"Pending: {ubaServer.ProcessesPendingInQueue} Executing Locally: {ubaServer.ProcessesExecutingLocally} Executing Remotely: {ubaServer.ProcessesExecutingRemotely}"); - await Task.Delay(1000, context.GetCancellationToken()).ConfigureAwait(false); + if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) + { + shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); + } + if (shutdownTime < DateTimeOffset.UtcNow) + { + _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); + return 0; + } + + await Task.Delay(1000, context.GetCancellationToken()).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { } + + // Stop the gRPC server. + _logger.LogInformation("CMake UBA server is shutting down..."); + await grpcServer.StopAsync().ConfigureAwait(false); } - catch (OperationCanceledException) + finally { + await coordinator.CloseAsync().ConfigureAwait(false); } - - // Stop the gRPC server. - _logger.LogInformation("CMake UBA server is shutting down..."); - await grpcServer.StopAsync().ConfigureAwait(false); - } - finally - { - await coordinator.CloseAsync().ConfigureAwait(false); } } } + return 0; } diff --git a/UET/uet/Commands/ParameterSpec/EngineSpec.cs b/UET/uet/Commands/ParameterSpec/EngineSpec.cs index 831a34e6..2ef03cbe 100644 --- a/UET/uet/Commands/ParameterSpec/EngineSpec.cs +++ b/UET/uet/Commands/ParameterSpec/EngineSpec.cs @@ -37,8 +37,16 @@ public static ParseArgument ParseEngineSpec( distributionOpt); } + public static ParseArgument ParseEngineSpecContextless() + { + return ParseEngineSpec( + null, + string.Empty, + null); + } + private static ParseArgument ParseEngineSpec( - Func getPathSpec, + Func? getPathSpec, string pathSpecOptionName, Option? distributionOpt) { @@ -69,13 +77,16 @@ private static ParseArgument ParseEngineSpec( // can figure out the target engine from the project file. PathSpec? path = null; DistributionSpec? distribution = null; - try - { - path = getPathSpec(result); - } - catch (InvalidOperationException) + if (getPathSpec != null) { - return null!; + try + { + path = getPathSpec(result); + } + catch (InvalidOperationException) + { + return null!; + } } if (distributionOpt != null) { @@ -90,7 +101,14 @@ private static ParseArgument ParseEngineSpec( } if (path == null) { - result.ErrorMessage = $"Can't automatically detect the appropriate engine because the --{pathSpecOptionName} option was invalid."; + if (getPathSpec == null) + { + result.ErrorMessage = $"You must explicitly set the engine version to use with --{result.Argument.Name}."; + } + else + { + result.ErrorMessage = $"Can't automatically detect the appropriate engine because the --{pathSpecOptionName} option was invalid."; + } return null!; } switch (path.Type) diff --git a/UET/uet/Program.cs b/UET/uet/Program.cs index 8b5853f2..994103c0 100644 --- a/UET/uet/Program.cs +++ b/UET/uet/Program.cs @@ -11,6 +11,7 @@ using System.Text.RegularExpressions; using UET.Commands.AppleCert; using UET.Commands.Build; +using UET.Commands.CMake; using UET.Commands.Config; using UET.Commands.Format; using UET.Commands.Generate; @@ -46,8 +47,20 @@ rootCommand.AddCommand(UefsCommand.CreateUefsCommand()); rootCommand.AddCommand(TransferCommand.CreateTransferCommand()); rootCommand.AddCommand(AppleCertCommand.CreateAppleCertCommand()); +rootCommand.AddCommand(CMakeCommand.CreateCMakeCommand()); rootCommand.AddCommand(InternalCommand.CreateInternalCommand(globalCommands)); +// If we have an implicit command variable, this is an internal command where we can't specify arguments directly. +var implicitCommand = Environment.GetEnvironmentVariable("UET_IMPLICIT_COMMAND"); +if (!string.IsNullOrWhiteSpace(implicitCommand)) +{ + // Clear it for any downstream processes we might start. + Environment.SetEnvironmentVariable("UET_IMPLICIT_COMMAND", null); + + // Prepend to args. + args = new[] { "internal", implicitCommand }.Concat(args).ToArray(); +} + // Parse the command line so we can inspect it. var parseResult = rootCommand.Parse(args); var isGlobalCommand = globalCommands.Contains(parseResult.CommandResult.Command); From 04175df14df3885d5ee4125b17d681a904b9a296 Mon Sep 17 00:00:00 2001 From: June Rhodes Date: Wed, 25 Dec 2024 11:14:07 +1100 Subject: [PATCH 7/7] Try to fix YamlDotNet trim warnings caused by implicit upgrade --- .../Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj | 4 ++-- UET/uet/uet.csproj | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj b/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj index 1c6789fa..20425edd 100644 --- a/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj +++ b/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj @@ -3,8 +3,8 @@ - - + + diff --git a/UET/uet/uet.csproj b/UET/uet/uet.csproj index 40039c92..6af97b47 100644 --- a/UET/uet/uet.csproj +++ b/UET/uet/uet.csproj @@ -37,6 +37,10 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + + + +