Skip to content

Commit

Permalink
확장된 명령 헤더 속성을 적용한다
Browse files Browse the repository at this point in the history
  • Loading branch information
gyuwon committed Jan 2, 2018
1 parent a4f8e3d commit 3d2ca41
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ Task<T> FindProcessManager(

Task SaveProcessManagerAndPublishCommands(
T processManager,
Guid? operationId = default,
Guid? correlationId = default,
string contributor = default,
CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public static PendingCommand FromEnvelope<T>(
ProcessManagerType = typeof(T).FullName,
ProcessManagerId = processManager.Id,
MessageId = envelope.MessageId,
OperationId = envelope.OperationId,
CorrelationId = envelope.CorrelationId,
Contributor = envelope.Contributor,
CommandJson = serializer.Serialize(envelope.Message),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public static PendingScheduledCommand FromScheduledEnvelope<T>(
ProcessManagerType = typeof(T).FullName,
ProcessManagerId = processManager.Id,
MessageId = scheduledEnvelope.Envelope.MessageId,
OperationId = scheduledEnvelope.Envelope.OperationId,
CorrelationId = scheduledEnvelope.Envelope.CorrelationId,
Contributor = scheduledEnvelope.Envelope.Contributor,
CommandJson = serializer.Serialize(scheduledEnvelope.Envelope.Message),
ScheduledTime = scheduledEnvelope.ScheduledTime,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,8 @@ public SqlProcessManagerDataContext(

public void Dispose() => _dbContext.Dispose();

[Obsolete("Use FindProcessManager() instead. This method will be removed in version 1.0.0.")]
public Task<T> Find(Expression<Func<T, bool>> predicate, CancellationToken cancellationToken)
=> FindProcessManager(predicate, cancellationToken);

public Task<T> FindProcessManager(
Expression<Func<T, bool>> predicate, CancellationToken cancellationToken)
Expression<Func<T, bool>> predicate, CancellationToken cancellationToken = default)
{
if (predicate == null)
{
Expand Down Expand Up @@ -92,31 +88,40 @@ private Task FlushCommandsIfProcessManagerExists(

public Task SaveProcessManagerAndPublishCommands(
T processManager,
Guid? correlationId,
CancellationToken cancellationToken)
Guid? operationId = default,
Guid? correlationId = default,
string contributor = default,
CancellationToken cancellationToken = default)
{
if (processManager == null)
{
throw new ArgumentNullException(nameof(processManager));
}

async Task Run()
{
await SaveProcessManagerAndCommands(processManager, correlationId, cancellationToken).ConfigureAwait(false);
await TryFlushCommands(processManager, cancellationToken).ConfigureAwait(false);
}
return RunSaveProcessManagerAndPublishCommands(processManager, operationId, correlationId, contributor, cancellationToken);
}

return Run();
private async Task RunSaveProcessManagerAndPublishCommands(
T processManager,
Guid? operationId,
Guid? correlationId,
string contributor,
CancellationToken cancellationToken)
{
await SaveProcessManagerAndCommands(processManager, operationId, correlationId, contributor, cancellationToken).ConfigureAwait(false);
await TryFlushCommands(processManager, cancellationToken).ConfigureAwait(false);
}

private Task SaveProcessManagerAndCommands(
T processManager,
Guid? operationId,
Guid? correlationId,
string contributor,
CancellationToken cancellationToken)
{
UpsertProcessManager(processManager);
InsertPendingCommands(processManager, correlationId);
InsertPendingScheduledCommands(processManager, correlationId);
InsertPendingCommands(processManager, operationId, correlationId, contributor);
InsertPendingScheduledCommands(processManager, operationId, correlationId, contributor);
return Commit(cancellationToken);
}

Expand All @@ -128,17 +133,17 @@ private void UpsertProcessManager(T processManager)
}
}

private void InsertPendingCommands(T processManager, Guid? correlationId)
private void InsertPendingCommands(T processManager, Guid? operationId, Guid? correlationId, string contributor)
{
IEnumerable<PendingCommand> pendingCommands = processManager
.FlushPendingCommands()
.Select(command => new Envelope(Guid.NewGuid(), command, correlationId: correlationId))
.Select(command => new Envelope(Guid.NewGuid(), command, operationId, correlationId, contributor))
.Select(envelope => PendingCommand.FromEnvelope(processManager, envelope, _serializer));

_dbContext.PendingCommands.AddRange(pendingCommands);
}

private void InsertPendingScheduledCommands(T processManager, Guid? correlationId)
private void InsertPendingScheduledCommands(T processManager, Guid? operationId, Guid? correlationId, string contributor)
{
IEnumerable<PendingScheduledCommand> pendingScheduledCommands =
from scheduledCommand in processManager.FlushPendingScheduledCommands()
Expand All @@ -147,7 +152,9 @@ from scheduledCommand in processManager.FlushPendingScheduledCommands()
new Envelope(
Guid.NewGuid(),
scheduledCommand.Command,
correlationId: correlationId),
operationId,
correlationId,
contributor),
scheduledCommand.ScheduledTime)
select PendingScheduledCommand.FromScheduledEnvelope(processManager, scheduledEnvelope, _serializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
<CodeAnalysisRuleSet>..\Rules\Khala.Processes.Tests.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>

<ItemGroup>
<Compile Remove="FakeDomain\Migrations\20180102052656_TheMigration.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AutoFixture" Version="4.0.0-rc1" />
<PackageReference Include="FluentAssertions" Version="4.19.4" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ShowAllFiles>false</ShowAllFiles>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using AutoFixture;
using FluentAssertions;
using Khala.FakeDomain;
using Khala.Messaging;
Expand Down Expand Up @@ -165,8 +166,7 @@ public async Task SaveProcessManagerAndPublishCommands_inserts_new_process_manag
using (sut)
{
// Act
var correlationId = default(Guid?);
await sut.SaveProcessManagerAndPublishCommands(processManager, correlationId, CancellationToken.None);
await sut.SaveProcessManagerAndPublishCommands(processManager);
}

// Assert
Expand Down Expand Up @@ -205,10 +205,9 @@ public async Task SaveProcessManagerAndPublishCommands_updates_existing_process_
{
processManager = await sut.FindProcessManager(x => x.Id == processManager.Id, default);
processManager.StatusValue = statusValue;
var correlationId = default(Guid?);

// Act
await sut.SaveProcessManagerAndPublishCommands(processManager, correlationId, default);
await sut.SaveProcessManagerAndPublishCommands(processManager);
}

// Assert
Expand All @@ -230,10 +229,9 @@ public async Task SaveProcessManagerAndPublishCommands_commits_once()
new JsonMessageSerializer(),
Mock.Of<ICommandPublisher>());
var processManager = new FakeProcessManager();
var correlationId = default(Guid?);

// Act
await sut.SaveProcessManagerAndPublishCommands(processManager, correlationId, CancellationToken.None);
await sut.SaveProcessManagerAndPublishCommands(processManager);

// Assert
context.CommitCount.Should().Be(1);
Expand All @@ -243,15 +241,17 @@ public async Task SaveProcessManagerAndPublishCommands_commits_once()
public async Task SaveProcessManagerAndPublishCommands_inserts_pending_commands_sequentially()
{
// Arrange
var random = new Random();
var fixture = new Fixture();
IEnumerable<FakeCommand> commands = new[]
{
new FakeCommand { Int32Value = random.Next(), StringValue = Guid.NewGuid().ToString() },
new FakeCommand { Int32Value = random.Next(), StringValue = Guid.NewGuid().ToString() },
new FakeCommand { Int32Value = random.Next(), StringValue = Guid.NewGuid().ToString() },
new FakeCommand { Int32Value = fixture.Create<int>(), StringValue = Guid.NewGuid().ToString() },
new FakeCommand { Int32Value = fixture.Create<int>(), StringValue = Guid.NewGuid().ToString() },
new FakeCommand { Int32Value = fixture.Create<int>(), StringValue = Guid.NewGuid().ToString() },
};
var processManager = new FakeProcessManager(commands);
var operationId = Guid.NewGuid();
var correlationId = Guid.NewGuid();
string contributor = fixture.Create<string>();

var serializer = new JsonMessageSerializer();

Expand All @@ -263,7 +263,7 @@ public async Task SaveProcessManagerAndPublishCommands_inserts_pending_commands_
// Act
using (sut)
{
await sut.SaveProcessManagerAndPublishCommands(processManager, correlationId, CancellationToken.None);
await sut.SaveProcessManagerAndPublishCommands(processManager, operationId, correlationId, contributor);
}

// Assert
Expand All @@ -282,7 +282,9 @@ orderby c.Id
t.actual.ProcessManagerType.Should().Be(typeof(FakeProcessManager).FullName);
t.actual.ProcessManagerId.Should().Be(processManager.Id);
t.actual.MessageId.Should().NotBeEmpty();
t.actual.OperationId.Should().Be(operationId);
t.actual.CorrelationId.Should().Be(correlationId);
t.actual.Contributor.Should().Be(contributor);
serializer.Deserialize(t.actual.CommandJson).ShouldBeEquivalentTo(t.expected, opts => opts.RespectingRuntimeTypes());
}
}
Expand All @@ -292,17 +294,19 @@ orderby c.Id
public async Task SaveProcessManagerAndPublishCommands_inserts_pending_scheduled_commands_sequentially()
{
// Arrange
var random = new Random();
var fixture = new Fixture();
IEnumerable<(FakeCommand command, DateTimeOffset scheduledTime)> scheduledCommands = new[]
{
(new FakeCommand { Int32Value = random.Next(), StringValue = Guid.NewGuid().ToString() }, DateTimeOffset.Now.AddTicks(random.Next())),
(new FakeCommand { Int32Value = random.Next(), StringValue = Guid.NewGuid().ToString() }, DateTimeOffset.Now.AddTicks(random.Next())),
(new FakeCommand { Int32Value = random.Next(), StringValue = Guid.NewGuid().ToString() }, DateTimeOffset.Now.AddTicks(random.Next())),
(new FakeCommand { Int32Value = fixture.Create<int>(), StringValue = Guid.NewGuid().ToString() }, DateTimeOffset.Now.AddTicks(fixture.Create<int>())),
(new FakeCommand { Int32Value = fixture.Create<int>(), StringValue = Guid.NewGuid().ToString() }, DateTimeOffset.Now.AddTicks(fixture.Create<int>())),
(new FakeCommand { Int32Value = fixture.Create<int>(), StringValue = Guid.NewGuid().ToString() }, DateTimeOffset.Now.AddTicks(fixture.Create<int>())),
};
var processManager = new FakeProcessManager(
from e in scheduledCommands
select new ScheduledCommand(e.command, e.scheduledTime));
var operationId = Guid.NewGuid();
var correlationId = Guid.NewGuid();
string contributor = fixture.Create<string>();
var serializer = new JsonMessageSerializer();

// Act
Expand All @@ -311,7 +315,7 @@ from e in scheduledCommands
serializer,
Mock.Of<ICommandPublisher>()))
{
await sut.SaveProcessManagerAndPublishCommands(processManager, correlationId, CancellationToken.None);
await sut.SaveProcessManagerAndPublishCommands(processManager, operationId, correlationId, contributor);
}

// Assert
Expand All @@ -331,7 +335,9 @@ orderby c.Id
t.actual.ProcessManagerType.Should().Be(typeof(FakeProcessManager).FullName);
t.actual.ProcessManagerId.Should().Be(processManager.Id);
t.actual.MessageId.Should().NotBeEmpty();
t.actual.OperationId.Should().Be(operationId);
t.actual.CorrelationId.Should().Be(correlationId);
t.actual.Contributor.Should().Be(contributor);
serializer.Deserialize(t.actual.CommandJson).ShouldBeEquivalentTo(t.expected.command, opts => opts.RespectingRuntimeTypes());
t.actual.ScheduledTime.Should().Be(t.expected.scheduledTime);
}
Expand All @@ -350,7 +356,7 @@ public async Task SaveProcessManagerAndPublishCommands_publishes_commands()
publisher);

