diff --git a/UET/Redpoint.Uba/DefaultUbaServer.cs b/UET/Redpoint.Uba/DefaultUbaServer.cs index d21e414d..81e1ef04 100644 --- a/UET/Redpoint.Uba/DefaultUbaServer.cs +++ b/UET/Redpoint.Uba/DefaultUbaServer.cs @@ -17,8 +17,7 @@ internal partial class DefaultUbaServer : IUbaServer { private readonly ILogger _logger; private readonly IProcessArgumentParser _processArgumentParser; - private readonly IProcessExecutor _localProcessExecutor; - private readonly UbaLoggerForwarder _ubaLoggerForwarder; + private readonly nint _ubaLogger; private readonly CancellationTokenSource _localWorkerCancellationTokenSource; private nint _server; private nint _storageServer; @@ -27,6 +26,11 @@ internal partial class DefaultUbaServer : IUbaServer private readonly ConcurrentDictionary _returnedProcesses; private readonly TerminableAwaitableConcurrentQueue _processQueue; private readonly Task[] _localWorkerTasks; + private readonly SessionServer_RemoteProcessAvailableCallback _onRemoteProcessAvailableDelegate; + private readonly SessionServer_RemoteProcessReturnedCallback _onRemoteProcessReturnedDelegate; + private long _processesPendingInQueue; + private long _processesExecutingLocally; + private long _processesExecutingRemotely; #region Library Imports @@ -47,7 +51,7 @@ private static partial bool Server_AddClient( nint server, string ip, int port, - string crypto); + nint crypto); [LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))] [DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)] @@ -113,6 +117,14 @@ private static partial nint CreateProcessStartInfo( private static partial void DestroyProcessStartInfo( nint processStartInfo); + [LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))] + [DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)] + private static partial nint SessionServer_RunProcess( + nint sessionServer, + nint processStartInfo, + [MarshalAs(UnmanagedType.I1)] bool async, + [MarshalAs(UnmanagedType.I1)] bool enableDetour); + [LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))] [DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)] private static partial nint SessionServer_RunProcessRemote( @@ -168,16 +180,14 @@ private static partial void SessionServer_SetRemoteProcessReturned( public DefaultUbaServer( ILogger logger, IProcessArgumentParser processArgumentParser, - IProcessExecutor localProcessExecutor, - UbaLoggerForwarder ubaLoggerForwarder, + nint ubaLogger, nint server, string rootStorageDirectoryPath, string ubaTraceFilePath) { _logger = logger; _processArgumentParser = processArgumentParser; - _localProcessExecutor = localProcessExecutor; - _ubaLoggerForwarder = ubaLoggerForwarder; + _ubaLogger = ubaLogger; _localWorkerCancellationTokenSource = new CancellationTokenSource(); _server = server; @@ -187,7 +197,7 @@ public DefaultUbaServer( rootStorageDirectoryPath, 40uL * 1000 * 1000 * 1000, false, - _ubaLoggerForwarder.Logger, + ubaLogger, string.Empty); if (_storageServer == nint.Zero) { @@ -198,7 +208,7 @@ public DefaultUbaServer( _sessionServerCreateInfo = CreateSessionServerCreateInfo( _storageServer, _server, - _ubaLoggerForwarder.Logger, + ubaLogger, rootStorageDirectoryPath, ubaTraceFilePath, disableCustomAllocator: false, @@ -227,207 +237,267 @@ public DefaultUbaServer( .Select(x => Task.Run(LocalProcessWorkerLoop)) .ToArray(); - SessionServer_SetRemoteProcessAvailable(_sessionServer, OnRemoteProcessAvailable, nint.Zero); - SessionServer_SetRemoteProcessReturned(_sessionServer, OnRemoteProcessReturned, nint.Zero); + _onRemoteProcessAvailableDelegate = OnRemoteProcessAvailable; + _onRemoteProcessReturnedDelegate = OnRemoteProcessReturned; + SessionServer_SetRemoteProcessAvailable(_sessionServer, _onRemoteProcessAvailableDelegate, nint.Zero); + SessionServer_SetRemoteProcessReturned(_sessionServer, _onRemoteProcessReturnedDelegate, nint.Zero); } - public bool AddRemoteAgent(string ip, int port, string crypto) + public bool AddRemoteAgent(string ip, int port) { - return Server_AddClient(_server, ip, port, crypto); + return Server_AddClient(_server, ip, port, nint.Zero); } - private void OnRemoteProcessAvailable(nint userData) + public long ProcessesPendingInQueue => Interlocked.Read(ref _processesPendingInQueue); + public long ProcessesExecutingLocally => Interlocked.Read(ref _processesExecutingLocally); + public long ProcessesExecutingRemotely => Interlocked.Read(ref _processesExecutingRemotely); + + private async Task RunProcessAsync( + UbaProcessDescriptor descriptor, + Func createProcessOnServer, + bool isRemoteExecution) { - // Start a background task to queue a remote process (since this function can't be async). - _ = Task.Run(async () => + var isRequeued = false; + try { - // Grab the next process to run. - var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); + // Create the gate that we can wait on until the process exits. + var exitedGate = new Gate(); + ExitCallback exited = (nint userdata, nint handle) => + { + exitedGate.Open(); + }; + + // Try to set the description. + var description = "Unknown"; + description = Path.GetFileName(descriptor.ProcessSpecification.Arguments.Last().LogicalValue); + + // Create the process start info. + var arguments = _processArgumentParser.JoinArguments(descriptor.ProcessSpecification.Arguments); + arguments = arguments.Replace(@"-DCMAKE_CFG_INTDIR=""Release""", @"-DCMAKE_CFG_INTDIR=\""Release\""", StringComparison.Ordinal); + arguments = arguments.Replace(@"-DCMAKE_CFG_INTDIR=""Debug""", @"-DCMAKE_CFG_INTDIR=\""Debug\""", StringComparison.Ordinal); + var processStartInfo = CreateProcessStartInfo( + descriptor.ProcessSpecification.FilePath, + arguments, + descriptor.ProcessSpecification.WorkingDirectory ?? Environment.CurrentDirectory, + description, + (uint)ProcessPriorityClass.Normal, + int.MaxValue, + true, + string.Empty, + exited); + if (processStartInfo == nint.Zero) + { + throw new InvalidOperationException("Unable to create UBA process start info!"); + } - // Run the process remotely. - var isRequeued = false; - try + // Process and log tracking that needs to be shared between try and finally blocks. + var process = nint.Zero; + uint logIndex = 0; + var getTargetLogLine = (uint targetLogIndex) => { - // Create the gate that we can wait on until the process exits. - var exitedGate = new Gate(); - ExitCallback exited = (nint userdata, nint handle) => + var ptr = ProcessHandle_GetLogLine(process, targetLogIndex); + if (ptr == nint.Zero) { - exitedGate.Open(); - }; - - // Create the process start info. - var processStartInfo = CreateProcessStartInfo( - descriptor.ProcessSpecification.FilePath, - _processArgumentParser.JoinArguments(descriptor.ProcessSpecification.Arguments), - descriptor.ProcessSpecification.WorkingDirectory ?? Environment.CurrentDirectory, - $"Redpoint.Uba: {descriptor.ProcessSpecification.FilePath} {_processArgumentParser.JoinArguments(descriptor.ProcessSpecification.Arguments)}", - (uint)ProcessPriorityClass.Normal, - int.MaxValue, - true, - string.Empty, - exited); - if (processStartInfo == nint.Zero) + return null; + } + if (OperatingSystem.IsWindows()) { - throw new InvalidOperationException("Unable to create UBA process start info!"); + return Marshal.PtrToStringUni(ptr); } - - // Process and log tracking that needs to be shared between try and finally blocks. - var process = nint.Zero; - uint logIndex = 0; - var getTargetLogLine = (uint targetLogIndex) => + else { - var ptr = ProcessHandle_GetLogLine(process, targetLogIndex); - if (ptr == nint.Zero) - { - return null; - } - if (OperatingSystem.IsWindows()) - { - return Marshal.PtrToStringUni(ptr); - } - else - { - return Marshal.PtrToStringUTF8(ptr); - } - }; - var getExecutingHost = () => + return Marshal.PtrToStringUTF8(ptr); + } + }; + var getExecutingHost = () => + { + var ptr = ProcessHandle_GetExecutingHost(process); + if (ptr == nint.Zero) { - var ptr = ProcessHandle_GetExecutingHost(process); - if (ptr == nint.Zero) - { - return null; - } - if (OperatingSystem.IsWindows()) - { - return Marshal.PtrToStringUni(ptr); - } - else - { - return Marshal.PtrToStringUTF8(ptr); - } - }; - var flushLogLines = () => + return null; + } + if (OperatingSystem.IsWindows()) + { + return Marshal.PtrToStringUni(ptr); + } + else + { + return Marshal.PtrToStringUTF8(ptr); + } + }; + var flushLogLines = () => + { + if (process != nint.Zero) { - if (process != nint.Zero) + var nextLogLine = getTargetLogLine(logIndex); + while (nextLogLine != null) { - var nextLogLine = getTargetLogLine(logIndex); - while (nextLogLine != null) + logIndex++; + if (descriptor.CaptureSpecification.InterceptStandardOutput) { - logIndex++; - if (descriptor.CaptureSpecification.InterceptStandardOutput) - { - descriptor.CaptureSpecification.OnReceiveStandardOutput(nextLogLine); - } - else - { - Console.WriteLine(nextLogLine); - } - nextLogLine = getTargetLogLine(logIndex); + descriptor.CaptureSpecification.OnReceiveStandardOutput(nextLogLine); } - } - }; - - // try/finally to ensure we release native resources when finished. - var isCancelled = false; - var isComplete = false; - ulong processHash = 0; - var releaseProcessResources = (bool forceCancel) => - { - if (process != nint.Zero) - { - if (!isComplete && !isCancelled && (forceCancel || descriptor.CancellationToken.IsCancellationRequested)) + else { - ProcessHandle_Cancel(process, true); - isCancelled = true; + Console.WriteLine(nextLogLine); } - DestroyProcessHandle(process); - _returnedProcesses.Remove(processHash, out _); - process = nint.Zero; + nextLogLine = getTargetLogLine(logIndex); } - }; - try - { - // Check for cancellation. - descriptor.CancellationToken.ThrowIfCancellationRequested(); + } + }; - // Run the process remotely. - process = SessionServer_RunProcessRemote( - _sessionServer, - processStartInfo, - 1.0f, - null, - 0); - if (process == nint.Zero) + // try/finally to ensure we release native resources when finished. + var isCancelled = false; + var isComplete = false; + ulong processHash = 0; + var releaseProcessResources = (bool forceCancel) => + { + if (process != nint.Zero) + { + if (!isComplete && !isCancelled && (forceCancel || descriptor.CancellationToken.IsCancellationRequested)) { - throw new InvalidOperationException("Unable to create UBA remote process!"); + ProcessHandle_Cancel(process, true); + isCancelled = true; } - processHash = ProcessHandle_GetHash(process); + DestroyProcessHandle(process); + _returnedProcesses.Remove(processHash, out _); + process = nint.Zero; + } + }; + try + { + // Check for cancellation. + descriptor.CancellationToken.ThrowIfCancellationRequested(); - // While we wait for the exit gate to open, poll for log lines. - while (!exitedGate.Opened && - !_returnedProcesses.ContainsKey(processHash) && - !descriptor.CancellationToken.IsCancellationRequested) - { - // Check for cancellation. - descriptor.CancellationToken.ThrowIfCancellationRequested(); + // Run the process remotely. + process = createProcessOnServer(processStartInfo); + if (process == nint.Zero) + { + throw new InvalidOperationException("Unable to create UBA remote process!"); + } + processHash = ProcessHandle_GetHash(process); - // Flush all available log lines. - flushLogLines(); + // While we wait for the exit gate to open, poll for log lines. + while (!exitedGate.Opened && + !_returnedProcesses.ContainsKey(processHash) && + !descriptor.CancellationToken.IsCancellationRequested) + { + // Check for cancellation. + descriptor.CancellationToken.ThrowIfCancellationRequested(); - // Continue waiting for the process to exit. - await Task.Delay(200, descriptor.CancellationToken).ConfigureAwait(false); - } + // Flush all available log lines. + flushLogLines(); - // Get the exit code and mark as completed if the command finished running. - int? exitCode = null; - if (!_returnedProcesses.ContainsKey(processHash)) - { - exitCode = (int)ProcessHandle_GetExitCode(process); - isComplete = true; - } + // Continue waiting for the process to exit. + await Task.Delay(200, descriptor.CancellationToken).ConfigureAwait(false); + } - // Check if we should requeue this process. - if (!exitCode.HasValue /* Returned by remote agent */ || - (exitCode.HasValue && exitCode == 9006) /* Known retry code when running cmd.exe via remote agent */) - { - // Prefer to run this command locally now. - descriptor.PreferRemote = false; + // Get the exit code and mark as completed if the command finished running. + int? exitCode = null; + if (!_returnedProcesses.ContainsKey(processHash)) + { + exitCode = (int)ProcessHandle_GetExitCode(process); + isComplete = true; + } - // Cancel and release remote process. - releaseProcessResources(true); + // Check if we should requeue this process. + if (isRemoteExecution && ( + !exitCode.HasValue /* Returned by remote agent */ || + (exitCode.HasValue && exitCode != 0) /* Allow a compile failure to be retried locally */)) + { + // Prefer to run this command locally now. + descriptor.AllowRemote = false; - // Push this process back into the queue for local execution. - _processQueue.Enqueue(descriptor); - isRequeued = true; - return; - } + // Cancel and release remote process. + releaseProcessResources(true); - // Otherwise, return the exit code. - isComplete = true; - descriptor.ExitCode = exitCode!.Value; + // Push this process back into the queue for local execution. + _processQueue.Enqueue(descriptor); + isRequeued = true; return; } - finally - { - flushLogLines(); - releaseProcessResources(false); - DestroyProcessStartInfo(processStartInfo); - } + + // Otherwise, return the exit code. + isComplete = true; + descriptor.ExitCode = exitCode ?? 99999; + return; } - catch (Exception ex) + finally { - if (!isRequeued) - { - descriptor.ExceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex); - } + flushLogLines(); + releaseProcessResources(false); + DestroyProcessStartInfo(processStartInfo); } - finally + } + catch (Exception ex) + { + if (!isRequeued) + { + descriptor.ExceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex); + } + } + finally + { + if (!isRequeued) + { + descriptor.CompletionGate.Open(); + } + else + { + Interlocked.Increment(ref _processesPendingInQueue); + } + if (isRemoteExecution) + { + Interlocked.Decrement(ref _processesExecutingRemotely); + } + else { - if (!isRequeued) + Interlocked.Decrement(ref _processesExecutingLocally); + } + } + } + + private void OnRemoteProcessAvailable(nint userData) + { + // Start a background task to queue a remote process (since this function can't be async). + _ = Task.Run(async () => + { + // Before we attempt to actually dequeue, check that there is something to dequeue, since UBA + // frequently calls OnRemoteProcessAvailable whileever slots are free (and not just when a slot + // opens up). + if (Interlocked.Read(ref _processesPendingInQueue) == 0) + { + return; + } + + // Grab the next process to run. + var seenDescriptors = new HashSet(); + retryFetch: + var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); + if (!descriptor.AllowRemote) + { + if (seenDescriptors.Contains(descriptor)) { - descriptor.CompletionGate.Open(); + // No remotable processes in queue at the moment. + return; } + seenDescriptors.Add(descriptor); + _processQueue.Enqueue(descriptor); + goto retryFetch; } + Interlocked.Decrement(ref _processesPendingInQueue); + Interlocked.Increment(ref _processesExecutingRemotely); + + // Run the process remotely. + await RunProcessAsync( + descriptor, + processStartInfo => SessionServer_RunProcessRemote( + _sessionServer, + processStartInfo, + 1.0f, + null, + 0), + true).ConfigureAwait(false); }); } @@ -443,7 +513,7 @@ private async Task LocalProcessWorkerLoop() // Grab the next process to run. var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false); if (descriptor.PreferRemote && - (descriptor.DateQueuedUtc - DateTimeOffset.UtcNow).TotalSeconds < 30) + (DateTimeOffset.UtcNow - descriptor.DateQueuedUtc).TotalSeconds < 30) { // If this process prefers remote execution, and it hasn't been sitting in the queue for // at least 30 seconds, requeue it and try again. @@ -451,23 +521,18 @@ private async Task LocalProcessWorkerLoop() await Task.Delay(200, _localWorkerCancellationTokenSource.Token).ConfigureAwait(false); continue; } + Interlocked.Decrement(ref _processesPendingInQueue); + Interlocked.Increment(ref _processesExecutingLocally); // Run the process locally. - try - { - descriptor.ExitCode = await _localProcessExecutor.ExecuteAsync( - descriptor.ProcessSpecification, - descriptor.CaptureSpecification, - descriptor.CancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - descriptor.ExceptionDispatchInfo = ExceptionDispatchInfo.Capture(ex); - } - finally - { - descriptor.CompletionGate.Open(); - } + await RunProcessAsync( + descriptor, + processStartInfo => SessionServer_RunProcess( + _sessionServer, + processStartInfo, + true, + false), + false).ConfigureAwait(false); } while (true); } @@ -485,16 +550,19 @@ public async Task ExecuteAsync( } // Push the request into the queue. + var ubaProcessSpecification = processSpecification as UbaProcessSpecification; var descriptor = new UbaProcessDescriptor { ProcessSpecification = processSpecification, CaptureSpecification = captureSpecification, CancellationToken = cancellationToken, DateQueuedUtc = DateTimeOffset.UtcNow, - PreferRemote = processSpecification is UbaProcessSpecification ubaProcessSpecification && ubaProcessSpecification.PreferRemote, + PreferRemote = ubaProcessSpecification != null && ubaProcessSpecification.PreferRemote, + AllowRemote = ubaProcessSpecification == null || ubaProcessSpecification.AllowRemote, CompletionGate = new Gate(), }; _processQueue.Enqueue(descriptor); + Interlocked.Increment(ref _processesPendingInQueue); // Wait for the gate to be opened. await descriptor.CompletionGate.WaitAsync(cancellationToken).ConfigureAwait(false); diff --git a/UET/Redpoint.Uba/DefaultUbaServerFactory.cs b/UET/Redpoint.Uba/DefaultUbaServerFactory.cs index 68339e05..03f5048e 100644 --- a/UET/Redpoint.Uba/DefaultUbaServerFactory.cs +++ b/UET/Redpoint.Uba/DefaultUbaServerFactory.cs @@ -13,18 +13,15 @@ internal class DefaultUbaServerFactory : IUbaServerFactory private readonly ILogger _defaultUbaServerLogger; private readonly ILogger _ubaLoggerForwarderLogger; private readonly IProcessArgumentParser _processArgumentParser; - private readonly IProcessExecutor _localProcessExecutor; public DefaultUbaServerFactory( ILogger defaultUbaServerLogger, ILogger ubaLoggerForwarderLogger, - IProcessArgumentParser processArgumentParser, - IProcessExecutor localProcessExecutor) + IProcessArgumentParser processArgumentParser) { _defaultUbaServerLogger = defaultUbaServerLogger; _ubaLoggerForwarderLogger = ubaLoggerForwarderLogger; _processArgumentParser = processArgumentParser; - _localProcessExecutor = localProcessExecutor; } public IUbaServer CreateServer( @@ -46,9 +43,11 @@ public IUbaServer CreateServer( Directory.CreateDirectory(rootStorageDirectoryPath); Directory.CreateDirectory(Path.GetDirectoryName(ubaTraceFilePath)!); - var loggingForwarder = new UbaLoggerForwarder(_ubaLoggerForwarderLogger); + // @note: UbaLoggerForwarder must be static and hold a singleton log. Otherwise we can run into a crash if + // DefaultUbaServer is finalized after UbaLoggerForwarder. + var ubaLogger = UbaLoggerForwarder.GetUbaLogger(_ubaLoggerForwarderLogger); var server = UbaServerDelayedImports.CreateServer( - loggingForwarder.Logger, + ubaLogger, maxWorkers, sendSize, receiveTimeoutSeconds, @@ -56,8 +55,7 @@ public IUbaServer CreateServer( return new DefaultUbaServer( _defaultUbaServerLogger, _processArgumentParser, - _localProcessExecutor, - loggingForwarder, + ubaLogger, server, rootStorageDirectoryPath, ubaTraceFilePath); diff --git a/UET/Redpoint.Uba/IUbaServer.cs b/UET/Redpoint.Uba/IUbaServer.cs index f166240a..fca59900 100644 --- a/UET/Redpoint.Uba/IUbaServer.cs +++ b/UET/Redpoint.Uba/IUbaServer.cs @@ -10,7 +10,7 @@ /// /// We don't support the UBA mode whereby the server listens on a port, and remote agents (clients) initiated a connection back to the server. That's because most developer machines are behind a firewall that remote agents won't be able to initiate a connection through. /// - /// Instead, we only support the model by which the remote agents (clients) listen on a port, and the server actively connects to them using . + /// Instead, we only support the model by which the remote agents (clients) listen on a port, and the server actively connects to them using . /// public interface IUbaServer : IProcessExecutor, IAsyncDisposable { @@ -19,8 +19,22 @@ public interface IUbaServer : IProcessExecutor, IAsyncDisposable /// /// The IP address of the remote agent. /// The port of the remote agent. - /// If the remote agent has the '-crypto' parameter set, this should be the 32 character hexadecimal value representing the symmetric cryptographic key. /// True if the agent was successfully added. - bool AddRemoteAgent(string ip, int port, string crypto = ""); + bool AddRemoteAgent(string ip, int port); + + /// + /// The number of processes currently in the queue to start execution. + /// + long ProcessesPendingInQueue { get; } + + /// + /// The number of processes executing locally. + /// + long ProcessesExecutingLocally { get; } + + /// + /// The number of processes executing on remote agents. + /// + long ProcessesExecutingRemotely { get; } } } diff --git a/UET/Redpoint.Uba/UbaLoggerForwarder.cs b/UET/Redpoint.Uba/UbaLoggerForwarder.cs index 3102432b..a523f581 100644 --- a/UET/Redpoint.Uba/UbaLoggerForwarder.cs +++ b/UET/Redpoint.Uba/UbaLoggerForwarder.cs @@ -5,11 +5,10 @@ using System; using System.Runtime.InteropServices; - internal partial class UbaLoggerForwarder : IDisposable + internal partial class UbaLoggerForwarder { - private readonly ILogger _logger; - private readonly nint _ubaLogger; - private bool _hasDisposed; + private static ILogger? _logger; + private static nint? _ubaLogger; #region Library Imports @@ -36,8 +35,13 @@ private static partial void DestroyCallbackLogWriter( #endregion - public UbaLoggerForwarder(ILogger logger) + public static nint GetUbaLogger(ILogger logger) { + if (_ubaLogger.HasValue) + { + return _ubaLogger.Value; + } + _logger = logger; _ubaLogger = CreateCallbackLogWriter( BeginScope, @@ -47,9 +51,9 @@ public UbaLoggerForwarder(ILogger logger) { throw new InvalidOperationException("Unable to create UBA logger!"); } - } - public nint Logger => _ubaLogger; + return _ubaLogger.Value; + } private static void BeginScope() { @@ -59,7 +63,7 @@ private static void EndScope() { } - private void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint prefixLen) + private static void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint prefixLen) { string message; if (OperatingSystem.IsWindows()) @@ -74,46 +78,20 @@ private void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint pre switch (logEntryType) { case 0: - _logger.LogError(message); + _logger!.LogError(message); break; case 1: - _logger.LogWarning(message); + _logger!.LogWarning(message); break; case 2: - _logger.LogInformation(message); + _logger!.LogInformation(message); break; case 3: case 4: default: - _logger.LogDebug(message); + _logger!.LogDebug(message); break; } } - - protected virtual void Dispose(bool disposing) - { - if (!_hasDisposed) - { - if (_ubaLogger != nint.Zero) - { - DestroyCallbackLogWriter(_ubaLogger); - } - - _hasDisposed = true; - } - } - - ~UbaLoggerForwarder() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: false); - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } } } diff --git a/UET/Redpoint.Uba/UbaProcessDescriptor.cs b/UET/Redpoint.Uba/UbaProcessDescriptor.cs index 37571545..f6c4bb8a 100644 --- a/UET/Redpoint.Uba/UbaProcessDescriptor.cs +++ b/UET/Redpoint.Uba/UbaProcessDescriptor.cs @@ -18,6 +18,8 @@ internal class UbaProcessDescriptor public required bool PreferRemote { get; set; } + public required bool AllowRemote { get; set; } + public required Gate CompletionGate { get; init; } public int ExitCode { get; set; } diff --git a/UET/Redpoint.Uba/UbaProcessSpecification.cs b/UET/Redpoint.Uba/UbaProcessSpecification.cs index ac71ca83..8aa97387 100644 --- a/UET/Redpoint.Uba/UbaProcessSpecification.cs +++ b/UET/Redpoint.Uba/UbaProcessSpecification.cs @@ -13,5 +13,10 @@ public class UbaProcessSpecification : ProcessSpecification /// remote agent. /// public bool PreferRemote { get; set; } + + /// + /// If true, this command can be run remotely. Defaults to true if not set. + /// + public bool AllowRemote { get; set; } = true; } } diff --git a/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj b/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj index 1c6789fa..20425edd 100644 --- a/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj +++ b/UET/Redpoint.Uet.BuildPipeline.Executors.GitLab/Redpoint.Uet.BuildPipeline.Executors.GitLab.csproj @@ -3,8 +3,8 @@ - - + + diff --git a/UET/uet/Commands/CMake/CMakeCommand.cs b/UET/uet/Commands/CMake/CMakeCommand.cs new file mode 100644 index 00000000..b47adbfe --- /dev/null +++ b/UET/uet/Commands/CMake/CMakeCommand.cs @@ -0,0 +1,249 @@ +namespace UET.Commands.CMake +{ + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using Redpoint.PathResolution; + using Redpoint.ProcessExecution; + using System; + using System.Collections.Generic; + using System.CommandLine; + using System.CommandLine.Invocation; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using UET.Commands.EngineSpec; + using UET.Services; + + internal sealed class CMakeCommand + { + internal sealed class Options + { + public Option Engine; + + public Options() + { + Engine = new Option( + "--engine", + description: "The engine to use the Unreal Build Accelerator from.", + parseArgument: EngineSpec.ParseEngineSpecContextless(), + isDefault: true); + Engine.AddAlias("-e"); + Engine.Arity = ArgumentArity.ZeroOrOne; + } + } + + public static Command CreateCMakeCommand() + { + var options = new Options(); + var commandArguments = new Argument("command-and-arguments", "The command to run, followed by any arguments to pass to it."); + commandArguments.Arity = ArgumentArity.ZeroOrMore; + var command = new Command("cmake", "Run a CMake-based build leveraging the Unreal Build Accelerator (UBA) to distribute compilation.") + { + FullDescription = """ + This command runs a CMake-based build, leveraging the Unreal Build Accelerator (UBA) to distribute the build over Kubernetes. + + ------------- + + Before you can distribute builds, you must configure your BuildConfiguration.xml file located at "%appdata%\Unreal Engine\UnrealBuildTool\BuildConfiguration.xml" with the settings to connect to the Kubernetes cluster. The Kubernetes cluster must have Windows nodes in it. You can use RKM (https://src.redpoint.games/redpointgames/rkm) to spin up a Kubernetes cluster with Windows nodes with a single command. + + + + + default + defafult + 10.0.0.100 + ShareName + Domain\Username + Password + + + + The 'Smb' settings specify a network share that all Windows nodes can access as the specified user. The Unreal Build Accelerator will be copied to this share and the containers will copy from this network share. + + Your `kubectl` configuration must be connected to the cluster already, as per the 'Context' setting. You can get the context name by running `kubectl config get-contexts`. The 'Namespace' setting specifies what Kubernetes namespace to launch UBA agents into. + + ------------- + + To distribute builds, you must first generate your CMake project using: + + uet cmake -- ... + + You should omit `-G`; this command will automatically select the Ninja project generator which is required to distribute builds. + + Once you've generated your project, you can distribute the build using: + + uet cmake -e 5.5 -- --build ... + + The presence of `--build` in the CMake arguments is what this tool uses to determine whether CMake is generating project files or running the build. You only need to specify `-e` as an argument to this command when running the build; it is not necessary during generation. + + All arguments past the `--` are forwarded to CMake intact. + """ + }; + command.AddAllOptions(options); + command.AddArgument(commandArguments); + command.AddCommonHandler(options, services => + { + services.AddSingleton(commandArguments); + }); + return command; + } + + private sealed class CMakeCommandInstance : ICommandInstance + { + private readonly ILogger _logger; + private readonly IProcessExecutor _processExecutor; + private readonly ISelfLocation _selfLocation; + private readonly IPathResolver _pathResolver; + private readonly Options _options; + private readonly Argument _commandArguments; + + public CMakeCommandInstance( + ILogger logger, + IProcessExecutor processExecutor, + ISelfLocation selfLocation, + IPathResolver pathResolver, + Options options, + Argument commandArguments) + { + _logger = logger; + _processExecutor = processExecutor; + _selfLocation = selfLocation; + _pathResolver = pathResolver; + _options = options; + _commandArguments = commandArguments; + } + + public async Task ExecuteAsync(InvocationContext context) + { + var extraArguments = context.ParseResult.GetValueForArgument(_commandArguments); + + string? cmake = null; + try + { + cmake = await _pathResolver.ResolveBinaryPath("cmake"); + } + catch (FileNotFoundException) + { + cmake = Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.ProgramFiles), + "Microsoft Visual Studio\\2022\\Community\\Common7\\IDE\\CommonExtensions\\Microsoft\\CMake\\CMake\\bin\\cmake.exe"); + } + if (cmake == null || !File.Exists(cmake)) + { + _logger.LogError($"Unable to find CMake on PATH or at '{cmake}'."); + return 1; + } + + var engineResult = context.ParseResult.CommandResult.FindResultFor(_options.Engine); + string? engineString = null; + if (engineResult != null) + { + // Run the parse of the engine specification so we can error early if it's invalid. + context.ParseResult.GetValueForOption(_options.Engine); + + engineString = engineResult.Tokens.Count > 0 ? engineResult.Tokens[0].Value : null; + } + + if (extraArguments.Contains("--build")) + { + _logger.LogInformation("CMake is building the project (--build detected)."); + + if (string.IsNullOrWhiteSpace(engineString)) + { + _logger.LogError("Missing --engine option, which is necessary for --build. Use this command like 'uet cmake -e 5.5 -- ...'."); + return 1; + } + + // Generate a session ID. + var sessionId = Guid.NewGuid().ToString(); + + // Start the CMake UBA server. + _logger.LogInformation("Starting CMake UBA server..."); + using var backgroundCts = CancellationTokenSource.CreateLinkedTokenSource(context.GetCancellationToken()); + var backgroundTask = Task.Run(async () => await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = _selfLocation.GetUetLocalLocation(), + Arguments = [ + "internal", + "cmake-uba-server", + "-e", + engineString, + ], + EnvironmentVariables = new Dictionary + { + { "CMAKE_UBA_SESSION_ID", sessionId } + } + }, + CaptureSpecification.Passthrough, + backgroundCts.Token).ConfigureAwait(false)); + + var appendArguments = new List(); + if (!(extraArguments.Contains("-j") || extraArguments.Any(x => x.StartsWith("-j", StringComparison.Ordinal)))) + { + // Override core detection because we're distributing builds. + appendArguments.Add("-j256"); + } + + // Run CMake. + try + { + _logger.LogInformation("Running CMake..."); + return await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = cmake, + Arguments = extraArguments.Select(x => new LogicalProcessArgument(x)).Concat(appendArguments), + EnvironmentVariables = new Dictionary + { + { "CMAKE_UBA_SESSION_ID", sessionId }, + { "UET_IMPLICIT_COMMAND", "cmake-uba-run" }, + } + }, + CaptureSpecification.Passthrough, + context.GetCancellationToken()); + } + finally + { + backgroundCts.Cancel(); + try + { + _logger.LogInformation("Stopping CMake UBA server..."); + await backgroundTask.ConfigureAwait(false); + } + catch + { + } + } + } + else + { + _logger.LogInformation("CMake is generating project files (--build not detected)."); + + if (extraArguments.Contains("-G") || extraArguments.Any(x => x.StartsWith("-G", StringComparison.Ordinal))) + { + _logger.LogError("Detected -G argument passed to CMake during project generation. Omit this setting as UET will force use of the Ninja build system."); + return 1; + } + + // Run CMake. + _logger.LogInformation("Running CMake..."); + return await _processExecutor.ExecuteAsync( + new ProcessSpecification + { + FilePath = cmake, + Arguments = new LogicalProcessArgument[] + { + "-G", + "Ninja", + $"-DCMAKE_C_COMPILER_LAUNCHER={_selfLocation.GetUetLocalLocation()}", + $"-DCMAKE_CXX_COMPILER_LAUNCHER={_selfLocation.GetUetLocalLocation()}", + }.Concat(extraArguments.Select(x => new LogicalProcessArgument(x))), + }, + CaptureSpecification.Passthrough, + context.GetCancellationToken()); + } + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs new file mode 100644 index 00000000..661dd8de --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaRun/CMakeUbaRunCommand.cs @@ -0,0 +1,142 @@ +namespace UET.Commands.Internal.CMakeUbaRun +{ + using CMakeUba; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using Redpoint.GrpcPipes; + using Redpoint.ProcessExecution; + using System; + using System.CommandLine; + using System.CommandLine.Invocation; + using System.Threading.Tasks; + using UET.Services; + using static CMakeUba.CMakeUbaService; + + internal class CMakeUbaRunCommand + { + internal sealed class Options + { + public Option PreferRemote = new Option( + "--prefer-remote", + "If true, the CMake UBA server will prefer to run this command remotely."); + } + + public static Command CreateCMakeUbaRunCommand() + { + var options = new Options(); + var commandArguments = new Argument("command-and-arguments", "The command to run, followed by any arguments to pass to it."); + commandArguments.Arity = ArgumentArity.OneOrMore; + var command = new Command("cmake-uba-run"); + command.AddAllOptions(options); + command.AddArgument(commandArguments); + command.AddCommonHandler(options, services => + { + services.AddSingleton(commandArguments); + }); + return command; + } + + private sealed class CMakeUbaRunCommandInstance : ICommandInstance + { + private readonly IGrpcPipeFactory _grpcPipeFactory; + private readonly ILogger _logger; + private readonly ISelfLocation _selfLocation; + private readonly IProcessExecutor _processExecutor; + private readonly Options _options; + private readonly Argument _commandArguments; + + public CMakeUbaRunCommandInstance( + IGrpcPipeFactory grpcPipeFactory, + ILogger logger, + ISelfLocation selfLocation, + IProcessExecutor processExecutor, + Options options, + Argument commandArguments) + { + _grpcPipeFactory = grpcPipeFactory; + _logger = logger; + _selfLocation = selfLocation; + _processExecutor = processExecutor; + _options = options; + _commandArguments = commandArguments; + } + + private async Task IsServerRunningAsync(string pipeName) + { + for (int i = 0; i < 10; i++) + { + var client = _grpcPipeFactory.CreateClient( + pipeName, + GrpcPipeNamespace.User, + channel => new CMakeUbaServiceClient(channel)); + try + { + await client.PingServerAsync(new CMakeUba.EmptyMessage()).ConfigureAwait(false); + return true; + } + catch + { + await Task.Delay(1000).ConfigureAwait(false); + } + } + return false; + } + + public async Task ExecuteAsync(InvocationContext context) + { + var sessionId = Environment.GetEnvironmentVariable("CMAKE_UBA_SESSION_ID"); + if (string.IsNullOrWhiteSpace(sessionId)) + { + _logger.LogError($"Expected CMAKE_UBA_SESSION_ID environment variable to be set."); + return 1; + } + var pipeName = $"cmake-uba-{sessionId}"; + + // Start the CMake UBA server on-demand if it's not already running. + if (!await IsServerRunningAsync(pipeName).ConfigureAwait(false)) + { + _logger.LogError($"The CMake UBA server isn't running. Start it with '{_selfLocation.GetUetLocalLocation()} internal cmake-uba-server &'"); + return 1; + } + + // Create our gRPC client. + var client = _grpcPipeFactory.CreateClient( + pipeName, + GrpcPipeNamespace.User, + channel => new CMakeUbaServiceClient(channel)); + + // Run the process. + var arguments = context.ParseResult.GetValueForArgument(_commandArguments)!; + var request = new CMakeUba.ProcessRequest + { + Path = arguments[0], + WorkingDirectory = Environment.CurrentDirectory, + PreferRemote = context.ParseResult.GetValueForOption(_options.PreferRemote), + }; + for (int i = 1; i < arguments.Length; i++) + { + request.Arguments.Add(new ProcessArgument { LogicalValue = arguments[i] }); + } + + var response = client.ExecuteProcess(request, cancellationToken: context.GetCancellationToken()); + + // Stream the response values. + while (await response.ResponseStream.MoveNext(cancellationToken: context.GetCancellationToken()).ConfigureAwait(false)) + { + switch (response.ResponseStream.Current.DataCase) + { + case ProcessResponse.DataOneofCase.StandardOutputLine: + Console.WriteLine(response.ResponseStream.Current.StandardOutputLine); + break; + case ProcessResponse.DataOneofCase.ExitCode: + // @note: This is the last response; return the exit code. + return response.ResponseStream.Current.ExitCode; + } + } + + _logger.LogError("Did not receive exit code for process from CMake UBA server before the response stream ended."); + return 1; + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto new file mode 100644 index 00000000..04677361 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUba.proto @@ -0,0 +1,31 @@ +syntax = 'proto3'; + +package CMakeUba; + +message ProcessArgument { + string logicalValue = 1; + string originalValue = 2; +} + +message ProcessRequest { + string path = 1; + repeated ProcessArgument arguments = 2; + string workingDirectory = 3; + bool preferRemote = 4; +} + +message ProcessResponse { + oneof data { + string standardOutputLine = 1; + // This will always be the last message from the response stream. + int32 exitCode = 2; + } +} + +message EmptyMessage { +} + +service CMakeUbaService { + rpc PingServer(EmptyMessage) returns (EmptyMessage) {} + rpc ExecuteProcess(ProcessRequest) returns (stream ProcessResponse) {} +} \ No newline at end of file diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs new file mode 100644 index 00000000..dd064b6a --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/CMakeUbaServerCommand.cs @@ -0,0 +1,276 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using CMakeUba; + using Grpc.Core; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using Redpoint.Concurrency; + using Redpoint.GrpcPipes; + using Redpoint.ProcessExecution; + using Redpoint.ProcessExecution.Enumerable; + using Redpoint.Uba; + using Redpoint.Uba.Native; + using Redpoint.Uet.BuildPipeline.Executors.Engine; + using Redpoint.Uet.BuildPipeline.Executors; + using Redpoint.Uet.Workspace; + using System; + using System.CommandLine; + using System.CommandLine.Invocation; + using System.Linq; + using System.Runtime.InteropServices; + using System.Threading; + using System.Threading.Tasks; + using UET.Commands.Config; + using UET.Commands.EngineSpec; + using static CMakeUba.CMakeUbaService; + + internal class CMakeUbaServerCommand + { + internal sealed class Options + { + public Option Engine; + + public Options() + { + Engine = new Option( + "--engine", + description: "The engine to use for the build.", + parseArgument: EngineSpec.ParseEngineSpecContextless(), + isDefault: true); + Engine.AddAlias("-e"); + Engine.Arity = ArgumentArity.ExactlyOne; + } + } + + public static Command CreateCMakeUbaServerCommand() + { + var options = new Options(); + var command = new Command("cmake-uba-server"); + command.AddAllOptions(options); + command.AddCommonHandler(options, services => + { + services.AddSingleton(); + }); + return command; + } + + private sealed class CMakeUbaServerCommandInstance : CMakeUbaServiceBase, ICommandInstance + { + private readonly IUbaServerFactory _ubaServerFactory; + private readonly IGrpcPipeFactory _grpcPipeFactory; + private readonly IXmlConfigHelper _xmlConfigHelper; + private readonly ILogger _logger; + private readonly IEngineWorkspaceProvider _engineWorkspaceProvider; + private readonly Options _options; + private CancellationToken? _commandCancellationToken; + private IUbaServer? _ubaServer; + + public CMakeUbaServerCommandInstance( + IUbaServerFactory ubaServerFactory, + IGrpcPipeFactory grpcPipeFactory, + IXmlConfigHelper xmlConfigHelper, + ILogger logger, + IEngineWorkspaceProvider engineWorkspaceProvider, + Options options) + { + _ubaServerFactory = ubaServerFactory; + _grpcPipeFactory = grpcPipeFactory; + _xmlConfigHelper = xmlConfigHelper; + _logger = logger; + _engineWorkspaceProvider = engineWorkspaceProvider; + _options = options; + } + + public async Task ExecuteAsync(InvocationContext context) + { + var engine = context.ParseResult.GetValueForOption(_options.Engine)!; + + BuildEngineSpecification engineSpec; + switch (engine.Type) + { + case EngineSpecType.UEFSPackageTag: + engineSpec = BuildEngineSpecification.ForUEFSPackageTag(engine.UEFSPackageTag!); + break; + case EngineSpecType.SESNetworkShare: + engineSpec = BuildEngineSpecification.ForSESNetworkShare(engine.SESNetworkShare!); + break; + case EngineSpecType.RemoteZfs: + engineSpec = BuildEngineSpecification.ForRemoteZfs(engine.RemoteZfs!); + break; + case EngineSpecType.Version: + engineSpec = BuildEngineSpecification.ForVersionWithPath(engine.Version!, engine.Path!); + break; + case EngineSpecType.Path: + engineSpec = BuildEngineSpecification.ForAbsolutePath(engine.Path!); + break; + case EngineSpecType.GitCommit: + engineSpec = BuildEngineSpecification.ForGitCommitWithZips( + engine.GitUrl!, + engine.GitCommit!, + engine.ZipLayers, + isEngineBuild: false); + break; + default: + throw new NotSupportedException($"The EngineSpecType {engine.Type} is not supported by the 'cmake-uba-server' command."); + } + + await using ((await _engineWorkspaceProvider.GetEngineWorkspace( + engineSpec, + string.Empty, + context.GetCancellationToken()).ConfigureAwait(false)) + .AsAsyncDisposable(out var engineWorkspace) + .ConfigureAwait(false)) + { + // Get the session ID / pipe name. + var sessionId = Environment.GetEnvironmentVariable("CMAKE_UBA_SESSION_ID"); + if (string.IsNullOrWhiteSpace(sessionId)) + { + _logger.LogError($"Expected CMAKE_UBA_SESSION_ID environment variable to be set."); + return 1; + } + var pipeName = $"cmake-uba-{sessionId}"; + + var ubaDirectory = Path.Combine(engineWorkspace.Path, @"Engine\Binaries\Win64\UnrealBuildAccelerator"); + UbaNative.Init(ubaDirectory); + + // Track the timestamp that the server should automatically shut down. This gets moved + // forward into the future when we have work in the queue. + var shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); + + // Create the UBA server. + _logger.LogInformation("CMake UBA server is starting up..."); + var ubaRoot = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData)!, "Redpoint", "CMakeUBA"); + await using (_ubaServerFactory + .CreateServer( + ubaRoot, + Path.Combine(ubaRoot, "UbaTrace.log")) + .AsAsyncDisposable(out var ubaServer) + .ConfigureAwait(false)) + { + _ubaServer = ubaServer; + _commandCancellationToken = context.GetCancellationToken(); + + // Create the gRPC server. + await using (_grpcPipeFactory + .CreateServer( + pipeName, + GrpcPipeNamespace.User, + this) + .AsAsyncDisposable(out var grpcServer) + .ConfigureAwait(false)) + { + // Start the gRPC server. + await grpcServer.StartAsync().ConfigureAwait(false); + _logger.LogInformation("Waiting for incoming requests over gRPC..."); + + // Start the Kubernetes coordinator. + using var coordinator = new UbaCoordinatorKubernetes( + Path.Combine(ubaDirectory, RuntimeInformation.ProcessArchitecture.ToString().ToLowerInvariant()), + _logger, + UbaCoordinatorKubernetesConfig.ReadFromBuildConfigurationXml(_xmlConfigHelper)); + await coordinator.InitAsync(ubaServer).ConfigureAwait(false); + try + { + coordinator.Start(); + + // Every second, evaluate how many processes are in queue / executing locally / executing remotely, + // and use this information to provision agents on Kubernetes as needed. + try + { + while (true) + { + _logger.LogDebug($"Pending: {ubaServer.ProcessesPendingInQueue} Executing Locally: {ubaServer.ProcessesExecutingLocally} Executing Remotely: {ubaServer.ProcessesExecutingRemotely}"); + + if (ubaServer.ProcessesPendingInQueue > 0 || ubaServer.ProcessesExecutingLocally > 0 || ubaServer.ProcessesExecutingRemotely > 0) + { + shutdownTime = DateTimeOffset.UtcNow.AddSeconds(60); + } + if (shutdownTime < DateTimeOffset.UtcNow) + { + _logger.LogInformation("CMake UBA server is shutting down because there hasn't been any requests recently..."); + return 0; + } + + await Task.Delay(1000, context.GetCancellationToken()).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + + // Stop the gRPC server. + _logger.LogInformation("CMake UBA server is shutting down..."); + await grpcServer.StopAsync().ConfigureAwait(false); + } + finally + { + await coordinator.CloseAsync().ConfigureAwait(false); + } + } + } + } + + return 0; + } + + public override async Task ExecuteProcess(ProcessRequest request, IServerStreamWriter responseStream, ServerCallContext context) + { + if (!_commandCancellationToken.HasValue || + _ubaServer == null) + { + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + StandardOutputLine = "(The CMake UBA server is not in a valid state!)", + }).ConfigureAwait(false); + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + ExitCode = 99999, + }).ConfigureAwait(false); + return; + } + + var specification = new UbaProcessSpecification + { + FilePath = request.Path, + Arguments = request.Arguments.Select(x => new LogicalProcessArgument(x.LogicalValue)).ToArray(), + WorkingDirectory = request.WorkingDirectory, + PreferRemote = request.PreferRemote, + AllowRemote = string.Equals(Path.GetFileName(request.Path), "clang-cl.exe", StringComparison.OrdinalIgnoreCase), + }; + + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_commandCancellationToken.Value, context.CancellationToken); + + await foreach (var response in _ubaServer.ExecuteAsync(specification, context.CancellationToken)) + { + switch (response) + { + case StandardOutputResponse stdout: + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + StandardOutputLine = stdout.Data, + }).ConfigureAwait(false); + break; + case StandardErrorResponse stderr: + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + StandardOutputLine = stderr.Data, + }).ConfigureAwait(false); + break; + case ExitCodeResponse exitCode: + await responseStream.WriteAsync(new CMakeUba.ProcessResponse + { + ExitCode = exitCode.ExitCode, + }).ConfigureAwait(false); + // @note: This is the last message; we're done. + return; + } + } + } + + public override Task PingServer(EmptyMessage request, ServerCallContext context) + { + return Task.FromResult(new EmptyMessage()); + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs new file mode 100644 index 00000000..df5e49f0 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesConstants.cs @@ -0,0 +1,7 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + internal static class KubernetesConstants + { + public const ulong MemoryBytesPerCore = 1610612736uL; + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs new file mode 100644 index 00000000..90d33f27 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeState.cs @@ -0,0 +1,106 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using k8s.Models; + using System; + using System.Collections.Generic; + using System.Linq; + + internal class KubernetesNodeState + { + public string? NodeId; + public V1Node? KubernetesNode; + public List KubernetesPods = new List(); + public readonly List AllocatedBlocks = new List(); + + public ulong MemoryTotal + { + get + { + return KubernetesNode!.Status.Capacity["memory"].ToUInt64(); + } + } + + public ulong MemoryNonUba + { + get + { + return KubernetesPods + .Where(x => x.GetLabel("uba") != "true") + .SelectMany(x => x.Spec.Containers) + .Select(x => x?.Resources?.Requests != null && x.Resources.Requests.TryGetValue("memory", out var quantity) ? quantity.ToUInt64() : 0) + .Aggregate((a, b) => a + b); + } + } + + public ulong MemoryAllocated + { + get + { + return AllocatedBlocks + .Select(x => (ulong)x.AllocatedCores * KubernetesConstants.MemoryBytesPerCore) + .DefaultIfEmpty((ulong)0) + .Aggregate((a, b) => a + b); + } + } + + public ulong MemoryAvailable + { + get + { + var memory = MemoryTotal; + memory -= MemoryNonUba; + memory -= MemoryAllocated; + return Math.Max(0, memory); + } + } + + public double CoresTotal + { + get + { + return Math.Floor(KubernetesNode!.Status.Capacity["cpu"].ToDouble()); + } + } + + public double CoresNonUba + { + get + { + return KubernetesPods + .Where(x => x.GetLabel("uba") != "true") + .SelectMany(x => x.Spec.Containers) + .Select(x => x?.Resources?.Requests != null && x.Resources.Requests.TryGetValue("cpu", out var quantity) ? quantity.ToDouble() : 0) + .Sum(); + } + } + + public double CoresAllocated + { + get + { + return AllocatedBlocks.Sum(x => x.AllocatedCores); + } + } + + public int CoresAvailable + { + get + { + var cores = CoresTotal; + cores -= CoresNonUba; + cores -= CoresAllocated; + return (int)Math.Max(0, Math.Floor(cores)); + } + } + + public int CoresAllocatable + { + get + { + var realCoresAvailable = CoresAvailable; + var memoryConstrainedCoresAvailable = MemoryAvailable / KubernetesConstants.MemoryBytesPerCore; + return Math.Min(realCoresAvailable, (int)memoryConstrainedCoresAvailable); + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs new file mode 100644 index 00000000..197621b8 --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/KubernetesNodeWorker.cs @@ -0,0 +1,13 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using k8s.Models; + + internal class KubernetesNodeWorker + { + public V1Pod? KubernetesPod; + public V1Service? KubernetesService; + public int AllocatedCores; + public string? UbaHost; + public int? UbaPort; + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs new file mode 100644 index 00000000..9109785e --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetes.cs @@ -0,0 +1,725 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using k8s; + using k8s.Autorest; + using k8s.Models; + using System.Net.Sockets; + using System.Net; + using System.Runtime.CompilerServices; + using Microsoft.Extensions.Logging; + using Redpoint.Uba; + using Redpoint.Hashing; + using System.Globalization; + using UET.Commands.Config; + + internal class UbaCoordinatorKubernetes : IDisposable + { + private readonly string _ubaRootPath; + private readonly ILogger _logger; + private readonly UbaCoordinatorKubernetesConfig _ubaKubeConfig; + private CancellationTokenSource? _cancellationSource; + private string? _id; + private string? _ubaAgentRemotePath; + private string? _ubaAgentHash; + private Kubernetes? _client; + private Timer? _timer; + private const int _timerPeriod = 1000; + private Dictionary _kubernetesNodes; + private IUbaServer? _ubaServer; + + private string KubernetesNamespace + { + get + { + return _ubaKubeConfig?.Namespace ?? "default"; + } + } + + public UbaCoordinatorKubernetes( + string ubaRootPath, + ILogger logger, + UbaCoordinatorKubernetesConfig ubaKubeConfig) + { + _ubaRootPath = ubaRootPath; + _logger = logger; + _ubaKubeConfig = ubaKubeConfig; + _cancellationSource = new CancellationTokenSource(); + _kubernetesNodes = new Dictionary(); + } + + public async Task InitAsync(IUbaServer ubaServer) + { + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.Namespace)) + { + // If this is not set, then the developer probably wants to use Horde instead. + return; + } + + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.Context)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> Context in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbServer)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbServer in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbShare)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbShare in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbUsername)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbUsername in BuildConfiguration.xml."); + return; + } + if (string.IsNullOrWhiteSpace(_ubaKubeConfig.SmbPassword)) + { + _logger.LogWarning("Kubernetes UBA: Missing Kubernetes -> SmbPassword in BuildConfiguration.xml."); + return; + } + + _logger.LogDebug("Kubernetes UBA: InitAsync"); + + try + { + // Set the cancellation source for cancelling the pod. + _cancellationSource?.Cancel(); + _cancellationSource?.Dispose(); + _cancellationSource = new CancellationTokenSource(); + + // Clear out current nodes state. + _kubernetesNodes.Clear(); + + // Generate an ID to identify the jobs we're running. + _id = Guid.NewGuid().ToString(); + + // Copy the UbaAgent file to the UBA root directory. + var ubaFile = new FileInfo(Path.Combine(_ubaRootPath, "UbaAgent.exe")); + var agentHash = (await Hash.XxHash64OfFileAsync(ubaFile.FullName, CancellationToken.None).ConfigureAwait(false)).Hash.ToString(CultureInfo.InvariantCulture); + var ubaRemoteDir = new DirectoryInfo(Path.Combine(@$"\\{_ubaKubeConfig.SmbServer}\{_ubaKubeConfig.SmbShare}\Uba", $"cmake{agentHash}"))!; + var ubaRemoteFile = new FileInfo(Path.Combine(ubaRemoteDir.FullName, "UbaAgent.exe")); + try + { + Directory.CreateDirectory(ubaRemoteDir.FullName); + File.Copy(ubaFile.FullName, ubaRemoteFile.FullName); + } + catch + { + // File already copied. + } + _ubaAgentRemotePath = ubaRemoteFile.FullName; + _ubaAgentHash = agentHash.ToString(); + + // Set up Kubernetes client and ensure we can connect to the cluster. + var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(currentContext: _ubaKubeConfig.Context); + _client = new Kubernetes(config); + await _client.ListNamespacedPodAsync(KubernetesNamespace, cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + + // Track the executor so we can add clients to it. + _ubaServer = ubaServer; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Kubernetes UBA: InitAsync failed: {ex.Message}"); + } + } + + private async Task DeleteServiceAndPodAsync(V1Pod pod, CancellationToken? cancellationToken) + { + try + { + await _client.DeleteNamespacedServiceAsync( + pod.Name(), + pod.Namespace(), + cancellationToken: cancellationToken ?? _cancellationSource?.Token ?? CancellationToken.None) + .ConfigureAwait(false); + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound) + { + } + try + { + await _client.DeleteNamespacedPodAsync( + pod.Name(), + pod.Namespace(), + cancellationToken: cancellationToken ?? _cancellationSource?.Token ?? CancellationToken.None) + .ConfigureAwait(false); + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound) + { + } + if (_kubernetesNodes.TryGetValue(pod.Spec.NodeName, out var node)) + { + node.AllocatedBlocks.RemoveAll(x => x.KubernetesPod.Name() == pod.Name()); + } + } + + private async IAsyncEnumerable EnumerateNodePodAsync(string nodeName, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListPodForAllNamespacesAsync( + fieldSelector: $"spec.nodeName={nodeName}", + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + private async IAsyncEnumerable EnumerateNamespacedPodAsync(string labelSelector, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListNamespacedPodAsync( + KubernetesNamespace, + labelSelector: labelSelector, + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + private async IAsyncEnumerable EnumerateNamespacedServiceAsync(string labelSelector, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListNamespacedServiceAsync( + KubernetesNamespace, + labelSelector: labelSelector, + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + private async IAsyncEnumerable EnumerateNodesAsync(string labelSelector, [EnumeratorCancellation] CancellationToken cancellationToken) + { + string? continueParameter = null; + do + { + cancellationToken.ThrowIfCancellationRequested(); + var list = await _client.ListNodeAsync( + labelSelector: labelSelector, + continueParameter: continueParameter, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + foreach (var item in list.Items) + { + if (item.Spec.Taints != null && ( + item.Spec.Taints.Any(x => x.Effect == "NoSchedule") || + item.Spec.Taints.Any(x => x.Effect == "NoExecute"))) + { + continue; + } + yield return item; + } + continueParameter = list.Metadata.ContinueProperty; + } while (!string.IsNullOrWhiteSpace(continueParameter)); + } + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "This call is not used for cryptography.")] + public void Start() + { + _logger.LogDebug("Kubernetes UBA: Start"); + + _timer = new Timer(async _ => + { + _timer?.Change(Timeout.Infinite, Timeout.Infinite); + + var stopping = false; + try + { + // If we're cancelled, stop. + if (_cancellationSource == null || _cancellationSource.IsCancellationRequested) + { + _logger.LogDebug("Kubernetes UBA: Loop: Cancelled"); + stopping = true; + return; + } + + // If the client is unavailable, keep looping unless we're done. + if (_client == null) + { + _logger.LogInformation("Kubernetes UBA: Not configured to use Kubernetes."); + return; + } + + // Remove any Kubernetes pods that are complete, or failed to start. + await foreach (var pod in EnumerateNamespacedPodAsync("uba=true", _cancellationSource.Token)) + { + if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") + { + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()} (cleanup)"); + await DeleteServiceAndPodAsync(pod, _cancellationSource.Token).ConfigureAwait(false); + + foreach (var kv in _kubernetesNodes) + { + kv.Value.AllocatedBlocks.RemoveAll(candidatePod => + candidatePod.KubernetesPod != null && + candidatePod.KubernetesPod.Name() == pod.Name()); + } + } + } + + // Synchronise Kubernetes nodes with our known node list. + var knownNodeNames = new HashSet(); + await foreach (var node in EnumerateNodesAsync("kubernetes.io/os=windows", _cancellationSource.Token)) + { + knownNodeNames.Add(node.Name()); + if (_kubernetesNodes.TryGetValue(node.Name(), out var existingEntry)) + { + existingEntry.KubernetesNode = node; + } + else + { + existingEntry = new KubernetesNodeState + { + NodeId = node.Name(), + KubernetesNode = node, + }; + _kubernetesNodes.Add(node.Name(), existingEntry); + } + var newPodsList = new List(); + await foreach (var pod in EnumerateNodePodAsync(node.Name(), _cancellationSource.Token)) + { + newPodsList.Add(pod); + } + existingEntry.KubernetesPods = newPodsList; + } + + // Determine the threshold over local allocation. + double desiredCpusThreshold = 0; + + // Allocate cores from Kubernetes until we're satisified. + while (true) + { + // Check how many additional cores we need to allocate from the cluster. + double desiredCpus = _ubaServer!.ProcessesPendingInQueue; + desiredCpus -= _kubernetesNodes.SelectMany(x => x.Value.AllocatedBlocks).Sum(y => y.AllocatedCores); + if (desiredCpus <= desiredCpusThreshold) + { + _logger.LogDebug($"Kubernetes UBA: Loop: Skipping (desired CPU {desiredCpus} <= {desiredCpusThreshold})"); + break; + } + + // Remove any Kubernetes pods that are complete, or failed to start. + await foreach (var pod in EnumerateNamespacedPodAsync($"uba=true,uba.queueId={_id}", _cancellationSource.Token)) + { + if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") + { + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()}"); + await DeleteServiceAndPodAsync(pod, _cancellationSource.Token).ConfigureAwait(false); + } + } + + // Compute the biggest size we can allocate. + var blockSize = _kubernetesNodes + .Select(x => x.Value.CoresAllocatable) + .Where(x => x != 0) + .DefaultIfEmpty(0) + .Max(); + blockSize = Math.Min(blockSize, (int)Math.Floor(desiredCpus)); + if (blockSize == 0) + { + // No more cores to allocate from. + break; + } + + // Pick an available node, weighted by the available cores. + var selectedNode = _kubernetesNodes + .Select(x => x.Value) + .SelectMany(x => + { + var blocks = new List(); + _logger.LogDebug($"Kubernetes UBA: Loop: Node '{x.NodeId}' has\nCPU: {x.CoresTotal} total, {x.CoresNonUba} non-UBA, {x.CoresAllocated} allocated, {x.CoresAvailable} available, {x.CoresAllocatable} allocatable.\nMemory: {x.MemoryTotal} total, {x.MemoryNonUba} non-UBA, {x.MemoryAllocated} allocated, {x.MemoryAvailable} available.\nBlocks: {x.AllocatedBlocks.Count} allocated."); + for (var c = 0; c < x.CoresAllocatable; c += blockSize) + { + if (c + blockSize <= x.CoresAllocatable) + { + blocks.Add(x); + } + } + return blocks; + }) + .OrderBy(_ => Random.Shared.NextInt64()) + .FirstOrDefault(); + + // If we don't have any available node, break out of the loop node. + if (selectedNode == null) + { + break; + } + + // Generate the next block ID for this node. + int highestBlockId = 0; + await foreach (var pod in EnumerateNamespacedPodAsync($"uba.nodeId={selectedNode.NodeId}", _cancellationSource.Token)) + { + var thisBlockId = int.Parse(pod.GetLabel("uba.blockId"), CultureInfo.InvariantCulture); + highestBlockId = Math.Max(highestBlockId, thisBlockId); + } + var nextBlockId = highestBlockId + 1; + + // Create the pod and service. + var name = $"uba-{selectedNode.NodeId}-{nextBlockId}"; + _logger.LogDebug($"Allocating Kubernetes block: {name}"); + var labels = new Dictionary + { + { "uba", "true" }, + { "uba.nodeId", selectedNode.NodeId! }, + { "uba.blockId", nextBlockId.ToString(CultureInfo.InvariantCulture) }, + { "uba.queueId", _id! }, + }; + var kubernetesPod = await _client.CreateNamespacedPodAsync( + new V1Pod + { + Metadata = new V1ObjectMeta + { + Name = name, + Labels = labels, + }, + Spec = new V1PodSpec + { + AutomountServiceAccountToken = false, + NodeSelector = new Dictionary + { + { "kubernetes.io/os", "windows" }, + }, + RestartPolicy = "Never", + TerminationGracePeriodSeconds = 0, + Volumes = new List + { + new V1Volume + { + Name = "uba-storage", + HostPath = new V1HostPathVolumeSource + { + Path = @$"C:\Uba\{name}", + } + } + }, + Containers = new List + { + new V1Container + { + Image = "mcr.microsoft.com/powershell:lts-windowsservercore-ltsc2022", + ImagePullPolicy = "IfNotPresent", + Name = "uba-agent", + Resources = new V1ResourceRequirements + { + Requests = new Dictionary + { + { "cpu", new ResourceQuantity(blockSize.ToString(CultureInfo.InvariantCulture)) }, + // @note: We don't set 'memory' here because it can be finicky to get the container to get allocated. + }, + Limits = new Dictionary + { + { "cpu", new ResourceQuantity(blockSize.ToString(CultureInfo.InvariantCulture)) }, + // @note: We don't set 'memory' here because it can be finicky to get the container to get allocated. + }, + }, + SecurityContext = new V1SecurityContext + { + WindowsOptions = new V1WindowsSecurityContextOptions + { + RunAsUserName = "ContainerAdministrator", + } + }, + Command = new List + { + @"C:\Program Files\PowerShell\latest\pwsh.exe", + }, + Args = new List + { + "-Command", + $@"Start-Sleep -Seconds 1; Write-Host ""Mapping network drive...""; C:\Windows\system32\net.exe use Z: \\{_ubaKubeConfig.SmbServer}\{_ubaKubeConfig.SmbShare}\Uba /USER:{_ubaKubeConfig.SmbUsername} {_ubaKubeConfig.SmbPassword}; Write-Host ""Copying UBA agent...""; Copy-Item Z:\cmake{_ubaAgentHash}\UbaAgent.exe C:\UbaAgent.exe; Write-Host ""Running UBA agent...""; C:\UbaAgent.exe -Verbose -Listen=7000 -NoPoll -listenTimeout=120 -ProxyPort=7001 -Dir=C:\UbaData -MaxIdle=15 -MaxCpu={blockSize}; Write-Host ""UBA agent exited with exit code: $LastExitCode""; exit $LastExitCode;", + }, + Ports = new List + { + new V1ContainerPort + { + ContainerPort = 7000, + Protocol = "TCP", + }, + new V1ContainerPort + { + ContainerPort = 7001, + Protocol = "TCP", + } + }, + VolumeMounts = new List + { + new V1VolumeMount + { + Name = "uba-storage", + MountPath = @"C:\UbaData", + } + } + } + } + } + }, + KubernetesNamespace, + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + V1Service kubernetesService; + createService: + try + { + kubernetesService = await _client.CreateNamespacedServiceAsync( + new V1Service + { + Metadata = new V1ObjectMeta + { + Name = name, + Labels = labels, + }, + Spec = new V1ServiceSpec + { + Selector = labels, + Type = "NodePort", + Ports = new List + { + new V1ServicePort + { + Name = "uba", + Port = 7000, + TargetPort = new IntstrIntOrString("7000"), + Protocol = "TCP", + }, + new V1ServicePort + { + Name = "uba-proxy", + Port = 7001, + TargetPort = new IntstrIntOrString("7001"), + Protocol = "TCP", + }, + }, + }, + }, + KubernetesNamespace, + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.Conflict) + { + await _client.DeleteNamespacedServiceAsync(name, KubernetesNamespace, cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + goto createService; + } + + // Track the worker. + var worker = new KubernetesNodeWorker + { + KubernetesPod = kubernetesPod, + KubernetesService = kubernetesService, + AllocatedCores = blockSize, + }; + selectedNode.AllocatedBlocks.Add(worker); + + // In the background, wait for the worker to become ready and allocate it to UBA. + _ = Task.Run(async () => + { + var didRegister = false; + try + { + // Wait for the service to have node ports. + while (worker.UbaHost == null || worker.UbaPort == null) + { + _cancellationSource.Token.ThrowIfCancellationRequested(); + + // Refresh service status. + worker.KubernetesService = await _client.ReadNamespacedServiceAsync( + worker.KubernetesService.Name(), + worker.KubernetesService.Namespace(), + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + + // If a port doesn't have NodePort, it's not allocated yet. + if (worker.KubernetesService.Spec.Ports.Any(x => x.NodePort == null)) + { + await Task.Delay(1000).ConfigureAwait(false); + continue; + } + + // We should have the node port now. + worker.UbaHost = selectedNode.KubernetesNode!.Status.Addresses + .Where(x => x.Type == "InternalIP") + .Select(x => x.Address) + .First(); + worker.UbaPort = worker.KubernetesService.Spec.Ports.First(x => x.Name == "uba").NodePort!.Value; + break; + } + + // Wait for the pod to start. + var secondsElapsed = 0; + while (worker.KubernetesPod.Status.Phase == "Pending" && secondsElapsed < 30) + { + await Task.Delay(1000).ConfigureAwait(false); + worker.KubernetesPod = await _client.ReadNamespacedPodAsync( + worker.KubernetesPod.Name(), + worker.KubernetesPod.Namespace(), + cancellationToken: _cancellationSource.Token).ConfigureAwait(false); + secondsElapsed++; + } + if (worker.KubernetesPod.Status.Phase == "Pending") + { + // Timed out. + _logger.LogWarning($"Kubernetes timed out while allocating: {name}"); + return; + } + + // Add the worker to UBA. + _logger.LogInformation($"Kubernetes block is ready: {name} ({worker.UbaHost}:{worker.UbaPort.Value})"); + var didAddAgent = false; + for (int i = 0; i < 30; i++) + { + if (_ubaServer.AddRemoteAgent(worker.UbaHost, worker.UbaPort.Value)) + { + didAddAgent = true; + break; + } + await Task.Delay(1000).ConfigureAwait(false); + } + if (!didAddAgent) + { + _logger.LogError("Unable to register Kubernetes UBA agent with UBA library!"); + } + else + { + didRegister = true; + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"Exception in Kubernetes wait loop: {ex}"); + } + finally + { + if (!didRegister) + { + // This pod/service could not be allocated. The main loop can then try to allocate again. + await DeleteServiceAndPodAsync(worker.KubernetesPod, _cancellationSource.Token).ConfigureAwait(false); + } + } + }); + } + } + catch (OperationCanceledException ex) when (_cancellationSource != null && ex.CancellationToken == _cancellationSource.Token) + { + // Expected exception. + stopping = true; + } + catch (TaskCanceledException) + { + // Expected exception. + stopping = true; + } + catch (SocketException) + { + _logger.LogWarning("Unable to reach Kubernetes cluster."); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Exception in Kubernetes loop: {ex}"); + } + finally + { + if (!stopping) + { + _timer?.Change(_timerPeriod, Timeout.Infinite); + } + } + + }, null, 0, _timerPeriod); + } + + public void Stop() + { + _cancellationSource?.Cancel(); + } + + public async Task CloseAsync() + { + _cancellationSource?.Cancel(); + + if (_client != null) + { + try + { + // Remove any Kubernetes pods that are complete. + await foreach (var pod in EnumerateNamespacedPodAsync("uba=true", CancellationToken.None)) + { + if (pod.Status.Phase == "Succeeded" || pod.Status.Phase == "Failed" || pod.Status.Phase == "Unknown") + { + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()} (cleanup on close)"); + await DeleteServiceAndPodAsync(pod, CancellationToken.None).ConfigureAwait(false); + } + } + + // Remove any Kubernetes pods owned by us. + if (!string.IsNullOrWhiteSpace(_id)) + { + await foreach (var pod in EnumerateNamespacedPodAsync($"uba.queueId={_id}", CancellationToken.None)) + { + _logger.LogDebug($"Removing Kubernetes block: {pod.Name()} (unconditional)"); + await DeleteServiceAndPodAsync(pod, CancellationToken.None).ConfigureAwait(false); + } + } + } + catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.Forbidden) + { + // Ignore this transient error on shutdown. + } + catch (SocketException) + { + // Ignore this transient error on shutdown. + } + catch (HttpRequestException) + { + // Ignore this transient error on shutdown. + } + } + } + + public void Dispose() + { + Stop(); + CloseAsync().Wait(); + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _cancellationSource?.Dispose(); + _cancellationSource = null; + _timer?.Dispose(); + _timer = null; + _client?.Dispose(); + _client = null; + } + } + } +} diff --git a/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs new file mode 100644 index 00000000..eea911fb --- /dev/null +++ b/UET/uet/Commands/Internal/CMakeUbaServer/UbaCoordinatorKubernetesConfig.cs @@ -0,0 +1,30 @@ +namespace UET.Commands.Internal.CMakeUbaServer +{ + using System.Xml; + using UET.Commands.Config; + + internal class UbaCoordinatorKubernetesConfig + { + public string? Namespace { get; set; } + public string? Context { get; set; } + public string? SmbServer { get; set; } + public string? SmbShare { get; set; } + public string? SmbUsername { get; set; } + public string? SmbPassword { get; set; } + + public static UbaCoordinatorKubernetesConfig ReadFromBuildConfigurationXml(IXmlConfigHelper xmlConfigHelper) + { + var document = new XmlDocument(); + document.Load(Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), "Unreal Engine", "UnrealBuildTool", "BuildConfiguration.xml")); + + var config = new UbaCoordinatorKubernetesConfig(); + config.Namespace = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "Namespace"]); + config.Context = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "Context"]); + config.SmbServer = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbServer"]); + config.SmbShare = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbShare"]); + config.SmbUsername = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbUsername"]); + config.SmbPassword = xmlConfigHelper.GetValue(document, ["Configuration", "Kubernetes", "SmbPassword"]); + return config; + } + } +} diff --git a/UET/uet/Commands/Internal/InternalCommand.cs b/UET/uet/Commands/Internal/InternalCommand.cs index 82f60c0b..67b14b43 100644 --- a/UET/uet/Commands/Internal/InternalCommand.cs +++ b/UET/uet/Commands/Internal/InternalCommand.cs @@ -30,6 +30,8 @@ using UET.Commands.Internal.TestUba; using UET.Commands.Internal.EngineCheckout; using UET.Commands.Internal.InstallPackage; + using UET.Commands.Internal.CMakeUbaServer; + using UET.Commands.Internal.CMakeUbaRun; internal sealed class InternalCommand { @@ -67,6 +69,8 @@ public static Command CreateInternalCommand(HashSet globalCommands) RemoteZfsTestCommand.CreateRemoteZfsTestCommand(), EngineCheckoutCommand.CreateEngineCheckoutCommand(), InstallPackageCommand.CreateInstallPackageCommand(), + CMakeUbaServerCommand.CreateCMakeUbaServerCommand(), + CMakeUbaRunCommand.CreateCMakeUbaRunCommand(), }; var command = new Command("internal", "Internal commands used by UET when it needs to call back into itself."); diff --git a/UET/uet/Commands/ParameterSpec/EngineSpec.cs b/UET/uet/Commands/ParameterSpec/EngineSpec.cs index 831a34e6..2ef03cbe 100644 --- a/UET/uet/Commands/ParameterSpec/EngineSpec.cs +++ b/UET/uet/Commands/ParameterSpec/EngineSpec.cs @@ -37,8 +37,16 @@ public static ParseArgument ParseEngineSpec( distributionOpt); } + public static ParseArgument ParseEngineSpecContextless() + { + return ParseEngineSpec( + null, + string.Empty, + null); + } + private static ParseArgument ParseEngineSpec( - Func getPathSpec, + Func? getPathSpec, string pathSpecOptionName, Option? distributionOpt) { @@ -69,13 +77,16 @@ private static ParseArgument ParseEngineSpec( // can figure out the target engine from the project file. PathSpec? path = null; DistributionSpec? distribution = null; - try - { - path = getPathSpec(result); - } - catch (InvalidOperationException) + if (getPathSpec != null) { - return null!; + try + { + path = getPathSpec(result); + } + catch (InvalidOperationException) + { + return null!; + } } if (distributionOpt != null) { @@ -90,7 +101,14 @@ private static ParseArgument ParseEngineSpec( } if (path == null) { - result.ErrorMessage = $"Can't automatically detect the appropriate engine because the --{pathSpecOptionName} option was invalid."; + if (getPathSpec == null) + { + result.ErrorMessage = $"You must explicitly set the engine version to use with --{result.Argument.Name}."; + } + else + { + result.ErrorMessage = $"Can't automatically detect the appropriate engine because the --{pathSpecOptionName} option was invalid."; + } return null!; } switch (path.Type) diff --git a/UET/uet/Program.cs b/UET/uet/Program.cs index 8b5853f2..994103c0 100644 --- a/UET/uet/Program.cs +++ b/UET/uet/Program.cs @@ -11,6 +11,7 @@ using System.Text.RegularExpressions; using UET.Commands.AppleCert; using UET.Commands.Build; +using UET.Commands.CMake; using UET.Commands.Config; using UET.Commands.Format; using UET.Commands.Generate; @@ -46,8 +47,20 @@ rootCommand.AddCommand(UefsCommand.CreateUefsCommand()); rootCommand.AddCommand(TransferCommand.CreateTransferCommand()); rootCommand.AddCommand(AppleCertCommand.CreateAppleCertCommand()); +rootCommand.AddCommand(CMakeCommand.CreateCMakeCommand()); rootCommand.AddCommand(InternalCommand.CreateInternalCommand(globalCommands)); +// If we have an implicit command variable, this is an internal command where we can't specify arguments directly. +var implicitCommand = Environment.GetEnvironmentVariable("UET_IMPLICIT_COMMAND"); +if (!string.IsNullOrWhiteSpace(implicitCommand)) +{ + // Clear it for any downstream processes we might start. + Environment.SetEnvironmentVariable("UET_IMPLICIT_COMMAND", null); + + // Prepend to args. + args = new[] { "internal", implicitCommand }.Concat(args).ToArray(); +} + // Parse the command line so we can inspect it. var parseResult = rootCommand.Parse(args); var isGlobalCommand = globalCommands.Contains(parseResult.CommandResult.Command); diff --git a/UET/uet/uet.csproj b/UET/uet/uet.csproj index ff9973bc..6af97b47 100644 --- a/UET/uet/uet.csproj +++ b/UET/uet/uet.csproj @@ -23,10 +23,13 @@ + + + @@ -34,6 +37,10 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + + + + @@ -99,6 +106,7 @@ + Both