From e3c1304641afe6588d45c3fee988c55e62418adb Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Thu, 4 Jan 2024 06:22:10 -0700 Subject: [PATCH 1/6] Initial push for adding orchestration reuse ID Signed-off-by: Ryan Lettieri --- eng/proto | 2 +- .../Internal/IOrchestrationSubmitter.cs | 2 ++ src/Client/Core/DurableTaskClient.cs | 21 +++++++++++++------ .../Core/Entities/DurableEntityClient.cs | 6 ++++-- src/Client/Grpc/GrpcDurableEntityClient.cs | 1 + src/Client/Grpc/GrpcDurableTaskClient.cs | 2 ++ .../ShimDurableTaskClient.cs | 1 + .../DefaultDurableTaskClientBuilderTests.cs | 1 + 8 files changed, 27 insertions(+), 9 deletions(-) diff --git a/eng/proto b/eng/proto index 02444613..19f0f696 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit 024446136eecc38c975b95f283138e7a0df149c2 +Subproject commit 19f0f6966186416ad0d2953b2f8e2271d5c93246 diff --git a/src/Abstractions/Internal/IOrchestrationSubmitter.cs b/src/Abstractions/Internal/IOrchestrationSubmitter.cs index 4d97a9a0..611205cb 100644 --- a/src/Abstractions/Internal/IOrchestrationSubmitter.cs +++ b/src/Abstractions/Internal/IOrchestrationSubmitter.cs @@ -27,6 +27,7 @@ public interface IOrchestrationSubmitter /// The optional input to pass to the scheduled orchestration instance. This must be a serializable value. /// /// The options to start the new orchestration with. + /// The policy for reusing an orchestration ID. /// /// The cancellation token. This only cancels enqueueing the new orchestration to the backend. Does not cancel the /// orchestration once enqueued. @@ -40,5 +41,6 @@ Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default); } diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 510230b0..d979999b 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -51,20 +51,25 @@ protected DurableTaskClient(string name) public virtual DurableEntityClient Entities => throw new NotSupportedException($"{this.GetType()} does not support durable entities."); - /// + /// public virtual Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, CancellationToken cancellation) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, null, cancellation); + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, null, null, cancellation); - /// + /// public virtual Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input, CancellationToken cancellation) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, null, cancellation); + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, null, null, cancellation); - /// + /// public virtual Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, StartOrchestrationOptions options, CancellationToken cancellation = default) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, options, cancellation); + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, options, null, cancellation); + + /// + public virtual Task ScheduleNewOrchestrationInstanceAsync( + TaskName orchestratorName, StartOrchestrationOptions options, HashSet orchestrationIdReusePolicy, CancellationToken cancellation = default) + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, null, orchestrationIdReusePolicy, cancellation); /// /// Schedules a new orchestration instance for execution. @@ -96,6 +101,9 @@ public virtual Task ScheduleNewOrchestrationInstanceAsync( /// The optional input to pass to the scheduled orchestration instance. This must be a serializable value. /// /// The options to start the new orchestration with. + /// + /// The orchestration reuse policy. This allows for the reuse of an instance ID as well as the options for it. + /// /// /// The cancellation token. This only cancels enqueueing the new orchestration to the backend. Does not cancel the /// orchestration once enqueued. @@ -110,6 +118,7 @@ public abstract Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default); /// diff --git a/src/Client/Core/Entities/DurableEntityClient.cs b/src/Client/Core/Entities/DurableEntityClient.cs index 46cd9c6d..e5fcef81 100644 --- a/src/Client/Core/Entities/DurableEntityClient.cs +++ b/src/Client/Core/Entities/DurableEntityClient.cs @@ -31,6 +31,7 @@ protected DurableEntityClient(string name) /// The name of the operation. /// The input for the operation. /// The options to signal the entity with. + /// The policy for reusing an orchestration ID. /// The cancellation token to cancel enqueuing of the operation. /// A task that completes when the message has been reliably enqueued. /// This does not wait for the operation to be processed by the receiving entity. @@ -39,6 +40,7 @@ public abstract Task SignalEntityAsync( string operationName, object? input = null, SignalEntityOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default); /// @@ -55,7 +57,7 @@ public virtual Task SignalEntityAsync( string operationName, SignalEntityOptions options, CancellationToken cancellation = default) - => this.SignalEntityAsync(id, operationName, null, options, cancellation); + => this.SignalEntityAsync(id, operationName, null, options, null, cancellation); /// /// Signals an entity to perform an operation. @@ -69,7 +71,7 @@ public virtual Task SignalEntityAsync( EntityInstanceId id, string operationName, CancellationToken cancellation) - => this.SignalEntityAsync(id, operationName, null, null, cancellation); + => this.SignalEntityAsync(id, operationName, null, null, null, cancellation); /// /// Tries to get the entity with ID of . Includes entity state by default. diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index b5e3371d..421b7f75 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -41,6 +41,7 @@ public override async Task SignalEntityAsync( string operationName, object? input = null, SignalEntityOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { Check.NotNullOrEmpty(id.Name); diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index b0efbb93..55d03d80 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -73,6 +73,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId); @@ -83,6 +84,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( Version = orchestratorName.Version, InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"), Input = this.DataConverter.Serialize(input), + OrchestrationIdReusePolicy = { }, }; DateTimeOffset? startAt = options?.StartAt; diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index b948516c..07552de9 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -130,6 +130,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { cancellation.ThrowIfCancellationRequested(); diff --git a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs index 0732c1a5..a93ed3dc 100644 --- a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs @@ -116,6 +116,7 @@ public override Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, + HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { throw new NotImplementedException(); From 47c5446bb2c16776be764c989a4389bdf760bfc4 Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Thu, 4 Jan 2024 12:19:25 -0700 Subject: [PATCH 2/6] Updating proto Signed-off-by: Ryan Lettieri --- eng/proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eng/proto b/eng/proto index 19f0f696..0e732cf8 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit 19f0f6966186416ad0d2953b2f8e2271d5c93246 +Subproject commit 0e732cf86d4496ee9d870d647fbd348481dc819f From d44e411467edb8b05a2d70e91bd8b84776b6d9a2 Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Thu, 4 Jan 2024 17:18:02 -0700 Subject: [PATCH 3/6] Updating proto again Signed-off-by: Ryan Lettieri --- eng/proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eng/proto b/eng/proto index 0e732cf8..4207e1db 160000 --- a/eng/proto +++ b/eng/proto @@ -1 +1 @@ -Subproject commit 0e732cf86d4496ee9d870d647fbd348481dc819f +Subproject commit 4207e1dbd14cedc268f69c3befee60fcaad19367 From 4fa6141675297f6a27a8ff1949025f6c99523bee Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Fri, 5 Jan 2024 11:23:27 -0700 Subject: [PATCH 4/6] Including proto definitions and moving reuseID into options Signed-off-by: Ryan Lettieri --- src/Abstractions/Abstractions.csproj | 7 +++++++ .../Internal/IOrchestrationSubmitter.cs | 2 -- src/Abstractions/TaskOptions.cs | 8 +++++-- src/Client/Core/DurableTaskClient.cs | 21 ++++++------------- .../Core/Entities/DurableEntityClient.cs | 6 ++---- src/Client/Grpc/GrpcDurableEntityClient.cs | 1 - src/Client/Grpc/GrpcDurableTaskClient.cs | 1 - ...ient.OrchestrationServiceClientShim.csproj | 7 +++++++ .../ShimDurableTaskClient.cs | 4 +++- .../DefaultDurableTaskClientBuilderTests.cs | 1 - 10 files changed, 31 insertions(+), 27 deletions(-) diff --git a/src/Abstractions/Abstractions.csproj b/src/Abstractions/Abstractions.csproj index 7ff40c51..d952c855 100644 --- a/src/Abstractions/Abstractions.csproj +++ b/src/Abstractions/Abstractions.csproj @@ -9,13 +9,20 @@ + + + + + + + diff --git a/src/Abstractions/Internal/IOrchestrationSubmitter.cs b/src/Abstractions/Internal/IOrchestrationSubmitter.cs index 611205cb..4d97a9a0 100644 --- a/src/Abstractions/Internal/IOrchestrationSubmitter.cs +++ b/src/Abstractions/Internal/IOrchestrationSubmitter.cs @@ -27,7 +27,6 @@ public interface IOrchestrationSubmitter /// The optional input to pass to the scheduled orchestration instance. This must be a serializable value. /// /// The options to start the new orchestration with. - /// The policy for reusing an orchestration ID. /// /// The cancellation token. This only cancels enqueueing the new orchestration to the backend. Does not cancel the /// orchestration once enqueued. @@ -41,6 +40,5 @@ Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default); } diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 63f94382..4824f1ff 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -1,8 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -namespace Microsoft.DurableTask; +using P = Microsoft.DurableTask.Protobuf; +namespace Microsoft.DurableTask; /// /// Options that can be used to control the behavior of orchestrator task execution. /// @@ -100,4 +101,7 @@ public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past /// is specified, the orchestration instance will be scheduled immediately. /// -public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null); +/// The orchestration reuse policy. This allows for the reuse of an instance ID +/// as well as the options for it. +public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null, Dictionary? + OrchestrationIdReusePolicy = null); diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index d979999b..510230b0 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -51,25 +51,20 @@ protected DurableTaskClient(string name) public virtual DurableEntityClient Entities => throw new NotSupportedException($"{this.GetType()} does not support durable entities."); - /// + /// public virtual Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, CancellationToken cancellation) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, null, null, cancellation); + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, null, cancellation); - /// + /// public virtual Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input, CancellationToken cancellation) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, null, null, cancellation); + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, null, cancellation); - /// + /// public virtual Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, StartOrchestrationOptions options, CancellationToken cancellation = default) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, options, null, cancellation); - - /// - public virtual Task ScheduleNewOrchestrationInstanceAsync( - TaskName orchestratorName, StartOrchestrationOptions options, HashSet orchestrationIdReusePolicy, CancellationToken cancellation = default) - => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, null, orchestrationIdReusePolicy, cancellation); + => this.ScheduleNewOrchestrationInstanceAsync(orchestratorName, null, options, cancellation); /// /// Schedules a new orchestration instance for execution. @@ -101,9 +96,6 @@ public virtual Task ScheduleNewOrchestrationInstanceAsync( /// The optional input to pass to the scheduled orchestration instance. This must be a serializable value. /// /// The options to start the new orchestration with. - /// - /// The orchestration reuse policy. This allows for the reuse of an instance ID as well as the options for it. - /// /// /// The cancellation token. This only cancels enqueueing the new orchestration to the backend. Does not cancel the /// orchestration once enqueued. @@ -118,7 +110,6 @@ public abstract Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default); /// diff --git a/src/Client/Core/Entities/DurableEntityClient.cs b/src/Client/Core/Entities/DurableEntityClient.cs index e5fcef81..46cd9c6d 100644 --- a/src/Client/Core/Entities/DurableEntityClient.cs +++ b/src/Client/Core/Entities/DurableEntityClient.cs @@ -31,7 +31,6 @@ protected DurableEntityClient(string name) /// The name of the operation. /// The input for the operation. /// The options to signal the entity with. - /// The policy for reusing an orchestration ID. /// The cancellation token to cancel enqueuing of the operation. /// A task that completes when the message has been reliably enqueued. /// This does not wait for the operation to be processed by the receiving entity. @@ -40,7 +39,6 @@ public abstract Task SignalEntityAsync( string operationName, object? input = null, SignalEntityOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default); /// @@ -57,7 +55,7 @@ public virtual Task SignalEntityAsync( string operationName, SignalEntityOptions options, CancellationToken cancellation = default) - => this.SignalEntityAsync(id, operationName, null, options, null, cancellation); + => this.SignalEntityAsync(id, operationName, null, options, cancellation); /// /// Signals an entity to perform an operation. @@ -71,7 +69,7 @@ public virtual Task SignalEntityAsync( EntityInstanceId id, string operationName, CancellationToken cancellation) - => this.SignalEntityAsync(id, operationName, null, null, null, cancellation); + => this.SignalEntityAsync(id, operationName, null, null, cancellation); /// /// Tries to get the entity with ID of . Includes entity state by default. diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index 421b7f75..b5e3371d 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -41,7 +41,6 @@ public override async Task SignalEntityAsync( string operationName, object? input = null, SignalEntityOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { Check.NotNullOrEmpty(id.Name); diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 55d03d80..0ff022e0 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -73,7 +73,6 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId); diff --git a/src/Client/OrchestrationServiceClientShim/Client.OrchestrationServiceClientShim.csproj b/src/Client/OrchestrationServiceClientShim/Client.OrchestrationServiceClientShim.csproj index b5389866..5a5be168 100644 --- a/src/Client/OrchestrationServiceClientShim/Client.OrchestrationServiceClientShim.csproj +++ b/src/Client/OrchestrationServiceClientShim/Client.OrchestrationServiceClientShim.csproj @@ -8,6 +8,11 @@ The client is responsible for interacting with orchestrations from outside the w true + + + + + 1.0.5 @@ -17,11 +22,13 @@ The client is responsible for interacting with orchestrations from outside the w + + diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 07552de9..5cd3039e 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -10,6 +10,8 @@ using Core = DurableTask.Core; using CoreOrchestrationQuery = DurableTask.Core.Query.OrchestrationQuery; +using P = Microsoft.DurableTask.Protobuf; + namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim; /// @@ -130,11 +132,11 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { cancellation.ThrowIfCancellationRequested(); string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"); + Dictionary idReusePolicy = options?.OrchestrationIdReusePolicy ?? new Dictionary(); OrchestrationInstance instance = new() { InstanceId = instanceId, diff --git a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs index a93ed3dc..0732c1a5 100644 --- a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs @@ -116,7 +116,6 @@ public override Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, StartOrchestrationOptions? options = null, - HashSet? orchestrationIdReusePolicy = null, CancellationToken cancellation = default) { throw new NotImplementedException(); From 4e7de08e1e7062464ccbbbb0ed74c1932be84f8c Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Thu, 11 Jan 2024 14:26:07 -0700 Subject: [PATCH 5/6] Creating abstraction for runtime status and orchestration action enum Signed-off-by: Ryan Lettieri --- src/Abstractions/OrchestrationOptions.cs | 87 +++++++++++++++++++ src/Abstractions/TaskOptions.cs | 12 +-- .../ShimDurableTaskClient.cs | 2 +- 3 files changed, 95 insertions(+), 6 deletions(-) create mode 100644 src/Abstractions/OrchestrationOptions.cs diff --git a/src/Abstractions/OrchestrationOptions.cs b/src/Abstractions/OrchestrationOptions.cs new file mode 100644 index 00000000..b918e92c --- /dev/null +++ b/src/Abstractions/OrchestrationOptions.cs @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// Orchestration status options. +/// +/// +public sealed class OrchestrationOptions +{ + /// + /// Initializes a new instance of the class. + /// + public OrchestrationOptions() + { + } + + /// + /// Enum describing the return value when attempting to reuse a previously used instance ID. + /// + public enum InstanceIdReuseAction + { + /// + /// An error will be returned when attempting to reuse a previously used instance ID. + /// + ERROR, + + /// + /// The request to reuse the previously used instanceID will be ignored. + /// + IGNORE, + + /// + /// The currently running orchestration will be terminated and a new instance will be started. + /// + TERMINATE, + } + + /// + /// Enum describing the runtime status of the orchestration. + /// + public enum OrchestrationRuntimeStatus + { + /// + /// The orchestration started running. + /// + Running, + + /// + /// The orchestration completed normally. + /// + Completed, + + /// + /// The orchestration is transitioning into a new instance. + /// + [Obsolete("The ContinuedAsNew status is obsolete and exists only for compatibility reasons.")] + ContinuedAsNew, + + /// + /// The orchestration completed with an unhandled exception. + /// + Failed, + + /// + /// The orchestration canceled gracefully. + /// + [Obsolete("The Canceled status is not currently used and exists only for compatibility reasons.")] + Canceled, + + /// + /// The orchestration was abruptly terminated via a management API call. + /// + Terminated, + + /// + /// The orchestration was scheduled but hasn't started running. + /// + Pending, + + /// + /// The orchestration has been suspended. + /// + Suspended, + } +} diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 4824f1ff..2c50dfe2 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -1,9 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -using P = Microsoft.DurableTask.Protobuf; - namespace Microsoft.DurableTask; + /// /// Options that can be used to control the behavior of orchestrator task execution. /// @@ -64,7 +63,10 @@ public record SubOrchestrationOptions : TaskOptions /// /// The task retry options. /// The orchestration instance ID. - public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) + /// The orchestration reuse policy. This allows for the reuse of an + /// instance ID as well as the options for it. + public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null, Dictionary? + orchestrationIdReusePolicy = null) : base(retry) { this.InstanceId = instanceId; @@ -103,5 +105,5 @@ public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) /// /// The orchestration reuse policy. This allows for the reuse of an instance ID /// as well as the options for it. -public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null, Dictionary? - OrchestrationIdReusePolicy = null); +public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null, + Dictionary, OrchestrationOptions.InstanceIdReuseAction>? OrchestrationIdReusePolicy = null); diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 5cd3039e..b55a96b1 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -136,7 +136,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( { cancellation.ThrowIfCancellationRequested(); string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"); - Dictionary idReusePolicy = options?.OrchestrationIdReusePolicy ?? new Dictionary(); + Dictionary, OrchestrationOptions.InstanceIdReuseAction> idReusePolicy = options?.OrchestrationIdReusePolicy ?? new Dictionary, OrchestrationOptions.InstanceIdReuseAction>(); OrchestrationInstance instance = new() { InstanceId = instanceId, From 81b993a156bb11b776ad8bcd3bc3d93d39f40bde Mon Sep 17 00:00:00 2001 From: Ryan Lettieri Date: Tue, 23 Jan 2024 07:33:22 -0700 Subject: [PATCH 6/6] Fixing ID Reuse Policy Signed-off-by: Ryan Lettieri --- src/Abstractions/Abstractions.csproj | 7 -- src/Abstractions/OrchestrationOptions.cs | 95 ++++++------------- src/Abstractions/TaskOptions.cs | 9 +- .../ShimDurableTaskClient.cs | 39 +++++++- 4 files changed, 68 insertions(+), 82 deletions(-) diff --git a/src/Abstractions/Abstractions.csproj b/src/Abstractions/Abstractions.csproj index d952c855..7ff40c51 100644 --- a/src/Abstractions/Abstractions.csproj +++ b/src/Abstractions/Abstractions.csproj @@ -9,20 +9,13 @@ - - - - - - - diff --git a/src/Abstractions/OrchestrationOptions.cs b/src/Abstractions/OrchestrationOptions.cs index b918e92c..875760a1 100644 --- a/src/Abstractions/OrchestrationOptions.cs +++ b/src/Abstractions/OrchestrationOptions.cs @@ -4,84 +4,49 @@ namespace Microsoft.DurableTask; /// -/// Orchestration status options. +/// Enum describing the runtime status of the orchestration. /// -/// -public sealed class OrchestrationOptions +public enum OrchestrationRuntimeStatus { /// - /// Initializes a new instance of the class. + /// The orchestration started running. /// - public OrchestrationOptions() - { - } + Running, /// - /// Enum describing the return value when attempting to reuse a previously used instance ID. + /// The orchestration completed normally. /// - public enum InstanceIdReuseAction - { - /// - /// An error will be returned when attempting to reuse a previously used instance ID. - /// - ERROR, - - /// - /// The request to reuse the previously used instanceID will be ignored. - /// - IGNORE, - - /// - /// The currently running orchestration will be terminated and a new instance will be started. - /// - TERMINATE, - } + Completed, /// - /// Enum describing the runtime status of the orchestration. + /// The orchestration is transitioning into a new instance. /// - public enum OrchestrationRuntimeStatus - { - /// - /// The orchestration started running. - /// - Running, - - /// - /// The orchestration completed normally. - /// - Completed, - - /// - /// The orchestration is transitioning into a new instance. - /// - [Obsolete("The ContinuedAsNew status is obsolete and exists only for compatibility reasons.")] - ContinuedAsNew, + [Obsolete("The ContinuedAsNew status is obsolete and exists only for compatibility reasons.")] + ContinuedAsNew, - /// - /// The orchestration completed with an unhandled exception. - /// - Failed, + /// + /// The orchestration completed with an unhandled exception. + /// + Failed, - /// - /// The orchestration canceled gracefully. - /// - [Obsolete("The Canceled status is not currently used and exists only for compatibility reasons.")] - Canceled, + /// + /// The orchestration canceled gracefully. + /// + [Obsolete("The Canceled status is not currently used and exists only for compatibility reasons.")] + Canceled, - /// - /// The orchestration was abruptly terminated via a management API call. - /// - Terminated, + /// + /// The orchestration was abruptly terminated via a management API call. + /// + Terminated, - /// - /// The orchestration was scheduled but hasn't started running. - /// - Pending, + /// + /// The orchestration was scheduled but hasn't started running. + /// + Pending, - /// - /// The orchestration has been suspended. - /// - Suspended, - } + /// + /// The orchestration has been suspended. + /// + Suspended, } diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 2c50dfe2..b0d3cef8 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -63,10 +63,7 @@ public record SubOrchestrationOptions : TaskOptions /// /// The task retry options. /// The orchestration instance ID. - /// The orchestration reuse policy. This allows for the reuse of an - /// instance ID as well as the options for it. - public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null, Dictionary? - orchestrationIdReusePolicy = null) + public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) : base(retry) { this.InstanceId = instanceId; @@ -104,6 +101,6 @@ public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) /// is specified, the orchestration instance will be scheduled immediately. /// /// The orchestration reuse policy. This allows for the reuse of an instance ID -/// as well as the options for it. +/// if the instance ID referenced is in any of the states supplied in this parameter. public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null, - Dictionary, OrchestrationOptions.InstanceIdReuseAction>? OrchestrationIdReusePolicy = null); + OrchestrationRuntimeStatus[]? OrchestrationIdReusePolicy = null); diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index b55a96b1..80aebfee 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -10,8 +10,6 @@ using Core = DurableTask.Core; using CoreOrchestrationQuery = DurableTask.Core.Query.OrchestrationQuery; -using P = Microsoft.DurableTask.Protobuf; - namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim; /// @@ -136,7 +134,18 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( { cancellation.ThrowIfCancellationRequested(); string instanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"); - Dictionary, OrchestrationOptions.InstanceIdReuseAction> idReusePolicy = options?.OrchestrationIdReusePolicy ?? new Dictionary, OrchestrationOptions.InstanceIdReuseAction>(); + OrchestrationStatus[] runtimeStates = Array.Empty(); + + // Convert from DurableTask.OrchestrationRuntimeStatus to DurableTask.Core.OrchestrationRuntimeStatus + if (options?.OrchestrationIdReusePolicy != null) + { + runtimeStates = new OrchestrationStatus[options.OrchestrationIdReusePolicy.Length]; + for (int i = 0; i < options.OrchestrationIdReusePolicy.Length; i++) + { + runtimeStates[i] = ConvertState(options.OrchestrationIdReusePolicy[i]); + } + } + OrchestrationInstance instance = new() { InstanceId = instanceId, @@ -156,7 +165,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( }, }; - await this.Client.CreateTaskOrchestrationAsync(message); + await this.Client.CreateTaskOrchestrationAsync(message, runtimeStates); return instanceId; } @@ -215,6 +224,28 @@ public override async Task WaitForInstanceStartAsync( } } + // Converts from DurableTask.OrchestrationRuntimeStatus to OrchestrationStatus for policy reuse. + static OrchestrationStatus ConvertState(DurableTask.OrchestrationRuntimeStatus state) + { + switch (state) + { + case DurableTask.OrchestrationRuntimeStatus.Running: + return OrchestrationStatus.Running; + case DurableTask.OrchestrationRuntimeStatus.Completed: + return OrchestrationStatus.Completed; + case DurableTask.OrchestrationRuntimeStatus.Failed: + return OrchestrationStatus.Failed; + case DurableTask.OrchestrationRuntimeStatus.Terminated: + return OrchestrationStatus.Terminated; + case DurableTask.OrchestrationRuntimeStatus.Pending: + return OrchestrationStatus.Pending; + case DurableTask.OrchestrationRuntimeStatus.Suspended: + return OrchestrationStatus.Suspended; + default: + return OrchestrationStatus.Terminated; + } + } + [return: NotNullIfNotNull("state")] OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs) {