SNOW-979109: Support directories with GET and PUT of stages - NODE.JS #737

Draft · wants to merge 13 commits into base: master
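
For context, a minimal usage sketch of the behavior this change targets: a '**' pattern in PUT uploads a whole directory tree, and GET recreates the staged subdirectories under the local target directory. The stage name and local paths below are hypothetical, and connection setup is omitted.

// Hypothetical sketch: assumes an already-connected snowflake-sdk Connection
// named `connection` and an existing stage @mystage; paths are placeholders.
connection.execute({
  sqlText: 'PUT file:///tmp/data/** @mystage',
  complete: function (putErr, putStmt, putRows) {
    if (putErr) {
      return console.error('PUT failed:', putErr.message);
    }
    // Each row describes one uploaded file; with this change the reported
    // file name is expected to keep its subdirectory prefix.
    console.log('Uploaded', putRows.length, 'files');

    connection.execute({
      sqlText: 'GET @mystage file:///tmp/restore',
      complete: function (getErr, getStmt, getRows) {
        if (getErr) {
          return console.error('GET failed:', getErr.message);
        }
        // Downloaded files are written under /tmp/restore with their
        // stage subdirectories recreated locally.
        console.log('Downloaded', getRows.length, 'files');
      }
    });
  }
});
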
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/azure_util.js
@@ -93,7 +93,7 @@ function AzureUtil(azure, filestream) {
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);

const containerClient = client.getContainerClient(azureLocation.containerName);
const blobClient = containerClient.getBlobClient(azureLocation.path + filename);
const blobClient = containerClient.getBlobClient(azureLocation.path + meta['subDirectory'] + filename);

let blobDetails;

@@ -190,7 +190,7 @@ function AzureUtil(azure, filestream) {
const stageInfo = meta['stageInfo'];
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['dstFileName'];
const blobName = azureLocation.path + meta['subDirectory'] + meta['dstFileName'];

const containerClient = client.getContainerClient(azureLocation.containerName);
const blockBlobClient = containerClient.getBlockBlobClient(blobName);
@@ -232,7 +232,7 @@ function AzureUtil(azure, filestream) {
const stageInfo = meta['stageInfo'];
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['srcFileName'];
const blobName = azureLocation.path + meta['subDirectory'] + meta['srcFileName'];

const containerClient = client.getContainerClient(azureLocation.containerName);
const blockBlobClient = containerClient.getBlockBlobClient(blobName);
60 changes: 53 additions & 7 deletions lib/file_transfer_agent/file_transfer_agent.js
@@ -272,7 +272,7 @@ function FileTransferAgent(context) {
dstFileSize = meta['dstFileSize'];

rowset.push([
meta['dstFileName'],
meta['subDirectory'] + meta['dstFileName'],
dstFileSize,
meta['resultStatus'],
errorDetails
@@ -633,6 +633,10 @@ function FileTransferAgent(context) {
// Full path name of the file
const fileNameFullPath = path.join(root, fileName);

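// A '**' pattern means a recursive upload: gather files from the matching directory and all of its subdirectories.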
if (fileName.includes('**')) {
filesToPut = getAllFilesInfo(root, fileName, true);
}

// If file name has a wildcard
if (fileName.includes('*')) {
// Get all file names that matches the wildcard
@@ -647,8 +651,9 @@
currFileObj['srcFilePath'] = matchingFileName;
currFileObj['srcFileSize'] = fileInfo.size;

filesToPut.push(currFileObj);
}
// If file name has a wildcard
else if (fileName.includes('*')) {
filesToPut = getAllFilesInfo(root, fileName, false);
} else {
// No wildcard, get single file
if (fs.existsSync(root)) {
@@ -734,17 +739,20 @@ function FileTransferAgent(context) {
currFileObj['srcFilePath'] = file['srcFilePath'];
currFileObj['srcFileName'] = file['srcFileName'];
currFileObj['srcFileSize'] = file['srcFileSize'];
currFileObj['subDirectory'] = file['subDirectory'];
currFileObj['stageLocationType'] = stageLocationType;
currFileObj['stageInfo'] = stageInfo;
currFileObj['overwrite'] = overwrite;

fileMetadata.push(currFileObj);
}
} else if (commandType === CMD_TYPE_DOWNLOAD) {
for (const fileName of srcFiles) {
const currFileObj = {};
currFileObj['srcFileName'] = fileName;
currFileObj['dstFileName'] = fileName;

for (var fileName of srcFiles) {
var currFileObj = {};
currFileObj['srcFileName'] = path.basename(fileName);
currFileObj['dstFileName'] = path.basename(fileName);
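// Only the base name identifies the file; the directory part of the staged path is kept in subDirectory so it can be used in the remote key and re-created locally.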
currFileObj['subDirectory'] = (path.dirname(fileName) !== '.' ? path.dirname(fileName) + '/' : '');
currFileObj['stageLocationType'] = stageLocationType;
currFileObj['stageInfo'] = stageInfo;
currFileObj['useAccelerateEndpoint'] = useAccelerateEndpoint;
@@ -853,6 +861,44 @@ function FileTransferAgent(context) {
}
}
}

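// Collects metadata for every file matching the pattern under dir (via glob.sync).
// Matched directories are recursed into when includeSubDir is true; files found during
// recursion get a subDirectory computed relative to mainDir (the directory the top-level
// call started from), while files matched at the top level get an empty subDirectory.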
function getAllFilesInfo(dir, fileName, includeSubDir, mainDir = '') {
const fileInfos = [];
const subDirectories = [];
const matchingFileNames = glob.sync(path.join(dir, fileName + '*'));

for (const matchingFileName of matchingFileNames) {
initEncryptionMaterial();

const fileInfo = fs.statSync(matchingFileName);
fileInfo.isDirectory() ? subDirectories.push(matchingFileName) : fileInfos.push(getFileObject(matchingFileName, mainDir));
}

if (includeSubDir) {
for (const subDir of subDirectories) {
fileInfos.push(...getAllFilesInfo(subDir, fileName, includeSubDir, mainDir !== '' ? mainDir : dir));
}
}

return fileInfos;
}

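// Builds the metadata object for a single file: srcFileName, srcFilePath, srcFileSize,
// and a subDirectory that is empty unless a mainDir root is provided.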
function getFileObject(dir, mainDir) {
const fileInfo = fs.statSync(dir);
const currFileObj = {};
currFileObj['srcFileName'] = dir.substring(dir.lastIndexOf('/') + 1);
currFileObj['srcFilePath'] = dir;
currFileObj['subDirectory'] = mainDir.length !== 0 ? getSubDirectory(dir, mainDir) : mainDir;
currFileObj['srcFileSize'] = fileInfo.size;

return currFileObj;
}

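// Derives the stage subdirectory for a file: the part of its parent directory that lies
// below root, returned with a trailing slash (files directly under <root>/sub map to 'sub/').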
function getSubDirectory(dir, root) {
const rootDirectory = glob.sync(path.join(root, '/'))[0];
const subDir = dir.substring(0, dir.lastIndexOf('/'));
return subDir.split(rootDirectory)[1] + '/';
}
}

//TODO SNOW-992387: Create a function to renew expired client
8 changes: 4 additions & 4 deletions lib/file_transfer_agent/gcs_util.js
@@ -142,7 +142,7 @@ function GCSUtil(httpclient, filestream) {

const metadata = await meta['client'].gcsClient
.bucket(gcsLocation.bucketName)
.file(gcsLocation.path + filename)
.file(gcsLocation.path + meta['subDirectory'] + filename)
.getMetadata();

digest = metadata[0].metadata[SFC_DIGEST];
@@ -282,7 +282,7 @@ function GCSUtil(httpclient, filestream) {

await meta['client'].gcsClient
.bucket(gcsLocation.bucketName)
.file(gcsLocation.path + meta['dstFileName'])
.file(gcsLocation.path + meta['subDirectory'] + meta['dstFileName'])
.save(fileStream, {
resumable: false,
metadata: {
@@ -355,14 +355,14 @@ function GCSUtil(httpclient, filestream) {

await meta['client'].gcsClient
.bucket(gcsLocation.bucketName)
.file(gcsLocation.path + meta['srcFileName'])
.file(gcsLocation.path + meta['subDirectory'] + meta['srcFileName'])
.download({
destination: fullDstPath
});

const metadata = await meta['client'].gcsClient
.bucket(gcsLocation.bucketName)
.file(gcsLocation.path + meta['srcFileName'])
.file(gcsLocation.path + meta['subDirectory'] + meta['srcFileName'])
.getMetadata();

encryptionDataprop = metadata[0].metadata[ENCRYPTIONDATAPROP];
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/remote_storage_util.js
@@ -286,7 +286,7 @@ function RemoteStorageUtil() {
if (err) {
reject(err);
}
fullDstPath = path.join(basePath, path.basename(meta['dstFileName']));
fullDstPath = path.join(basePath, meta['subDirectory'], path.basename(meta['dstFileName']));
resolve();
});
});
@@ -296,7 +296,7 @@
await new Promise((resolve) => {
fs.exists(baseDir, (exists) => {
if (!exists) {
fs.mkdir(baseDir, () => {
fs.mkdir(baseDir, { recursive: true }, () => {
resolve();
});
} else {
@@ -413,4 +413,4 @@ function RemoteStorageUtil() {
};
}

exports.RemoteStorageUtil = RemoteStorageUtil;
exports.RemoteStorageUtil = RemoteStorageUtil;
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/s3_util.js
@@ -128,7 +128,7 @@ function S3Util(s3, filestream) {

const params = {
Bucket: s3location.bucketName,
Key: s3location.s3path + filename
Key: s3location.s3path + meta['subDirectory'] + filename
};

let akey;
@@ -210,7 +210,7 @@ function S3Util(s3, filestream) {
const params = {
Bucket: s3location.bucketName,
Body: fileStream,
Key: s3location.s3path + meta['dstFileName'],
Key: s3location.s3path + meta['subDirectory'] + meta['dstFileName'],
Metadata: s3Metadata
};

@@ -249,7 +249,7 @@ function S3Util(s3, filestream) {

const params = {
Bucket: s3location.bucketName,
Key: s3location.s3path + meta['dstFileName'],
Key: s3location.s3path + meta['subDirectory'] + meta['dstFileName'],
};

// call S3 to download file to specified bucket
98 changes: 94 additions & 4 deletions test/integration/testPutGet.js
@@ -39,13 +39,13 @@
const ROW_DATA_OVERWRITE_SIZE = 19;

function getPlatformTmpPath(tmpPath) {
let path = `file://${tmpPath}`;
let location = `file://${tmpPath}`;
// Windows user contains a '~' in the path which causes an error
if (process.platform === 'win32') {
const fileName = tmpPath.substring(tmpPath.lastIndexOf('\\'));
path = `file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\${fileName}`;
// const fileName = tmpPath.substring(tmpPath.lastIndexOf('\\'));
location = `file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\${path.basename(tmpPath)}`;
}
return path;
return location;
}

function executePutCmd(connection, putQuery, callback, results) {
Expand Down Expand Up @@ -840,6 +840,96 @@
);
});

it('testUploadMultifiles with sub directories', function (done) {
const count = 6;
const results = {};

const tmpdirPath = testUtil.createTestingDirectoryInTemp(crypto.randomUUID());
const directory = getPlatformTmpPath(tmpdirPath);
const getQuery = `GET ${stage} ${directory}`;

const tempDir = [];
const testingDir = path.join(crypto.randomUUID());
const subDirectory = path.join(testingDir, crypto.randomUUID());
const subDirectory2 = path.join(testingDir, crypto.randomUUID(), crypto.randomUUID());

[testingDir, subDirectory, subDirectory2].forEach((dir) => {
const location = testUtil.createTestingDirectoryInTemp(dir);
tempDir.push(location);

});

for (const directory of tempDir) {
for (let k = 0; k < 2; k++) {
testUtil.createTempFile(directory, testUtil.createRandomFileName({ prefix: 'uploadTesting' }), ROW_DATA);
}
}

let putQuery = `PUT file://${os.tmpdir()}/${testingDir}/** ${stage}`;
// Windows user contains a '~' in the path which causes an error
if (process.platform === 'win32') {
putQuery = `PUT file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\${testingDir}\\** ${stage}`;
}

const testResult = [];

async.series(
[
function (callback) {
executePutCmd(connection, putQuery, callback, results);
},
function (callback) {
// Run GET command
connection.execute({
sqlText: getQuery,
streamResult: true,
complete: function (err, stmt) {
if (err) {
callback(err);
} else {
const stream = stmt.streamRows();
stream.on('error', function (err) {
callback(err);
});
stream.on('data', function (row) {
assert.strictEqual(row.status, DOWNLOADED);
assert.strictEqual(row.size, results.fileSize);

// Decompress the downloaded file
const compressedFile = path.join(tmpdirPath, row.file);
const decompressedFile = path.join(tmpdirPath, path.dirname(row.file), 'de-' + path.basename(row.file));
const fileContents = fs.createReadStream(compressedFile);
const writeStream = fs.createWriteStream(decompressedFile);
const unzip = zlib.createGunzip();

fileContents.pipe(unzip).pipe(writeStream).on('finish', function () {
// Verify the data of the downloaded file
// this callback is called asynchronously so we gather results and in stream end we check if all files are correct
const data = fs.readFileSync(decompressedFile).toString();
try {
assert.strictEqual(data, ROW_DATA);
testResult.push(true);
} catch (e) {
testResult.push(e);
}
});
});
stream.on('end', function () {
expectArrayToBeFinallyFilledWithTrue(count, testResult, callback);
});
}
}
});
},
function (callback) {
testUtil.deleteFolderSyncIgnoringErrors(tmpdirPath);
testUtil.deleteFolderSyncIgnoringErrors(tempDir[0]);
callback();
}
],
done,
);
});

/**
* @param expectedResultSize `number` expected result size
* @param testResult `any[]` array with gathered results
21 changes: 21 additions & 0 deletions test/integration/testUtil.js
@@ -6,6 +6,8 @@ const connOptions = require('./connectionOptions');
const assert = require('assert');
const fs = require('fs');
const crypto = require('crypto');
const os = require('os');
const path = require('path');
const Logger = require('../../lib/logger');

module.exports.createConnection = function (validConnectionOptionsOverride = {}) {
@@ -266,4 +268,23 @@ module.exports.randomizeName = function (name) {
module.exports.assertLogMessage = function (expectedLevel, expectedMessage, actualMessage) {
const regexPattern = `^{"level":"${expectedLevel}","message":"\\[.*\\]: ${expectedMessage}`;
return assert.match(actualMessage, new RegExp(regexPattern));
};

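// Creates the given directory (recursively) under the OS temp directory and returns its full path.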
module.exports.createTestingDirectoryInTemp = function (directory) {
const tempDir = path.join(os.tmpdir(), directory);
fs.mkdirSync(tempDir, { recursive: true });

return tempDir;
};

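// Writes data to a file named fileName inside directory and returns the created file's full path.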
module.exports.createTempFile = function (directory, fileName, data) {
const fullpath = path.join(directory, fileName);
fs.writeFileSync(fullpath, data);
return fullpath;
};

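// Builds a random file name of the form <prefix><uuid><postfix><extension>; every part except the UUID is optional.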
module.exports.createRandomFileName = function ({ prefix, postfix, extension }) {
const randomName = crypto.randomUUID();
const fileName = `${prefix || ''}${randomName}${postfix || ''}${extension || ''}`;
return fileName;
};