diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs b/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs index 8cbd322..4380df2 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Demo/DemoCases/UC10CreateHugeAmountOfDemoEntries.cs @@ -25,7 +25,7 @@ public async Task Execute(string storageKey, string storageSecret, string endpoi // create 2000 items var data = new List(); - for (int i = 0; i < 2000; i++) + for (int i = 0; i < 20000; i++) data.Add(new HugeDemoEntry()); await storageContext.EnableAutoCreateTable().MergeOrInsertAsync(data); diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs b/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs index 0fc11ac..4b9fdac 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Demo/Program.cs @@ -24,7 +24,7 @@ static async Task Main(string[] args) // register all demo cases var cases = new List { - /new UC01StoreWithStaticEntityMapper(), + // new UC01StoreWithStaticEntityMapper(), // new UC02StoreWithAttributeMapper(), // new UC03StoreWithAttributeMapperManualRegistration(), // new UC04GetVirtualArray(), @@ -33,7 +33,7 @@ static async Task Main(string[] args) // new UC07CreateModelsPaged(), // new UC08CheckMaxItems(), // new UC09ReadInterfaceValues(), - // new UC10CreateHugeAmountOfDemoEntries(), + new UC10CreateHugeAmountOfDemoEntries(), // new UC11ReadPageByPage(), // new UC12PartialUpdateMergeModel(), // new UC13DynamicallyCreateList(), @@ -42,7 +42,7 @@ static async Task Main(string[] args) // new UC16Backup() // new UC17Restore() // new UC18DateTime() - new UC19QueryFilter() + // new UC19QueryFilter() }; // register demo cases for Ger Cloud @@ -54,10 +54,11 @@ static async Task Main(string[] args) // execute in WW cloud Console.WriteLine("Executing Demo Cases (WW Cloud)"); foreach (var useCase in cases) - await useCase.Execute(config.GetValue("key").ToString(), config.GetValue("secret").ToString()); - - // execute in GER cloud - /*Console.WriteLine("Executing Demo Cases (GER Cloud)"); + await useCase.Execute(config.GetValue("key").ToString(), config.GetValue("secret").ToString()); + + Console.ReadKey(); + // execute in GER cloud + /*Console.WriteLine("Executing Demo Cases (GER Cloud)"); foreach (var useCase in casesGer) await useCase.Execute(config.GetValue("keyde").ToString(), config.GetValue("secretde").ToString(), "core.cloudapi.de"); */ } diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj b/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj index 4201084..3b0e95e 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj +++ b/CoreHelpers.WindowsAzure.Storage.Table.Net45/CoreHelpers.WindowsAzure.Storage.Table.Net45.csproj @@ -116,6 +116,9 @@ Extensions\PropertyInfoSetValueFromEntityProperty.cs + + Models\ParallelConnectionsOptions.cs + PagedTableEntityWriter.cs diff --git a/CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs b/CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs new file mode 100644 index 0000000..ced5c17 --- /dev/null +++ b/CoreHelpers.WindowsAzure.Storage.Table/Models/ParallelConnectionsOptions.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace CoreHelpers.WindowsAzure.Storage.Table.Models +{ + public class ParallelConnectionsOptions + { + + public static ParallelConnectionsOptions Default => new ParallelConnectionsOptions() + { + RunInParallel = true, + MaxDegreeOfParallelism = 20 + }; + + public bool RunInParallel { get; set; } + + public int MaxDegreeOfParallelism { get; set; } + + } +} diff --git a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs index bd6fdac..52e15c3 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs @@ -299,7 +299,7 @@ private string GetTableName(Type entityType) return entityMapper.TableName; } - public async Task StoreAsync(nStoreOperation storaeOperationType, IEnumerable models) where T : new() + public async Task StoreAsync(nStoreOperation storaeOperationType, IEnumerable models, ParallelConnectionsOptions parallelOptions = null) where T : new() { try { @@ -308,66 +308,115 @@ private string GetTableName(Type entityType) _delegate.OnStoring(typeof(T), storaeOperationType); // Retrieve a reference to the table. - CloudTable table = GetTableReference(GetTableName()); + var table = GetTableReference(GetTableName()); // Create the batch operation. - List batchOperations = new List(); - - // Create the first batch - var currentBatch = new TableBatchOperation(); - batchOperations.Add(currentBatch); + var batchOperations = new List(); + + // Allocate batch variable + var currentBatch = default(TableBatchOperation); // lookup the entitymapper var entityMapper = _entityMapperRegistry[typeof(T)]; + + // batch operations must be in the same partition + var partitions = models.Select(m => new DynamicTableEntity(m, entityMapper)).GroupBy(m => m.PartitionKey); - // define the modelcounter - int modelCounter = 0; + var batchTasks = new List>>(); - // Add all items - foreach (var model in models) - { - switch (storaeOperationType) - { - case nStoreOperation.insertOperation: - currentBatch.Insert(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.insertOrReplaceOperation: - currentBatch.InsertOrReplace(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.mergeOperation: - currentBatch.Merge(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.mergeOrInserOperation: - currentBatch.InsertOrMerge(new DynamicTableEntity(model, entityMapper)); - break; - case nStoreOperation.delete: - currentBatch.Delete(new DynamicTableEntity(model, entityMapper)); - break; - } - - modelCounter++; - - if (modelCounter % 100 == 0) - { - currentBatch = new TableBatchOperation(); - batchOperations.Add(currentBatch); - } - } + if (parallelOptions == null) + parallelOptions = ParallelConnectionsOptions.Default; - // execute - foreach (var createdBatch in batchOperations) - { - if (createdBatch.Count() > 0) - { - await table.ExecuteBatchAsync(createdBatch); - - // notify delegate - if (_delegate != null) - _delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count(), null); - } - } - } - catch (StorageException ex) + if (parallelOptions.RunInParallel && _autoCreateTable) + { + // try to create the table if we are parallel processing the catch/retry mechanism fails + await CreateTableAsync(true); + } + + + // Add all items + foreach (var partition in partitions) + { + + currentBatch = new TableBatchOperation(); + if (!parallelOptions.RunInParallel) + batchOperations.Add(currentBatch); + + foreach (var dynamicEntity in partition) + { + if (currentBatch.Count == 100) + { + if (parallelOptions.RunInParallel) + batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); + + currentBatch = new TableBatchOperation(); + + if (!parallelOptions.RunInParallel) + batchOperations.Add(currentBatch); + } + + switch (storaeOperationType) + { + case nStoreOperation.insertOperation: + currentBatch.Insert(dynamicEntity); + break; + case nStoreOperation.insertOrReplaceOperation: + currentBatch.InsertOrReplace(dynamicEntity); + break; + case nStoreOperation.mergeOperation: + currentBatch.Merge(dynamicEntity); + break; + case nStoreOperation.mergeOrInserOperation: + currentBatch.InsertOrMerge(dynamicEntity); + break; + case nStoreOperation.delete: + currentBatch.Delete(dynamicEntity); + break; + } + + + if (parallelOptions.RunInParallel && batchTasks.Count >= parallelOptions.MaxDegreeOfParallelism) + { + var taskResults = await Task.WhenAll(batchTasks); + if (_delegate != null) + foreach (var taskResult in taskResults) + _delegate.OnStored(typeof(T), storaeOperationType, taskResult.Count, null); + + batchTasks.Clear(); + } + } + + if (parallelOptions.RunInParallel && currentBatch != null && currentBatch.Any()) + batchTasks.Add(table.ExecuteBatchAsync(currentBatch)); + + } + + if (parallelOptions.RunInParallel) + { + + var taskResults = await Task.WhenAll(batchTasks); + if (_delegate != null) + foreach (var taskResult in taskResults) + _delegate.OnStored(typeof(T), storaeOperationType, taskResult.Count, null); + } + else + { + // execute + foreach (var createdBatch in batchOperations) + { + if (createdBatch.Count() > 0) + { + await table.ExecuteBatchAsync(createdBatch); + + // notify delegate + if (_delegate != null) + _delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count, null); + } + } + } + + } + catch (StorageException ex) { // check the exception if (!_autoCreateTable || !ex.Message.StartsWith("0:The table specified does not exist", StringComparison.CurrentCulture))