diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Abstractions/IStorageContext.cs b/CoreHelpers.WindowsAzure.Storage.Table.Abstractions/IStorageContext.cs index 0e55a16..c9e14e0 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Abstractions/IStorageContext.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Abstractions/IStorageContext.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Reflection; using System.Threading.Tasks; using CoreHelpers.WindowsAzure.Storage.Table.Abstractions; @@ -20,6 +21,8 @@ public interface IStorageContext : IDisposable void AddAttributeMapper(Type type); + void AddAttributeMapper(Type type, String optionalTablenameOverride); + void AddEntityMapper(Type entityType, String partitionKeyFormat, String rowKeyFormat, String tableName); IStorageContext CreateChildContext(); @@ -70,5 +73,7 @@ Task> QueryAsync(string partitionKey, IEnumerable Task> QueryTableList(); Task ExportToJsonAsync(string tableName, TextWriter writer, Action onOperation); + + Task ImportFromJsonAsync(string tableName, StreamReader reader, Action onOperation); } } \ No newline at end of file diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS016BackupRestore.cs b/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS016ExportToJson.cs similarity index 87% rename from CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS016BackupRestore.cs rename to CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS016ExportToJson.cs index aca1cdc..51a7984 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS016BackupRestore.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS016ExportToJson.cs @@ -8,20 +8,21 @@ namespace CoreHelpers.WindowsAzure.Storage.Table.Tests { [Startup(typeof(Startup))] [Collection("Sequential")] - public class ITS016BackupRestore - { - private readonly ITestEnvironment env; + public class ITS016ExportToJson + { + private readonly IStorageContext _rootContext; - public 
ITS016BackupRestore(ITestEnvironment env) + public ITS016ExportToJson(IStorageContext context) { - this.env = env; + _rootContext = context; + } [Fact] public async Task VerifyExportToJson() { // Export Table - using (var storageContext = new StorageContext(env.ConnectionString)) + using (var storageContext = _rootContext.CreateChildContext()) { // set the tablename context storageContext.SetTableContext(); diff --git a/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS017ImportFromJson.cs b/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS017ImportFromJson.cs new file mode 100644 index 0000000..f70ca67 --- /dev/null +++ b/CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS017ImportFromJson.cs @@ -0,0 +1,63 @@ +using System.Text; +using CoreHelpers.WindowsAzure.Storage.Table.Tests.Contracts; +using CoreHelpers.WindowsAzure.Storage.Table.Tests.Extensions; +using CoreHelpers.WindowsAzure.Storage.Table.Tests.Models; +using Newtonsoft.Json.Linq; +using Xunit.DependencyInjection; + +namespace CoreHelpers.WindowsAzure.Storage.Table.Tests +{ + [Startup(typeof(Startup))] + [Collection("Sequential")] + public class ITS017ImportFromJson + { + private readonly IStorageContext _rootContext; + + public ITS017ImportFromJson(IStorageContext context) + { + _rootContext = context; + } + + [Fact] + public async Task VerifyImportFromJson() + { + await Task.CompletedTask; + + // Import Table + using (var storageContext = _rootContext.CreateChildContext()) + { + // set the tablename context + storageContext.SetTableContext(); + + // ensure we have a model registered in the correct table + var tableName1 = $"BU".Replace("-", ""); + storageContext.AddAttributeMapper(typeof(DemoModel2), tableName1); + + // define the import data + var staticExportData = "[{\"RowKey\":\"2\",\"PartitionKey\":\"1\",\"Properties\":[{\"PropertyName\":\"P\",\"PropertyType\":0,\"PropertyValue\":\"1\"},{\"PropertyName\":\"R\",\"PropertyType\":0,\"PropertyValue\":\"2\"}]}]"; + var staticExportDataStream = new 
MemoryStream(Encoding.UTF8.GetBytes(staticExportData ?? "")); + + // check if we have an empty table before import + Assert.Empty(await storageContext.EnableAutoCreateTable().Query().Now()); + + // open the data stream + using (var streamReader = new StreamReader(staticExportDataStream)) + { + // read the data + await storageContext.ImportFromJsonAsync(tableName1, streamReader, (ImportExportOperation) => { }); + } + + // check if we have the data correctly imported + Assert.Single(await storageContext.Query().Now()); + + // get the data + var data = await storageContext.Query().Now(); + Assert.Equal("1", data.First().P); + Assert.Equal("2", data.First().R); + + // drop table + await storageContext.DropTableAsync(); + } + } +} diff --git a/CoreHelpers.WindowsAzure.Storage.Table/CoreHelpers.WindowsAzure.Storage.Table.csproj b/CoreHelpers.WindowsAzure.Storage.Table/CoreHelpers.WindowsAzure.Storage.Table.csproj index e580092..95c5753 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table/CoreHelpers.WindowsAzure.Storage.Table.csproj +++ b/CoreHelpers.WindowsAzure.Storage.Table/CoreHelpers.WindowsAzure.Storage.Table.csproj @@ -41,7 +41,6 @@ - diff --git a/CoreHelpers.WindowsAzure.Storage.Table/Extensions/TypeExtensions.cs b/CoreHelpers.WindowsAzure.Storage.Table/Extensions/TypeExtensions.cs new file mode 100644 index 0000000..e0d9693 --- /dev/null +++ b/CoreHelpers.WindowsAzure.Storage.Table/Extensions/TypeExtensions.cs @@ -0,0 +1,44 @@ +using System; +namespace CoreHelpers.WindowsAzure.Storage.Table.Extensions +{ + public enum ExportEdmType + { + String, + Binary, + Boolean, + DateTime, + Double, + Guid, + Int32, + Int64 + } + + public static class TypeExtensions + { + public static ExportEdmType GetEdmPropertyType(this Type type) + { + if (type == typeof(string)) + return ExportEdmType.String; + else if (type == typeof(byte[])) + return ExportEdmType.Binary; + else if (type == typeof(Boolean) || type == typeof(bool)) + return ExportEdmType.Boolean; + else if (type == 
typeof(DateTime) || type == typeof(DateTimeOffset)) + return ExportEdmType.DateTime; + else if (type == typeof(Double)) + return ExportEdmType.Double; + else if (type == typeof(Guid)) + return ExportEdmType.Guid; + else if (type == typeof(Int32) || type == typeof(int)) + return ExportEdmType.Int32; + else if (type == typeof(Int64)) + return ExportEdmType.Int64; + else + throw new NotImplementedException($"Datatype {type.ToString()} not supported"); + } + } +} + + + + diff --git a/CoreHelpers.WindowsAzure.Storage.Table/Services/DataImportService.cs b/CoreHelpers.WindowsAzure.Storage.Table/Services/DataImportService.cs deleted file mode 100755 index 9dd9df2..0000000 --- a/CoreHelpers.WindowsAzure.Storage.Table/Services/DataImportService.cs +++ /dev/null @@ -1,131 +0,0 @@ -using CoreHelpers.WindowsAzure.Storage.Table.Models; -using Newtonsoft.Json; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; - -namespace CoreHelpers.WindowsAzure.Storage.Table.Services -{ - /*internal class DataImportService : DataService - { - public const string TableName = "CoreHelpersTableImportLogs"; - - public DataImportService(StorageContext storageContext) - : base(storageContext) - { - } - - public async Task ImportFromJsonStreamAsync(string tableName, StreamReader streamReader, Action progress = null) { - - // ensure table exists - var targetTable = storageContext.RequestTableReference(tableName); - if (!await targetTable.ExistsAsync()) - await CreateAzureTableAsync(targetTable); - - // store the entities by partition key - var entityStore = new Dictionary>(); - - // parse - JsonSerializer serializer = new JsonSerializer(); - using (JsonReader reader = new JsonTextReader(streamReader)) - { - while (reader.Read()) - { - // deserialize only when there's "{" character in the stream - if (reader.TokenType == JsonToken.StartObject) - { - // get the data model - var currentModel = serializer.Deserialize(reader); - 
foreach(var property in currentModel.Properties) { - if ((EdmType)property.PropertyType == EdmType.String && property.PropertyValue is DateTime) { - property.PropertyValue = ((DateTime)property.PropertyValue).ToString("o"); - } - } - // convert to table entity - var tableEntity = GetTableEntity(currentModel); - - // add to the right store - if (!entityStore.ContainsKey(tableEntity.PartitionKey)) - entityStore.Add(tableEntity.PartitionKey, new List()); - - // add the entity - entityStore[tableEntity.PartitionKey].Add(tableEntity); - - // check if we need to offload this table - if (entityStore[tableEntity.PartitionKey].Count == 100) { - - // restoring - await RestorePageAsync(targetTable, entityStore[tableEntity.PartitionKey], progress); - - // clear - entityStore.Remove(tableEntity.PartitionKey); - } - } - } - - // post processing - foreach(var kvp in entityStore) { - - // restoring - await RestorePageAsync(targetTable, kvp.Value, progress); - } - } - } - - private DynamicTableEntity GetTableEntity(ImportExportTableEntity data) - { - var azureEntity = new DynamicTableEntity(); - - azureEntity.PartitionKey = data.PartitionKey; - azureEntity.RowKey = data.RowKey; - - foreach (var prop in data.Properties) - { - var propertyValueType = (EdmType)prop.PropertyType; - var propertyValue = GenerateProperty(propertyValueType, prop.PropertyValue); - azureEntity.Properties.Add(new KeyValuePair(prop.PropertyName, propertyValue)); - } - return azureEntity; - } - - private async Task RestorePageAsync(CloudTable tableReference, IEnumerable models, Action progress) - { - // check that the list is small enough - if (models.Count() > TableConstants.TableServiceBatchMaximumOperations) - throw new Exception("Entity Page is to big"); - - // Create the batch - var currentBatch = new TableBatchOperation(); - - // add models - foreach (var entity in models) - currentBatch.InsertOrReplace(entity); - - // notify - progress?.Invoke(currentBatch.Count); - - // insert - await 
tableReference.ExecuteBatchAsync(currentBatch); - } - - private async Task CreateAzureTableAsync(CloudTable table) - { - try - { - await table.CreateAsync(); - } - catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == 409) - { - await Task.Delay(20000); - await CreateAzureTableAsync(table); - } - catch (Exception) - { - throw; - } - } - }*/ -} \ No newline at end of file diff --git a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs index 89f903a..47d9b5d 100644 --- a/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs +++ b/CoreHelpers.WindowsAzure.Storage.Table/StorageContext.cs @@ -4,9 +4,7 @@ using System.Linq; using System.Threading.Tasks; using CoreHelpers.WindowsAzure.Storage.Table.Attributes; -using System.IO; using CoreHelpers.WindowsAzure.Storage.Table.Extensions; -using CoreHelpers.WindowsAzure.Storage.Table.Services; using System.Runtime.ExceptionServices; using System.Text.RegularExpressions; using Azure.Data.Tables; @@ -14,12 +12,10 @@ using System.Threading; using CoreHelpers.WindowsAzure.Storage.Table.Internal; using CoreHelpers.WindowsAzure.Storage.Table.Abstractions; -using Newtonsoft.Json; -using System.Diagnostics; namespace CoreHelpers.WindowsAzure.Storage.Table { - public class StorageContext : IStorageContext + public partial class StorageContext : IStorageContext { private Dictionary _entityMapperRegistry { get; set; } = new Dictionary(); private bool _autoCreateTable { get; set; } = false; @@ -441,83 +437,6 @@ public async Task> QueryTableList() { tables.AddRange(tablePage.Values.Select(t => t.Name)); return tables; - } - - public async Task ExportToJsonAsync(string tableName, TextWriter writer, Action onOperation) - { - try - { - var tc = GetTableClient(GetTableName(tableName)); - - var existsTable = await tc.ExistsAsync(); - if (!existsTable) - throw new FileNotFoundException($"Table '{tableName}' does not exist"); - - // build the json writer - 
JsonWriter wr = new JsonTextWriter(writer); - - // prepare the array in result - wr.WriteStartArray(); - - // enumerate all items from a table - var tablePages = tc.QueryAsync().AsPages(); - - // do the backup - await foreach (var page in tablePages) - { - if (onOperation!= null) - onOperation(ImportExportOperation.processingPage); - - foreach (var entity in page.Values) - { - if (onOperation != null) - onOperation(ImportExportOperation.processingItem); - - wr.WriteStartObject(); - wr.WritePropertyName(TableConstants.RowKey); - wr.WriteValue(entity.RowKey); - wr.WritePropertyName(TableConstants.PartitionKey); - wr.WriteValue(entity.PartitionKey); - wr.WritePropertyName(TableConstants.Properties); - wr.WriteStartArray(); - foreach (var propertyKvp in entity) - { - if (propertyKvp.Key.Equals(TableConstants.PartitionKey) || propertyKvp.Key.Equals(TableConstants.RowKey) || propertyKvp.Key.Equals("odata.etag") || propertyKvp.Key.Equals(TableConstants.Timestamp)) - continue; - - wr.WriteStartObject(); - wr.WritePropertyName(TableConstants.PropertyName); - wr.WriteValue(propertyKvp.Key); - wr.WritePropertyName(TableConstants.PropertyType); - wr.WriteValue(propertyKvp.Value.GetType().ToString()); - wr.WritePropertyName(TableConstants.PropertyValue); - wr.WriteValue(propertyKvp.Value); - wr.WriteEndObject(); - } - wr.WriteEnd(); - wr.WriteEndObject(); - } - - if (onOperation != null) - onOperation(ImportExportOperation.processedPage); - } - - // finishe the export - wr.WriteEnd(); - wr.Flush(); - } - catch (Exception) - { - throw; - } - } - - public async Task ImportFromJsonAsync(string tableName, StreamReader reader) - { - /*var importer = new DataImportService(this); - await importer.ImportFromJsonStreamAsync(tableName, reader);*/ - await Task.CompletedTask; - throw new NotImplementedException(); - } + } } } diff --git a/CoreHelpers.WindowsAzure.Storage.Table/StorageContextImportExport.cs b/CoreHelpers.WindowsAzure.Storage.Table/StorageContextImportExport.cs new file mode 
100644 index 0000000..7297c91 --- /dev/null +++ b/CoreHelpers.WindowsAzure.Storage.Table/StorageContextImportExport.cs @@ -0,0 +1,166 @@ +using System; +using Azure.Data.Tables; +using CoreHelpers.WindowsAzure.Storage.Table.Extensions; +using Newtonsoft.Json; +using System.IO; +using System.Threading.Tasks; +using CoreHelpers.WindowsAzure.Storage.Table.Models; +using System.Collections.Generic; +using System.Data; +using System.Diagnostics; +using CoreHelpers.WindowsAzure.Storage.Table.Serialization; +using System.Linq; + +namespace CoreHelpers.WindowsAzure.Storage.Table +{ + public partial class StorageContext : IStorageContext + { + public async Task ExportToJsonAsync(string tableName, TextWriter writer, Action onOperation) + { + try + { + var tc = GetTableClient(GetTableName(tableName)); + + var existsTable = await tc.ExistsAsync(); + if (!existsTable) + throw new FileNotFoundException($"Table '{tableName}' does not exist"); + + // build the json writer + JsonWriter wr = new JsonTextWriter(writer); + + // prepare the array in result + wr.WriteStartArray(); + + // enumerate all items from a table + var tablePages = tc.QueryAsync().AsPages(); + + // do the backup + await foreach (var page in tablePages) + { + if (onOperation != null) + onOperation(ImportExportOperation.processingPage); + + foreach (var entity in page.Values) + { + if (onOperation != null) + onOperation(ImportExportOperation.processingItem); + + wr.WriteStartObject(); + wr.WritePropertyName(TableConstants.RowKey); + wr.WriteValue(entity.RowKey); + wr.WritePropertyName(TableConstants.PartitionKey); + wr.WriteValue(entity.PartitionKey); + wr.WritePropertyName(TableConstants.Properties); + wr.WriteStartArray(); + foreach (var propertyKvp in entity) + { + if (propertyKvp.Key.Equals(TableConstants.PartitionKey) || propertyKvp.Key.Equals(TableConstants.RowKey) || propertyKvp.Key.Equals("odata.etag") || propertyKvp.Key.Equals(TableConstants.Timestamp)) + continue; + + wr.WriteStartObject(); + 
wr.WritePropertyName(TableConstants.PropertyName); + wr.WriteValue(propertyKvp.Key); + wr.WritePropertyName(TableConstants.PropertyType); + wr.WriteValue(propertyKvp.Value.GetType().GetEdmPropertyType()); + wr.WritePropertyName(TableConstants.PropertyValue); + wr.WriteValue(propertyKvp.Value); + wr.WriteEndObject(); + } + wr.WriteEnd(); + wr.WriteEndObject(); + } + + if (onOperation != null) + onOperation(ImportExportOperation.processedPage); + } + + // finish the export + wr.WriteEnd(); + wr.Flush(); + } + catch (Exception) + { + throw; + } + } + + public async Task ImportFromJsonAsync(string tableName, StreamReader reader, Action onOperation) + { + // get the tableclient + var tc = GetTableClient(GetTableName(tableName)); + + // ensure table exists + if (!await tc.ExistsAsync()) + await tc.CreateAsync(); + + // store the entities by partition key + var entityStore = new Dictionary>(); + + // parse + JsonSerializer serializer = new JsonSerializer(); + using (var jsonReader = new JsonTextReader(reader)) + { + while (jsonReader.Read()) + { + // deserialize only when there's "{" character in the stream + if (jsonReader.TokenType == JsonToken.StartObject) + { + // get the data model + var currentModel = serializer.Deserialize(jsonReader); + + foreach (var property in currentModel.Properties) + { + if ((ExportEdmType)property.PropertyType == ExportEdmType.String && property.PropertyValue is DateTime) + { + property.PropertyValue = ((DateTime)property.PropertyValue).ToString("o"); + } + } + + // convert to table entity + var tableEntity = GetTableEntity(currentModel); + + // add to the right store + if (!entityStore.ContainsKey(tableEntity.PartitionKey)) + entityStore.Add(tableEntity.PartitionKey, new List()); + + // add the entity + entityStore[tableEntity.PartitionKey].Add(tableEntity); + + // check if we need to offload this table + if (entityStore[tableEntity.PartitionKey].Count == 100) + { + // insert the partition + await 
tc.SubmitTransactionAsync(entityStore[tableEntity.PartitionKey].Select(e => new TableTransactionAction(TableTransactionActionType.UpsertReplace, e))); + + // clear + entityStore.Remove(tableEntity.PartitionKey); + } + } + } + + // post processing + foreach (var kvp in entityStore) + { + // insert the partition + await tc.SubmitTransactionAsync(kvp.Value.Select(e => new TableTransactionAction(TableTransactionActionType.UpsertReplace, e))); + } + } + } + + private TableEntity GetTableEntity(ImportExportTableEntity data) + { + var teBuilder = new TableEntityBuilder(); + + teBuilder.AddPartitionKey(data.PartitionKey); + teBuilder.AddRowKey(data.RowKey); + + foreach (var prop in data.Properties) + { + teBuilder.AddProperty(prop.PropertyName, prop.PropertyValue); + } + + return teBuilder.Build(); + } + } +} +