Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Grouped store operations by partition key #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public async Task Execute(string storageKey, string storageSecret, string endpoi

// create 2000 items
var data = new List<HugeDemoEntry>();
for (int i = 0; i < 2000; i++)
for (int i = 0; i < 20000; i++)
data.Add(new HugeDemoEntry());

await storageContext.EnableAutoCreateTable().MergeOrInsertAsync<HugeDemoEntry>(data);
Expand Down
15 changes: 8 additions & 7 deletions CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ static async Task Main(string[] args)
// register all demo cases
var cases = new List<IDemoCase>
{
/new UC01StoreWithStaticEntityMapper(),
// new UC01StoreWithStaticEntityMapper(),
// new UC02StoreWithAttributeMapper(),
// new UC03StoreWithAttributeMapperManualRegistration(),
// new UC04GetVirtualArray(),
Expand All @@ -33,7 +33,7 @@ static async Task Main(string[] args)
// new UC07CreateModelsPaged(),
// new UC08CheckMaxItems(),
// new UC09ReadInterfaceValues(),
// new UC10CreateHugeAmountOfDemoEntries(),
new UC10CreateHugeAmountOfDemoEntries(),
// new UC11ReadPageByPage(),
// new UC12PartialUpdateMergeModel(),
// new UC13DynamicallyCreateList(),
Expand All @@ -42,7 +42,7 @@ static async Task Main(string[] args)
// new UC16Backup()
// new UC17Restore()
// new UC18DateTime()
new UC19QueryFilter()
// new UC19QueryFilter()
};

// register demo cases for Ger Cloud
Expand All @@ -54,10 +54,11 @@ static async Task Main(string[] args)
// execute in WW cloud
Console.WriteLine("Executing Demo Cases (WW Cloud)");
foreach (var useCase in cases)
await useCase.Execute(config.GetValue("key").ToString(), config.GetValue("secret").ToString());

// execute in GER cloud
/*Console.WriteLine("Executing Demo Cases (GER Cloud)");
await useCase.Execute(config.GetValue("key").ToString(), config.GetValue("secret").ToString());

Console.ReadKey();
// execute in GER cloud
/*Console.WriteLine("Executing Demo Cases (GER Cloud)");
foreach (var useCase in casesGer)
await useCase.Execute(config.GetValue("keyde").ToString(), config.GetValue("secretde").ToString(), "core.cloudapi.de"); */
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@
<Compile Include="..\CoreHelpers.WindowsAzure.Storage.Table\Extensions\PropertyInfoSetValueFromEntityProperty.cs">
<Link>Extensions\PropertyInfoSetValueFromEntityProperty.cs</Link>
</Compile>
<Compile Include="..\CoreHelpers.WindowsAzure.Storage.Table\Models\ParallelConnectionsOptions.cs">
<Link>Models\ParallelConnectionsOptions.cs</Link>
</Compile>
<Compile Include="..\CoreHelpers.WindowsAzure.Storage.Table\PagedTableEntityWriter.cs">
<Link>PagedTableEntityWriter.cs</Link>
</Compile>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace CoreHelpers.WindowsAzure.Storage.Table.Models
{
public class ParallelConnectionsOptions
{

public static ParallelConnectionsOptions Default => new ParallelConnectionsOptions()
{
RunInParallel = true,
MaxDegreeOfParallelism = 20
};

public bool RunInParallel { get; set; }

public int MaxDegreeOfParallelism { get; set; }

}
}
155 changes: 102 additions & 53 deletions CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private string GetTableName(Type entityType)
return entityMapper.TableName;
}

