Wire up the ETag from S3's upload response back to the BlobDTO's MD5 field, to handle multipart upload correctly #915

Merged · 3 commits · Nov 22, 2024

Changes from 2 commits

@@ -186,13 +186,14 @@ static <T> Blob constructBlobAndMetadata(

logger.logInfo(
"Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
+ " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
+ " estimatedUncompressedSize={}, md5={}, chunkLength={}, compressedSize={},"
+ " encrypt={}, bdecVersion={}",
filePath,
firstChannelFlushContext.getFullyQualifiedTableName(),
serializedChunk.rowCount,
startOffset,
serializedChunk.chunkEstimatedUncompressedSize,
md5,
chunkLength,
compressedChunkDataSize,
internalParameterProvider.getEnableChunkEncryption(),

@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -646,7 +647,14 @@ BlobMetadata upload(
long startTime = System.currentTimeMillis();

Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
storage.put(blobPath, blob);

// The returned etag is non-empty ONLY in case of iceberg uploads. With iceberg files, the XP
// scanner expects the MD5 value to exactly match the etag value in S3. This assumption doesn't
// hold when multipart upload kicks in, causing scan time failures and table corruption. By
// plugging in the etag value instead of the md5 value, the registered checksum matches what
// the scanner reads from S3, even for multipart uploads.
Optional<String> etag = storage.put(blobPath, blob);

if (uploadContext != null) {
blobStats.setUploadDurationMs(uploadContext);
@@ -657,16 +665,17 @@
}

logger.logInfo(
"Finish uploading blob={}, size={}, timeInMillis={}",
"Finish uploading blob={}, size={}, timeInMillis={}, etag={}",
blobPath.fileRegistrationPath,
blob.length,
System.currentTimeMillis() - startTime);
System.currentTimeMillis() - startTime,
etag);

// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
// spans mixed tables or not
return BlobMetadata.createBlobMetadata(
blobPath.fileRegistrationPath,
BlobBuilder.computeMD5(blob),
etag.isPresent() ? etag.get() : BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
blobStats,
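
For context on the multipart-upload comment above: for a single-part upload, S3's ETag is simply the object's MD5 hex digest, but once multipart upload kicks in, S3 (for typical non-KMS-encrypted uploads) returns the MD5 of the concatenated per-part MD5 digests with a "-<partCount>" suffix, which can never equal the whole-blob MD5 that BlobBuilder.computeMD5 produces. A minimal standalone sketch, not part of this PR, with illustrative part sizes:

import java.security.MessageDigest;

public class MultipartEtagDemo {
  public static void main(String[] args) throws Exception {
    byte[] part1 = new byte[5 * 1024 * 1024]; // two illustrative 5 MiB parts
    byte[] part2 = new byte[5 * 1024 * 1024];

    // Single-part ETag: MD5 over the whole object.
    byte[] whole = MessageDigest.getInstance("MD5").digest(concat(part1, part2));

    // Multipart ETag: MD5 over the concatenated per-part MD5 digests, plus "-<partCount>".
    byte[] d1 = MessageDigest.getInstance("MD5").digest(part1);
    byte[] d2 = MessageDigest.getInstance("MD5").digest(part2);
    byte[] ofParts = MessageDigest.getInstance("MD5").digest(concat(d1, d2));

    System.out.println("single-part ETag: " + hex(whole));
    System.out.println("multipart ETag  : " + hex(ofParts) + "-2"); // never equals the MD5 above
  }

  private static byte[] concat(byte[] a, byte[] b) {
    byte[] out = new byte[a.length + b.length];
    System.arraycopy(a, 0, out, 0, a.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }

  private static String hex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) sb.append(String.format("%02x", b));
    return sb.toString();
  }
}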

@@ -4,6 +4,8 @@

package net.snowflake.ingest.streaming.internal;

import java.util.Optional;

/**
* Interface that represents a storage location to which we should upload data files. It is the
* account's internal stage for snowflake tables, and the table's external volume for iceberg
@@ -15,6 +17,8 @@ interface IStorage {
*
* @param blobPath
* @param blob
* @return The String ETag returned by the upload. Empty when the underlying layer does not
*     have an ETag to return.
*/
void put(BlobPath blobPath, byte[] blob);
Optional<String> put(BlobPath blobPath, byte[] blob);
}
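
As a usage illustration of the new contract (NoEtagStorage is a hypothetical class, not in this PR): a backend that has no ETag to report returns Optional.empty(), and the upload path shown earlier then falls back to BlobBuilder.computeMD5:

import java.util.Optional;

/** Hypothetical IStorage backend with no ETag support (e.g., a local filesystem). */
class NoEtagStorage implements IStorage {
  @Override
  public Optional<String> put(BlobPath blobPath, byte[] blob) {
    // ... persist the blob bytes somewhere durable ...
    return Optional.empty(); // no ETag; the caller falls back to computeMD5
  }
}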

@@ -29,6 +29,7 @@
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.cloud.storage.StageInfo;
import net.snowflake.client.jdbc.internal.apache.commons.io.FileUtils;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.IcebergFileTransferAgent;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
@@ -67,6 +68,7 @@ class InternalStage implements IStorage {

// Proxy parameters that we set while calling the Snowflake JDBC to upload the streams
private final Properties proxyProperties;
private final boolean useIcebergFileTransferAgent;

private FileLocationInfo fileLocationInfo;
private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
@@ -78,6 +80,8 @@ class InternalStage implements IStorage {
* @param clientName The client name
* @param clientPrefix client prefix
* @param tableRef
* @param useIcebergFileTransferAgent Whether to use the local copy of the JDBC file transfer
* agent so that the etag of the uploaded file can be registered to snowflake
* @param fileLocationInfo The file location information from open channel response
* @param maxUploadRetries The maximum number of retries to attempt
*/
@@ -86,6 +90,7 @@ class InternalStage implements IStorage {
String clientName,
String clientPrefix,
TableRef tableRef,
boolean useIcebergFileTransferAgent,
FileLocationInfo fileLocationInfo,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
@@ -94,6 +99,7 @@
clientName,
clientPrefix,
tableRef,
useIcebergFileTransferAgent,
(SnowflakeFileTransferMetadataWithAge) null,
maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
@@ -107,6 +113,7 @@
* @param clientName the client name
* @param clientPrefix
* @param tableRef
* @param useIcebergFileTransferAgent
* @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
* @param maxUploadRetries the maximum number of retries to attempt
*/
@@ -115,6 +122,7 @@
String clientName,
String clientPrefix,
TableRef tableRef,
boolean useIcebergFileTransferAgent,
SnowflakeFileTransferMetadataWithAge testMetadata,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
@@ -123,11 +131,12 @@
this.clientPrefix = clientPrefix;
this.tableRef = tableRef;
this.maxUploadRetries = maxUploadRetries;
this.useIcebergFileTransferAgent = useIcebergFileTransferAgent;
this.proxyProperties = generateProxyPropertiesForJDBC();
this.fileTransferMetadataWithAge = testMetadata;
}

private void putRemote(String fullFilePath, byte[] data, int retryCount)
private Optional<String> putRemote(String fullFilePath, byte[] data, int retryCount)
throws SnowflakeSQLException, IOException {
SnowflakeFileTransferMetadataV1 fileTransferMetadataCopy;
if (this.fileTransferMetadataWithAge.fileTransferMetadata.isForOneFile()) {
@@ -166,17 +175,29 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
refreshSnowflakeMetadata(false /* force */);
}

SnowflakeFileTransferAgent.uploadWithoutConnection(
SnowflakeFileTransferConfig.Builder.newInstance()
.setSnowflakeFileTransferMetadata(fileTransferMetadataCopy)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
.build());
if (this.useIcebergFileTransferAgent) {
return Optional.of(
IcebergFileTransferAgent.uploadWithoutConnection(
fileTransferMetadataCopy,
inStream,
proxyProperties,
clientPrefix,
clientName,
fullFilePath));
} else {
SnowflakeFileTransferAgent.uploadWithoutConnection(
SnowflakeFileTransferConfig.Builder.newInstance()
.setSnowflakeFileTransferMetadata(fileTransferMetadataCopy)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
.build());
return Optional.empty();
}
} catch (Exception e) {
if (retryCount == 0) {
// for the first exception, we always perform a metadata refresh.
@@ -198,7 +219,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
maxUploadRetries,
e.getMessage(),
getStackTrace(e));
this.putRemote(fullFilePath, data, retryCount);
return this.putRemote(fullFilePath, data, retryCount);
}
}

@@ -327,12 +348,13 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
}

/** Upload file to internal stage */
public void put(BlobPath blobPath, byte[] blob) {
public Optional<String> put(BlobPath blobPath, byte[] blob) {
if (this.isLocalFS()) {
putLocal(this.fileTransferMetadataWithAge.localLocation, blobPath.fileRegistrationPath, blob);
return Optional.empty();
} else {
try {
putRemote(blobPath.uploadPath, blob, 0);
return putRemote(blobPath.uploadPath, blob, 0);
} catch (SnowflakeSQLException | IOException e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
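
One behavioral detail in the putRemote change above: the retry branch now returns the recursive call instead of merely invoking it, so the ETag from a successful retry is propagated rather than dropped. A stripped-down sketch of that pattern, with hypothetical names:

import java.util.Optional;

/** Hypothetical sketch of an upload retry loop that must propagate the ETag. */
class RetryingUploader {
  private static final int MAX_RETRIES = 3;

  Optional<String> putWithRetry(byte[] data, int retryCount) {
    try {
      return doUpload(data); // may carry an ETag on success
    } catch (RuntimeException e) {
      if (retryCount >= MAX_RETRIES) {
        throw e;
      }
      // Returning the recursive call (rather than just invoking it) keeps the
      // ETag from whichever attempt finally succeeds.
      return putWithRetry(data, retryCount + 1);
    }
  }

  private Optional<String> doUpload(byte[] data) {
    // hypothetical transport call; returns the backend's ETag when it has one
    return Optional.empty();
  }
}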

@@ -76,6 +76,7 @@ class InternalStageManager implements IStorageManager {
clientName,
clientPrefix,
NO_TABLE_REF,
false /* useIcebergFileTransferAgent */,
response.getStageLocation(),
DEFAULT_MAX_UPLOAD_RETRIES);
} else {
@@ -87,6 +88,7 @@
"testClient",
null /* clientPrefix */,
NO_TABLE_REF,
false /* useIcebergFileTransferAgent */,
(SnowflakeFileTransferMetadataWithAge) null,
DEFAULT_MAX_UPLOAD_RETRIES);
}

@@ -91,7 +91,13 @@ private InternalStage createStageForTable(TableRef tableRef) {

try {
return new InternalStage(
this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES);
this,
clientName,
getClientPrefix(),
tableRef,
true /* useIcebergFileTransferAgent */,
locationInfo,
DEFAULT_MAX_UPLOAD_RETRIES);
} catch (SFException ex) {
logger.logError(
"ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, ex);
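
A stylistic aside on the fallback in upload() shown earlier (an equivalent alternative, not what this PR ships): Optional.orElseGet expresses the same ETag-or-MD5 choice without the explicit isPresent/get pair, and still computes the MD5 only when the ETag is absent:

// Equivalent to: etag.isPresent() ? etag.get() : BlobBuilder.computeMD5(blob)
String checksum = etag.orElseGet(() -> BlobBuilder.computeMD5(blob));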