// Act
await sut.SaveProcessManagerAndPublishCommands(processManager, null, CancellationToken.None);
await sut.SaveProcessManagerAndPublishCommands(processManager);

// Assert
Mock.Get(publisher).Verify(x => x.FlushCommands(processManager.Id, CancellationToken.None), Times.Once());
Expand All @@ -371,7 +377,7 @@ public void given_fails_to_commit_SaveProcessManagerAndPublishCommands_does_not_
publisher);

// Act
Func<Task> action = () => sut.SaveProcessManagerAndPublishCommands(processManager, null, CancellationToken.None);
Func<Task> action = () => sut.SaveProcessManagerAndPublishCommands(processManager);

// Assert
action.ShouldThrow<SqlException>();
Expand Down Expand Up @@ -400,7 +406,7 @@ public void given_command_publisher_fails_SaveProcessManagerAndPublishCommands_i

// Act
Func<Task> action = () =>
sut.SaveProcessManagerAndPublishCommands(processManager, null, cancellationToken);
sut.SaveProcessManagerAndPublishCommands(processManager, cancellationToken: cancellationToken);

// Assert
action.ShouldThrow<InvalidOperationException>().Which.Should().BeSameAs(exception);
Expand Down Expand Up @@ -439,7 +445,7 @@ public void given_command_publisher_exception_handled_SaveProcessManagerAndPubli