public async Task StoreAsync<T>(nStoreOperation storaeOperationType, IEnumerable<T> models) where T : new()
public async Task StoreAsync<T>(nStoreOperation storaeOperationType, IEnumerable<T> models, ParallelConnectionsOptions parallelOptions = null) where T : new()
{
try
{
Expand All @@ -308,66 +308,115 @@ private string GetTableName(Type entityType)
_delegate.OnStoring(typeof(T), storaeOperationType);

// Retrieve a reference to the table.
CloudTable table = GetTableReference(GetTableName<T>());
var table = GetTableReference(GetTableName<T>());

// Create the batch operation.
List<TableBatchOperation> batchOperations = new List<TableBatchOperation>();

// Create the first batch
var currentBatch = new TableBatchOperation();
batchOperations.Add(currentBatch);
var batchOperations = new List<TableBatchOperation>();

// Allocate batch variable
var currentBatch = default(TableBatchOperation);

// lookup the entitymapper
var entityMapper = _entityMapperRegistry[typeof(T)];

// batch operations must be in the same partition
var partitions = models.Select(m => new DynamicTableEntity<T>(m, entityMapper)).GroupBy(m => m.PartitionKey);

// define the modelcounter
int modelCounter = 0;
var batchTasks = new List<Task<IList<TableResult>>>();

// Add all items
foreach (var model in models)
{
switch (storaeOperationType)
{
case nStoreOperation.insertOperation:
currentBatch.Insert(new DynamicTableEntity<T>(model, entityMapper));
break;
case nStoreOperation.insertOrReplaceOperation:
currentBatch.InsertOrReplace(new DynamicTableEntity<T>(model, entityMapper));
break;
case nStoreOperation.mergeOperation:
currentBatch.Merge(new DynamicTableEntity<T>(model, entityMapper));
break;
case nStoreOperation.mergeOrInserOperation:
currentBatch.InsertOrMerge(new DynamicTableEntity<T>(model, entityMapper));
break;
case nStoreOperation.delete:
currentBatch.Delete(new DynamicTableEntity<T>(model, entityMapper));
break;
}

modelCounter++;

if (modelCounter % 100 == 0)
{
currentBatch = new TableBatchOperation();
batchOperations.Add(currentBatch);
}
}
if (parallelOptions == null)
parallelOptions = ParallelConnectionsOptions.Default;

// execute
foreach (var createdBatch in batchOperations)
{
if (createdBatch.Count() > 0)
{
await table.ExecuteBatchAsync(createdBatch);

// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count(), null);
}
}
}
catch (StorageException ex)
if (parallelOptions.RunInParallel && _autoCreateTable)
{
// try to create the table if we are parallel processing the catch/retry mechanism fails
await CreateTableAsync<T>(true);
}


// Add all items
foreach (var partition in partitions)
{

currentBatch = new TableBatchOperation();
if (!parallelOptions.RunInParallel)
batchOperations.Add(currentBatch);

foreach (var dynamicEntity in partition)
{
if (currentBatch.Count == 100)
{
if (parallelOptions.RunInParallel)
batchTasks.Add(table.ExecuteBatchAsync(currentBatch));

currentBatch = new TableBatchOperation();

if (!parallelOptions.RunInParallel)
batchOperations.Add(currentBatch);
}

switch (storaeOperationType)
{
case nStoreOperation.insertOperation:
currentBatch.Insert(dynamicEntity);
break;
case nStoreOperation.insertOrReplaceOperation:
currentBatch.InsertOrReplace(dynamicEntity);
break;
case nStoreOperation.mergeOperation:
currentBatch.Merge(dynamicEntity);
break;
case nStoreOperation.mergeOrInserOperation:
currentBatch.InsertOrMerge(dynamicEntity);
break;
case nStoreOperation.delete:
currentBatch.Delete(dynamicEntity);
break;
}


if (parallelOptions.RunInParallel && batchTasks.Count >= parallelOptions.MaxDegreeOfParallelism)
{
var taskResults = await Task.WhenAll(batchTasks);
if (_delegate != null)
foreach (var taskResult in taskResults)
_delegate.OnStored(typeof(T), storaeOperationType, taskResult.Count, null);

batchTasks.Clear();
}
}

if (parallelOptions.RunInParallel && currentBatch != null && currentBatch.Any())
batchTasks.Add(table.ExecuteBatchAsync(currentBatch));

}

if (parallelOptions.RunInParallel)
{

var taskResults = await Task.WhenAll(batchTasks);
if (_delegate != null)
foreach (var taskResult in taskResults)
_delegate.OnStored(typeof(T), storaeOperationType, taskResult.Count, null);
}
else
{
// execute
foreach (var createdBatch in batchOperations)
{
if (createdBatch.Count() > 0)
{
await table.ExecuteBatchAsync(createdBatch);

// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count, null);
}
}
}

}
catch (StorageException ex)
{
// check the exception
if (!_autoCreateTable || !ex.Message.StartsWith("0:The table specified does not exist", StringComparison.CurrentCulture))
Expand Down