Skip to content

Commit

Permalink
Added dedicated backup restore module including verification
Browse files Browse the repository at this point in the history
  • Loading branch information
dei79 committed Sep 10, 2022
1 parent 4e1d593 commit 06c9ae6
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Task<IEnumerable<T>> QueryAsync<T>(string partitionKey, IEnumerable<QueryFilter>

void SetTableNamePrefix(string tableNamePrefix);

string GetTableNamePrefix();

void OverrideTableName<T>(string table) where T : class, new();

Task MergeOrInsertAsync<T>(IEnumerable<T> models) where T : class, new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace CoreHelpers.WindowsAzure.Storage.Table.Backup.Abstractions
{
public interface IRestoreContext : IDisposable
{
// Task BackupTable(IStorageContext storageContext, string tableName, bool compress = true);
Task Restore(IStorageContext storageContext, string[] excludedTables = null);
}
}

22 changes: 17 additions & 5 deletions CoreHelpers.WindowsAzure.Storage.Table.Backup/BackupContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,17 @@ public async Task Backup(IStorageContext storageContext, string[] excludedTables
_logger.LogInformation($"Statsfile is under {memoryStatsFile}...");
statsFile.WriteLine($"TableName,PageCounter,ItemCount,MemoryFootprint");

// calculate the real prefix
var effectiveTableNamePrefix = String.IsNullOrEmpty(_targetTableNamePrefix) ? "" : _targetTableNamePrefix;
if (!String.IsNullOrEmpty(storageContext.GetTableNamePrefix()))
effectiveTableNamePrefix = $"{storageContext.GetTableNamePrefix()}{_targetTableNamePrefix}";

// visit every table
foreach (var tableName in tables)
{

// filter the table prefix
if (!String.IsNullOrEmpty(_targetTableNamePrefix) && !tableName.StartsWith(_targetTableNamePrefix, StringComparison.CurrentCulture))
if (!String.IsNullOrEmpty(effectiveTableNamePrefix) && !tableName.StartsWith(effectiveTableNamePrefix, StringComparison.CurrentCulture))
{
_logger.LogInformation($"Ignoring table {tableName}...");
continue;
Expand All @@ -80,9 +85,10 @@ public async Task Backup(IStorageContext storageContext, string[] excludedTables
}

using (_logger.BeginScope($"Processing backup for table {tableName}..."))
{
{
// do the backup
var fileName = $"{tableName}.json";
var prefixLessTableName = tableName.Remove(0, effectiveTableNamePrefix.Length);
var fileName = $"{prefixLessTableName}.json";
if (!string.IsNullOrEmpty(_targetPath)) { fileName = $"{_targetPath}/{fileName}"; }
if (compress) { fileName += ".gz"; }

Expand All @@ -96,7 +102,7 @@ public async Task Backup(IStorageContext storageContext, string[] excludedTables
_logger.LogInformation($"Writing backup to non compressed file");

// do it
using (var backupFileStream = await blobClient.OpenWriteAsync(false))
using (var backupFileStream = await blobClient.OpenWriteAsync(true))
{
using (var contentWriter = new ZippedStreamWriter(backupFileStream, compress))
{
Expand All @@ -105,7 +111,13 @@ public async Task Backup(IStorageContext storageContext, string[] excludedTables

var pageLogScope = default(IDisposable);

await storageContext.ExportToJsonAsync(tableName, contentWriter, (ImportExportOperation operation) =>
// get the effective tablename
var effectiveTableName = tableName;
if (!String.IsNullOrEmpty(storageContext.GetTableNamePrefix()))
effectiveTableName = effectiveTableName.Remove(0, storageContext.GetTableNamePrefix().Length);

// start export
await storageContext.ExportToJsonAsync(effectiveTableName, contentWriter, (ImportExportOperation operation) =>
{
switch (operation)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ public async Task<IBackupContext> OpenBackupContext(string targetBlobStorageConn
public async Task<IRestoreContext> OpenRestorContext(string sourceBlobStorageConnectionString, string sourceContainerName, string sourcePath, string tableNamePrefix = null)
{
await Task.CompletedTask;
return new RestoreContext();
return new RestoreContext(
_loggerFactory.CreateLogger<RestoreContext>(),
sourceBlobStorageConnectionString, sourceContainerName, sourcePath,
tableNamePrefix);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
</PropertyGroup>


<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<LangVersion>default</LangVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<LangVersion>default</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\CoreHelpers.WindowsAzure.Storage.Table.Backup.Abstractions\CoreHelpers.WindowsAzure.Storage.Table.Backup.Abstractions.csproj" />
</ItemGroup>
Expand Down
156 changes: 86 additions & 70 deletions CoreHelpers.WindowsAzure.Storage.Table.Backup/RestoreContext.cs
Original file line number Diff line number Diff line change
@@ -1,93 +1,109 @@
using System;
using System.ComponentModel;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using CoreHelpers.WindowsAzure.Storage.Table.Backup.Abstractions;
using Microsoft.Extensions.Logging;

namespace CoreHelpers.WindowsAzure.Storage.Table.Backup
{
public class RestoreContext : IRestoreContext
{
public RestoreContext()
private ILogger<RestoreContext> _logger;
private BlobServiceClient _blobServiceClient;

private string _sourceConnectionString;
private string _sourceContainer;
private string _sourcePath;
private string _sourceTableNamePrefix;

public RestoreContext(ILogger<RestoreContext> logger, string connectionString, string container, string path, string tableNamePrefix)
{
_logger = logger;
_blobServiceClient = new BlobServiceClient(connectionString);

_sourceConnectionString = connectionString;
_sourceContainer = container;
_sourcePath = path;
_sourceTableNamePrefix = tableNamePrefix;
}

public void Dispose()
{
throw new NotImplementedException();
{
}
}
}

/*
* public async Task Restore(string containerName, string srcPath, string tablePrefix = null) {
// log
storageLogger.LogInformation($"Starting restore procedure...");

// get all backup files
var blobClient = backupStorageAccount.CreateCloudBlobClient();
var containerReference = blobClient.GetContainerReference(containerName.ToLower());
// check if the container exists
if (!await containerReference.ExistsAsync()) {
storageLogger.LogInformation($"Missing container {containerName.ToLower()}");
return;
}
// build the path including prefix
storageLogger.LogInformation($"Search Prefix is {srcPath}");
// track the state
var continuationToken = default(BlobContinuationToken);
do
public async Task Restore(IStorageContext storageContext, string[] excludedTables = null)
{
using (_logger.BeginScope("Starting restore procedure..."))
{
// get all blobs
var blobResult = await containerReference.ListBlobsSegmentedAsync(srcPath, true, BlobListingDetails.All, 1000, continuationToken, null, null);
// process every backup file as table
foreach(var blob in blobResult.Results) {
// build the name
var blobName = blob.StorageUri.PrimaryUri.AbsolutePath;
blobName = blobName.Remove(0, containerName.Length + 2);
// get the tablename
var tableName = Path.GetFileNameWithoutExtension(blobName);
var compressed = blobName.EndsWith(".gz", StringComparison.CurrentCultureIgnoreCase);
if (compressed)
tableName = Path.GetFileNameWithoutExtension(tableName);
// add the prefix
if (!String.IsNullOrEmpty(tablePrefix))
tableName = $"{tablePrefix}{tableName}";
// get the container reference
var blobContainerClient = _blobServiceClient.GetBlobContainerClient(_sourceContainer);
if (!await blobContainerClient.ExistsAsync())
throw new Exception("Container not found");

// check if the container exists
if (!await blobContainerClient.ExistsAsync())
{
_logger.LogInformation($"Missing container {_sourceContainer.ToLower()}");
return;
}

// log
storageLogger.LogInformation($"Restoring {blobName} to table {tableName} (Compressed: {compressed})");
// build the path including prefix
_logger.LogInformation($"Search Prefix is {_sourcePath}");

// build the reference
var blockBlobReference = containerReference.GetBlockBlobReference(blobName);
// get the pages
var blobPages = blobContainerClient.GetBlobsAsync(Azure.Storage.Blobs.Models.BlobTraits.None, Azure.Storage.Blobs.Models.BlobStates.None, _sourcePath).AsPages();

// open the read stream
using (var readStream = await blockBlobReference.OpenReadAsync())
// visit every page
await foreach (var page in blobPages)
{
foreach(var blob in page.Values)
{
// unzip the stream
using (var contentReader = new ZippedStreamReader(readStream, compressed))
// build the name
var blobName = blob.Name;

// get the tablename
var tableName = Path.GetFileNameWithoutExtension(blobName);
var compressed = blobName.EndsWith(".gz", StringComparison.CurrentCultureIgnoreCase);
if (compressed)
tableName = Path.GetFileNameWithoutExtension(tableName);

// add the prefix
if (!String.IsNullOrEmpty(_sourceTableNamePrefix))
tableName = $"{_sourceTableNamePrefix}{tableName}";

// log
_logger.LogInformation($"Restoring {blobName} to table {tableName} (Compressed: {compressed})");

// open the read stream
var blobClient = blobContainerClient.GetBlobClient(blob.Name);
using (var readStream = await blobClient.OpenReadAsync())
{
// import the stream
var pageCounter = 0;
await dataImportService.ImportFromJsonStreamAsync(tableName, contentReader, (c) => {
pageCounter++;
storageLogger.LogInformation($" Processing page #{pageCounter} with #{c} items...");
});
// unzip the stream
using (var contentReader = new ZippedStreamReader(readStream, compressed))
{
// import the stream
var pageCounter = 0;
await storageContext.ImportFromJsonAsync(tableName, contentReader, (c) =>
{
switch (c)
{
case ImportExportOperation.processingPage:
_logger.LogInformation($" Processing page #{pageCounter}...");
break;
case ImportExportOperation.processedPage:
pageCounter++;
break;
}
});
}
}
}
}
// proces the token
continuationToken = blobResult.ContinuationToken;
} while (continuationToken != null);
}
}

await Task.CompletedTask;
}
*/
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<WarningLevel>4</WarningLevel>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<LangVersion>default</LangVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<LangVersion>default</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.1" />
Expand All @@ -24,6 +28,8 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Xunit.DependencyInjection" Version="8.5.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
Expand All @@ -33,6 +39,8 @@
<None Remove="TestEnvironments\" />
<None Remove="Extensions\" />
<None Remove="Delegates\" />
<None Remove="Microsoft.Extensions.Logging" />
<None Remove="Microsoft.Extensions.Logging.Debug" />
</ItemGroup>
<ItemGroup>
<Folder Include="Models\" />
Expand Down
86 changes: 86 additions & 0 deletions CoreHelpers.WindowsAzure.Storage.Table.Tests/ITS21VerifyBackup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using CoreHelpers.WindowsAzure.Storage.Table.Backup;
using CoreHelpers.WindowsAzure.Storage.Table.Tests.Contracts;
using CoreHelpers.WindowsAzure.Storage.Table.Tests.Extensions;
using CoreHelpers.WindowsAzure.Storage.Table.Tests.Models;
using Xunit.DependencyInjection;

namespace CoreHelpers.WindowsAzure.Storage.Table.Tests
{
[Startup(typeof(Startup))]
[Collection("Sequential")]
public class ITS21VerifyBackup
{
private readonly IStorageContext _rootContext;
private readonly IBackupService _backupService;
private readonly ITestEnvironment _testEnvironment;

public ITS21VerifyBackup(IStorageContext context, IBackupService backupService, ITestEnvironment testEnvironment)
{
_rootContext = context;
_backupService = backupService;
_testEnvironment = testEnvironment;
}

[Fact]
public async Task CreateAndVerifyBackup()
{
var containerName = $"bck{Guid.NewGuid().ToString()}".Replace("_", "");
var targetPath = $"CreateAndVerifyBackup/{Guid.NewGuid()}";

using (var scp = _rootContext.CreateChildContext())
{
// set the tablename context
scp.SetTableContext();

// configure the entity mapper
scp.AddAttributeMapper(typeof(DemoEntityQuery), "BackupDemoEntityQuery");

// verify that we have no items
Assert.Empty((await scp.EnableAutoCreateTable().Query<DemoEntityQuery>().Now()));

// create items in two different partitions
var modelsP1 = new List<DemoEntityQuery>()
{
new DemoEntityQuery() {P = "P1", R = "E1", StringField = "Demo01"},
new DemoEntityQuery() {P = "P1", R = "E2", StringField = "Demo02"},
};

await scp.EnableAutoCreateTable().MergeOrInsertAsync<DemoEntityQuery>(modelsP1);

using (var backupContext = await _backupService.OpenBackupContext(_testEnvironment.ConnectionString, containerName, targetPath, "Backup"))
{
await backupContext.Backup(scp, null, true);
}

await scp.DropTableAsync<DemoEntityQuery>();
}

using (var scp = _rootContext.CreateChildContext())
{
// set the tablename context
scp.SetTableContext();

// configure the entity mapper
scp.AddAttributeMapper(typeof(DemoEntityQuery), "BackupDemoEntityQuery");

var itemsBeforeRestore= await scp.EnableAutoCreateTable().Query<DemoEntityQuery>().Now();
Assert.Empty(itemsBeforeRestore);

// verify that we have no items
Assert.Empty((await scp.EnableAutoCreateTable().Query<DemoEntityQuery>().Now()));

// restore
using (var restoreContext = await _backupService.OpenRestorContext(_testEnvironment.ConnectionString, containerName, targetPath, "Backup"))
{
await restoreContext.Restore(scp, null);
}

// verify if we have the values
var items = await scp.EnableAutoCreateTable().Query<DemoEntityQuery>().Now();
Assert.Equal(2, items.Count());
}
}
}
}

Loading

0 comments on commit 06c9ae6

Please sign in to comment.