Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'uet cmake' command to distribute CMake-based builds over UBA #34

Merged
merged 7 commits into from
Dec 25, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add cmake-uba-run and cmake-uba-server commands for doing CMake work …
…on UBA
hach-que committed Dec 24, 2024
commit 3450b8393fb8b1085ce047f46641b33421ddf41b
59 changes: 48 additions & 11 deletions UET/Redpoint.Uba/DefaultUbaServer.cs
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ internal partial class DefaultUbaServer : IUbaServer
private readonly ILogger<DefaultUbaServer> _logger;
private readonly IProcessArgumentParser _processArgumentParser;
private readonly IProcessExecutor _localProcessExecutor;
private readonly UbaLoggerForwarder _ubaLoggerForwarder;
private readonly nint _ubaLogger;
private readonly CancellationTokenSource _localWorkerCancellationTokenSource;
private nint _server;
private nint _storageServer;
@@ -27,6 +27,11 @@ internal partial class DefaultUbaServer : IUbaServer
private readonly ConcurrentDictionary<ulong, bool> _returnedProcesses;
private readonly TerminableAwaitableConcurrentQueue<UbaProcessDescriptor> _processQueue;
private readonly Task[] _localWorkerTasks;
private readonly SessionServer_RemoteProcessAvailableCallback _onRemoteProcessAvailableDelegate;
private readonly SessionServer_RemoteProcessReturnedCallback _onRemoteProcessReturnedDelegate;
private long _processesPendingInQueue;
private long _processesExecutingLocally;
private long _processesExecutingRemotely;

#region Library Imports

@@ -47,7 +52,7 @@ private static partial bool Server_AddClient(
nint server,
string ip,
int port,
string crypto);
nint crypto);