// Act
Func<Task> action = () =>
sut.SaveProcessManagerAndPublishCommands(processManager, null, cancellationToken);
sut.SaveProcessManagerAndPublishCommands(processManager, cancellationToken: cancellationToken);

// Assert
action.ShouldNotThrow();
Expand Down Expand Up @@ -469,7 +475,7 @@ public void SaveProcessManagerAndPublishCommands_absorbs_command_publisher_excep

// Act
Func<Task> action = () =>
sut.SaveProcessManagerAndPublishCommands(processManager, null, cancellationToken);
sut.SaveProcessManagerAndPublishCommands(processManager, cancellationToken: cancellationToken);

// Assert
action.ShouldNotThrow();
Expand All @@ -493,7 +499,7 @@ public void given_db_context_does_not_support_transaction_SaveProcessManagerAndP

// Act
Func<Task> action = () =>
sut.SaveProcessManagerAndPublishCommands(processManager, null, CancellationToken.None);
sut.SaveProcessManagerAndPublishCommands(processManager);

// Assert
action.ShouldThrow<InvalidOperationException>();
Expand All @@ -518,7 +524,7 @@ public void given_db_context_supports_transaction_SaveProcessManagerAndPublishCo

// Act
Func<Task> action = () =>
sut.SaveProcessManagerAndPublishCommands(processManager, null, CancellationToken.None);
sut.SaveProcessManagerAndPublishCommands(processManager);

// Assert
action.ShouldNotThrow();
Expand Down
Loading

0 comments on commit 3d2ca41

Please sign in to comment.