Skip to content

Commit

Permalink
put the dynamic table creation directly to the batch operation to red…
Browse files Browse the repository at this point in the history
…uce runtime
  • Loading branch information
dei79 committed Aug 29, 2022
1 parent a3ba42b commit d89fded
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;

namespace CoreHelpers.WindowsAzure.Storage.Table.Extensions
Expand Down Expand Up @@ -33,6 +37,31 @@ public static async Task<bool> ExistsAsync(this TableClient tc)
}
}
}

public static async Task<Response<IReadOnlyList<Response>>> SubmitTransactionWithAutoCreateTableAsync(this TableClient tc, IEnumerable<TableTransactionAction> transactionActions, CancellationToken cancellationToken, bool allowAutoCreate)
{
try
{
return await tc.SubmitTransactionAsync(transactionActions, cancellationToken);
}
catch (TableTransactionFailedException ex)
{
// check the exception
if (allowAutoCreate && ex.ErrorCode.Equals("TableNotFound"))
{
// try to create the table
await tc.CreateAsync();

// retry
return await tc.SubmitTransactionAsync(transactionActions, cancellationToken);
}
else
{
ExceptionDispatchInfo.Capture(ex).Throw();
return null;
}
}
}
}
}

57 changes: 23 additions & 34 deletions CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Text.RegularExpressions;
using Azure.Data.Tables;
using CoreHelpers.WindowsAzure.Storage.Table.Serialization;
using System.Threading;

namespace CoreHelpers.WindowsAzure.Storage.Table
{
Expand Down Expand Up @@ -296,13 +297,9 @@ private string GetTableName(string tableName)

// Retrieve a reference to the table.
var tc = GetTableClient(GetTableName<T>());

// Create the batch
var tableTransactionsBatch = new List<List<TableTransactionAction>>();


// Create the frist transaction
var tableTransactions = new List<TableTransactionAction>();
tableTransactionsBatch.Add(tableTransactions);

// lookup the entitymapper
var entityMapper = _entityMapperRegistry[typeof(T)];
Expand Down Expand Up @@ -336,44 +333,36 @@ private string GetTableName(string tableName)

if (modelCounter % 100 == 0)
{
tableTransactions = new List<TableTransactionAction>();
tableTransactionsBatch.Add(tableTransactions);
// store the first 100 models
await tc.SubmitTransactionWithAutoCreateTableAsync(tableTransactions, default(CancellationToken), _autoCreateTable);

// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, tableTransactions.Count(), null);

// generate a fresh transaction
tableTransactions = new List<TableTransactionAction>();
}
}

// execute
foreach (var createdBatch in tableTransactionsBatch)
// store the last transaction
if (tableTransactions.Count > 0)
{
if (createdBatch.Count() > 0)
{
await tc.SubmitTransactionAsync(createdBatch);
await tc.SubmitTransactionWithAutoCreateTableAsync(tableTransactions, default(CancellationToken), _autoCreateTable);

// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, createdBatch.Count(), null);
}
}
// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, tableTransactions.Count(), null);
}
}
catch (TableTransactionFailedException ex)
{
// check the exception
if (_autoCreateTable && ex.ErrorCode.Equals("TableNotFound"))
{
// try to create the table
await CreateTableAsync<T>();
// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, 0, ex);

// retry
await StoreAsync<T>(storaeOperationType, models);
}
else
{
// notify delegate
if (_delegate != null)
_delegate.OnStored(typeof(T), storaeOperationType, 0, ex);

throw ex;
}
}
ExceptionDispatchInfo.Capture(ex).Throw();
}
}

public async Task DeleteAsync<T>(T model) where T: new()
Expand Down

0 comments on commit d89fded

Please sign in to comment.