[LibraryImport("UbaHost", StringMarshalling = StringMarshalling.Custom, StringMarshallingCustomType = typeof(UbaStringMarshaller))]
[DefaultDllImportSearchPaths(DllImportSearchPath.SafeDirectories)]
@@ -169,15 +174,15 @@ public DefaultUbaServer(
ILogger<DefaultUbaServer> logger,
IProcessArgumentParser processArgumentParser,
IProcessExecutor localProcessExecutor,
UbaLoggerForwarder ubaLoggerForwarder,
nint ubaLogger,
nint server,
string rootStorageDirectoryPath,
string ubaTraceFilePath)
{
_logger = logger;
_processArgumentParser = processArgumentParser;
_localProcessExecutor = localProcessExecutor;
_ubaLoggerForwarder = ubaLoggerForwarder;
_ubaLogger = ubaLogger;
_localWorkerCancellationTokenSource = new CancellationTokenSource();

_server = server;
@@ -187,7 +192,7 @@ public DefaultUbaServer(
rootStorageDirectoryPath,
40uL * 1000 * 1000 * 1000,
false,
_ubaLoggerForwarder.Logger,
ubaLogger,
string.Empty);
if (_storageServer == nint.Zero)
{
@@ -198,7 +203,7 @@ public DefaultUbaServer(
_sessionServerCreateInfo = CreateSessionServerCreateInfo(
_storageServer,
_server,
_ubaLoggerForwarder.Logger,
ubaLogger,
rootStorageDirectoryPath,
ubaTraceFilePath,
disableCustomAllocator: false,
@@ -227,24 +232,43 @@ public DefaultUbaServer(
.Select(x => Task.Run(LocalProcessWorkerLoop))
.ToArray();

SessionServer_SetRemoteProcessAvailable(_sessionServer, OnRemoteProcessAvailable, nint.Zero);
SessionServer_SetRemoteProcessReturned(_sessionServer, OnRemoteProcessReturned, nint.Zero);
_onRemoteProcessAvailableDelegate = OnRemoteProcessAvailable;
_onRemoteProcessReturnedDelegate = OnRemoteProcessReturned;
SessionServer_SetRemoteProcessAvailable(_sessionServer, _onRemoteProcessAvailableDelegate, nint.Zero);
SessionServer_SetRemoteProcessReturned(_sessionServer, _onRemoteProcessReturnedDelegate, nint.Zero);
}

public bool AddRemoteAgent(string ip, int port, string crypto)
public bool AddRemoteAgent(string ip, int port)
{
return Server_AddClient(_server, ip, port, crypto);
return Server_AddClient(_server, ip, port, nint.Zero);
}

public long ProcessesPendingInQueue => Interlocked.Read(ref _processesPendingInQueue);
public long ProcessesExecutingLocally => Interlocked.Read(ref _processesExecutingLocally);
public long ProcessesExecutingRemotely => Interlocked.Read(ref _processesExecutingRemotely);

private void OnRemoteProcessAvailable(nint userData)
{
// Start a background task to queue a remote process (since this function can't be async).
_ = Task.Run(async () =>
{
// Before we attempt to actually dequeue, check that there is something to dequeue, since UBA
// frequently calls OnRemoteProcessAvailable whileever slots are free (and not just when a slot
// opens up).
if (Interlocked.Read(ref _processesPendingInQueue) == 0)
{
return;
}

_logger.LogInformation("Received OnRemoteProcessAvailable, pulling next available process to run...");

// Grab the next process to run.
var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false);
Interlocked.Decrement(ref _processesPendingInQueue);
Interlocked.Increment(ref _processesExecutingRemotely);

// Run the process remotely.
_logger.LogInformation($"Got process to run '{descriptor.ProcessSpecification.FilePath}'...");
var isRequeued = false;
try
{
@@ -363,6 +387,8 @@ private void OnRemoteProcessAvailable(nint userData)
}
processHash = ProcessHandle_GetHash(process);

_logger.LogInformation($"Remote process '{descriptor.ProcessSpecification.FilePath}' is now running...");

// While we wait for the exit gate to open, poll for log lines.
while (!exitedGate.Opened &&
!_returnedProcesses.ContainsKey(processHash) &&
@@ -390,6 +416,8 @@ private void OnRemoteProcessAvailable(nint userData)
if (!exitCode.HasValue /* Returned by remote agent */ ||
(exitCode.HasValue && exitCode == 9006) /* Known retry code when running cmd.exe via remote agent */)
{
_logger.LogInformation($"Remote process '{descriptor.ProcessSpecification.FilePath}' was returned to the queue, scheduling for local execution...");

// Prefer to run this command locally now.
descriptor.PreferRemote = false;

@@ -427,6 +455,11 @@ private void OnRemoteProcessAvailable(nint userData)
{
descriptor.CompletionGate.Open();
}
else
{
Interlocked.Increment(ref _processesPendingInQueue);
}
Interlocked.Decrement(ref _processesExecutingRemotely);
}
});
}
@@ -443,14 +476,16 @@ private async Task LocalProcessWorkerLoop()
// Grab the next process to run.
var descriptor = await _processQueue.DequeueAsync(_localWorkerCancellationTokenSource.Token).ConfigureAwait(false);
if (descriptor.PreferRemote &&
(descriptor.DateQueuedUtc - DateTimeOffset.UtcNow).TotalSeconds < 30)
(DateTimeOffset.UtcNow - descriptor.DateQueuedUtc).TotalSeconds < 30)
{
// If this process prefers remote execution, and it hasn't been sitting in the queue for
// at least 30 seconds, requeue it and try again.
_processQueue.Enqueue(descriptor);
await Task.Delay(200, _localWorkerCancellationTokenSource.Token).ConfigureAwait(false);
continue;
}
Interlocked.Decrement(ref _processesPendingInQueue);
Interlocked.Increment(ref _processesExecutingLocally);

// Run the process locally.
try
@@ -467,6 +502,7 @@ private async Task LocalProcessWorkerLoop()
finally
{
descriptor.CompletionGate.Open();
Interlocked.Decrement(ref _processesExecutingLocally);
}
}
while (true);
@@ -495,6 +531,7 @@ public async Task<int> ExecuteAsync(
CompletionGate = new Gate(),
};
_processQueue.Enqueue(descriptor);
Interlocked.Increment(ref _processesPendingInQueue);

// Wait for the gate to be opened.
await descriptor.CompletionGate.WaitAsync(cancellationToken).ConfigureAwait(false);
8 changes: 5 additions & 3 deletions UET/Redpoint.Uba/DefaultUbaServerFactory.cs
Original file line number Diff line number Diff line change
@@ -46,9 +46,11 @@ public IUbaServer CreateServer(
Directory.CreateDirectory(rootStorageDirectoryPath);
Directory.CreateDirectory(Path.GetDirectoryName(ubaTraceFilePath)!);

var loggingForwarder = new UbaLoggerForwarder(_ubaLoggerForwarderLogger);
// @note: UbaLoggerForwarder must be static and hold a singleton log. Otherwise we can run into a crash if
// DefaultUbaServer is finalized after UbaLoggerForwarder.
var ubaLogger = UbaLoggerForwarder.GetUbaLogger(_ubaLoggerForwarderLogger);
var server = UbaServerDelayedImports.CreateServer(
loggingForwarder.Logger,
ubaLogger,
maxWorkers,
sendSize,
receiveTimeoutSeconds,
@@ -57,7 +59,7 @@ public IUbaServer CreateServer(
_defaultUbaServerLogger,
_processArgumentParser,
_localProcessExecutor,
loggingForwarder,
ubaLogger,
server,
rootStorageDirectoryPath,
ubaTraceFilePath);
20 changes: 17 additions & 3 deletions UET/Redpoint.Uba/IUbaServer.cs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
///
/// We don't support the UBA mode whereby the server listens on a port, and remote agents (clients) initiated a connection back to the server. That's because most developer machines are behind a firewall that remote agents won't be able to initiate a connection through.
///
/// Instead, we only support the model by which the remote agents (clients) listen on a port, and the server actively connects to them using <see cref="AddRemoteAgent(string, int, string)"/>.
/// Instead, we only support the model by which the remote agents (clients) listen on a port, and the server actively connects to them using <see cref="AddRemoteAgent(string, int)"/>.
/// </summary>
public interface IUbaServer : IProcessExecutor, IAsyncDisposable
{
@@ -19,8 +19,22 @@ public interface IUbaServer : IProcessExecutor, IAsyncDisposable
/// </summary>
/// <param name="ip">The IP address of the remote agent.</param>
/// <param name="port">The port of the remote agent.</param>
/// <param name="crypto">If the remote agent has the '-crypto' parameter set, this should be the 32 character hexadecimal value representing the symmetric cryptographic key.</param>
/// <returns>True if the agent was successfully added.</returns>
bool AddRemoteAgent(string ip, int port, string crypto = "");
bool AddRemoteAgent(string ip, int port);

/// <summary>
/// The number of processes currently in the queue to start execution.
/// </summary>
long ProcessesPendingInQueue { get; }

/// <summary>
/// The number of processes executing locally.
/// </summary>
long ProcessesExecutingLocally { get; }

/// <summary>
/// The number of processes executing on remote agents.
/// </summary>
long ProcessesExecutingRemotely { get; }
}
}
54 changes: 16 additions & 38 deletions UET/Redpoint.Uba/UbaLoggerForwarder.cs
Original file line number Diff line number Diff line change
@@ -5,11 +5,10 @@
using System;
using System.Runtime.InteropServices;

internal partial class UbaLoggerForwarder : IDisposable
internal partial class UbaLoggerForwarder
{
private readonly ILogger<UbaLoggerForwarder> _logger;
private readonly nint _ubaLogger;
private bool _hasDisposed;
private static ILogger<UbaLoggerForwarder>? _logger;
private static nint? _ubaLogger;

#region Library Imports

@@ -36,8 +35,13 @@ private static partial void DestroyCallbackLogWriter(

#endregion

public UbaLoggerForwarder(ILogger<UbaLoggerForwarder> logger)
public static nint GetUbaLogger(ILogger<UbaLoggerForwarder> logger)
{
if (_ubaLogger.HasValue)
{
return _ubaLogger.Value;
}

_logger = logger;
_ubaLogger = CreateCallbackLogWriter(
BeginScope,
@@ -47,9 +51,9 @@ public UbaLoggerForwarder(ILogger<UbaLoggerForwarder> logger)
{
throw new InvalidOperationException("Unable to create UBA logger!");
}
}

public nint Logger => _ubaLogger;
return _ubaLogger.Value;
}

private static void BeginScope()
{
@@ -59,7 +63,7 @@ private static void EndScope()
{
}

private void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint prefixLen)
private static void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint prefixLen)
{
string message;
if (OperatingSystem.IsWindows())
@@ -74,46 +78,20 @@ private void Log(byte logEntryType, nint str, uint strLen, nint prefix, uint pre
switch (logEntryType)
{
case 0:
_logger.LogError(message);
_logger!.LogError(message);
break;
case 1:
_logger.LogWarning(message);
_logger!.LogWarning(message);
break;
case 2:
_logger.LogInformation(message);
_logger!.LogInformation(message);
break;
case 3:
case 4:
default:
_logger.LogDebug(message);
_logger!.LogDebug(message);
break;
}
}

protected virtual void Dispose(bool disposing)
{
if (!_hasDisposed)
{
if (_ubaLogger != nint.Zero)
{
DestroyCallbackLogWriter(_ubaLogger);
}

_hasDisposed = true;
}
}

~UbaLoggerForwarder()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: false);
}

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
Loading