Wire up the ETag from S3's upload response back to the BlobDTO's MD5 field, to handle multipart upload correctly (#915)


With the late-breaking design change to use subscoped tokens instead of direct S3 PUTs, we ended up using the same file-handling logic as BDECs. This meant going through JDBC and the S3 SDK.

As part of recent testing I've discovered that for files greater than 16 MB, S3 splits the file into a multipart upload.
The ETag of such a file is NOT the MD5 hash of its contents, which is in fact documented S3 behavior.
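To make the distinction concrete, here is a self-contained sketch (not part of this PR) of how a multipart ETag is commonly derived. The "MD5 of the concatenated part MD5s, plus a part-count suffix" scheme is widely observed for S3 multipart uploads but is not a contractual AWS guarantee, so treat it as illustrative:

import java.io.ByteArrayOutputStream;
import java.security.MessageDigest;

public class MultipartEtagSketch {
  // Single-part uploads: ETag == hex(md5(bytes)). Multipart uploads: the ETag is
  // typically hex(md5(md5(part1) || md5(part2) || ...)) + "-" + partCount, which
  // no longer matches the MD5 of the whole file.
  static String multipartEtag(byte[] data, int partSize) throws Exception {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    ByteArrayOutputStream partDigests = new ByteArrayOutputStream();
    int parts = 0;
    for (int off = 0; off < data.length; off += partSize, parts++) {
      md5.reset();
      md5.update(data, off, Math.min(partSize, data.length - off));
      partDigests.write(md5.digest());
    }
    md5.reset();
    return hex(md5.digest(partDigests.toByteArray())) + "-" + parts;
  }

  static String hex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) sb.append(String.format("%02x", b));
    return sb.toString();
  }
}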

For BDECs, we calculate the MD5 hash ourselves and send it to Snowflake, where it's stored in the fileContentKey field.
For parquet files belonging specifically to Iceberg tables, there is a check in XP to ensure that the ETag of the blob being read is identical to the fileContentKey stored in Snowflake metadata.
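That scan-time check is roughly equivalent to the following sketch (hypothetical names; XP's actual implementation is not part of this repository). Here fileContentKey comes from Snowflake metadata and s3ETag from the storage response for the blob being read:

static void verifyBlobIntegrity(String fileContentKey, String s3ETag) {
  // Before this fix, fileContentKey held a locally computed MD5, which diverges
  // from the S3 ETag once multipart upload kicks in, so this check would fail.
  if (!s3ETag.equals(fileContentKey)) {
    throw new IllegalStateException(
        "ETag mismatch: metadata has " + fileContentKey + " but blob has " + s3ETag);
  }
}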

Connecting these dots: what's happening before this fix is that for Iceberg ingestion of files greater than 16 MB, the SDK sends the MD5 hash in the fileContentKey property, whereas XP expects it to be the ETag value (which is NOT the MD5 of the contents if it's a multipart upload).

The proper fix is to make JDBC return the ETag value after uploading the file, through all the layers of JDBC classes, up to the API that the ingest SDK uses (uploadWithoutConnection).
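At the lowest layer, the value we need is already available from the S3 SDK that JDBC wraps. A minimal sketch, assuming the AWS SDK v1 TransferManager API (bucket, key, and file are placeholders):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.File;

// TransferManager transparently switches to multipart upload above its size
// threshold; the resulting ETag must be captured here and threaded back up to
// the caller instead of a locally computed MD5.
static String uploadAndGetEtag(AmazonS3 s3, String bucket, String key, File file)
    throws InterruptedException {
  TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3).build();
  Upload upload = tm.upload(bucket, key, file);
  UploadResult result = upload.waitForUploadResult();
  return result.getETag(); // "<hash>-<partCount>" when the upload was multipart
}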

Since we need to fix this right away, this PR copies over the parts of JDBC that are used for Iceberg ingestion. As soon as the JDBC driver ships the new fix, we'll remove these copied classes.

Note that this PR accidentally changes the timeout to 20 seconds; another PR tomorrow is going to make that change, and I'll back it out of this branch before merging.
sfc-gh-hmadan authored Nov 22, 2024
1 parent 59ce9e0 commit 84727c3
Showing 22 changed files with 2,645 additions and 31 deletions.
19 changes: 19 additions & 0 deletions .run/IcebergBigFilesIT.testMultiplePartUpload.run.xml
@@ -0,0 +1,19 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="IcebergBigFilesIT.testMultiplePartUpload" type="JUnit" factoryName="JUnit" nameIsGenerated="true">
<module name="snowflake-ingest-sdk" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="net.snowflake.ingest.streaming.internal.it.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<option name="PACKAGE_NAME" value="net.snowflake.ingest.streaming.internal.it" />
<option name="MAIN_CLASS_NAME" value="net.snowflake.ingest.streaming.internal.it.IcebergBigFilesIT" />
<option name="METHOD_NAME" value="testMultiplePartUpload" />
<option name="TEST_OBJECT" value="method" />
<option name="VM_PARAMETERS" value="-ea --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
@@ -186,13 +186,14 @@ static <T> Blob constructBlobAndMetadata(

logger.logInfo(
"Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
+ " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
+ " estimatedUncompressedSize={}, md5={}, chunkLength={}, compressedSize={},"
+ " encrypt={}, bdecVersion={}",
filePath,
firstChannelFlushContext.getFullyQualifiedTableName(),
serializedChunk.rowCount,
startOffset,
serializedChunk.chunkEstimatedUncompressedSize,
md5,
chunkLength,
compressedChunkDataSize,
internalParameterProvider.getEnableChunkEncryption(),
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -646,7 +647,14 @@ BlobMetadata upload(
long startTime = System.currentTimeMillis();

Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
storage.put(blobPath, blob);

// The returned etag is non-empty ONLY in the case of iceberg uploads. With iceberg files,
// the XP scanner expects the fileContentKey (normally the MD5 value) to exactly match the
// etag value in S3. This assumption doesn't hold when multipart upload kicks in, causing
// scan-time failures and table corruption. By plugging in the etag value instead of the
// md5 value, we keep the metadata registered with Snowflake consistent with what the
// scanner verifies.
Optional<String> etag = storage.put(blobPath, blob);

if (uploadContext != null) {
blobStats.setUploadDurationMs(uploadContext);
@@ -657,16 +665,17 @@ }
}

logger.logInfo(
"Finish uploading blob={}, size={}, timeInMillis={}",
"Finish uploading blob={}, size={}, timeInMillis={}, etag={}",
blobPath.fileRegistrationPath,
blob.length,
System.currentTimeMillis() - startTime);
System.currentTimeMillis() - startTime,
etag);

// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
// spans mixed tables or not
return BlobMetadata.createBlobMetadata(
blobPath.fileRegistrationPath,
BlobBuilder.computeMD5(blob),
etag.isPresent() ? etag.get() : BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
blobStats,
@@ -4,6 +4,8 @@

package net.snowflake.ingest.streaming.internal;

import java.util.Optional;

/**
* Interface that represents a storage location to which we should upload data files. It is the
* account's internal stage for snowflake tables, and the table's external volume for iceberg
@@ -15,6 +17,8 @@ interface IStorage {
*
* @param blobPath
* @param blob
@return The String ETag returned by the upload. Can be empty in situations where the
underlying layer does not have an ETag to return.
*/
void put(BlobPath blobPath, byte[] blob);
Optional<String> put(BlobPath blobPath, byte[] blob);
}
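For context (not part of this commit's diff), a minimal in-memory implementation of the updated contract might look like the following test-oriented sketch. It has no real ETag to report, so it returns Optional.empty(), which the contract explicitly allows; BlobPath.fileRegistrationPath is used as the map key:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical fake used to illustrate the new return value; real implementations
// (internal stage, external volume) return the cloud provider's ETag when present.
class InMemoryStorage implements IStorage {
  private final Map<String, byte[]> blobs = new HashMap<>();

  @Override
  public Optional<String> put(BlobPath blobPath, byte[] blob) {
    blobs.put(blobPath.fileRegistrationPath, blob.clone());
    return Optional.empty(); // no underlying ETag in this fake
  }
}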
@@ -29,6 +29,7 @@
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.cloud.storage.StageInfo;
import net.snowflake.client.jdbc.internal.apache.commons.io.FileUtils;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.IcebergFileTransferAgent;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
@@ -67,6 +68,7 @@ class InternalStage implements IStorage {

// Proxy parameters that we set while calling the Snowflake JDBC to upload the streams
private final Properties proxyProperties;
private final boolean useIcebergFileTransferAgent;

private FileLocationInfo fileLocationInfo;
private SnowflakeFileTransferMetadataWithAge fileTransferMetadataWithAge;
@@ -78,6 +80,8 @@ class InternalStage implements IStorage {
* @param clientName The client name
* @param clientPrefix client prefix
* @param tableRef
* @param useIcebergFileTransferAgent Whether to use the local copy of the JDBC file transfer
* agent so that the etag of the uploaded file can be registered to snowflake
* @param fileLocationInfo The file location information from open channel response
* @param maxUploadRetries The maximum number of retries to attempt
*/
@@ -86,6 +90,7 @@
String clientName,
String clientPrefix,
TableRef tableRef,
boolean useIcebergFileTransferAgent,
FileLocationInfo fileLocationInfo,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
@@ -94,6 +99,7 @@
clientName,
clientPrefix,
tableRef,
useIcebergFileTransferAgent,
(SnowflakeFileTransferMetadataWithAge) null,
maxUploadRetries);
Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
@@ -107,6 +113,7 @@
* @param clientName the client name
* @param clientPrefix
* @param tableRef
* @param useIcebergFileTransferAgent
* @param testMetadata SnowflakeFileTransferMetadataWithAge to test with
* @param maxUploadRetries the maximum number of retries to attempt
*/
@@ -115,6 +122,7 @@
String clientName,
String clientPrefix,
TableRef tableRef,
boolean useIcebergFileTransferAgent,
SnowflakeFileTransferMetadataWithAge testMetadata,
int maxUploadRetries)
throws SnowflakeSQLException, IOException {
@@ -123,11 +131,12 @@
this.clientPrefix = clientPrefix;
this.tableRef = tableRef;
this.maxUploadRetries = maxUploadRetries;
this.useIcebergFileTransferAgent = useIcebergFileTransferAgent;
this.proxyProperties = generateProxyPropertiesForJDBC();
this.fileTransferMetadataWithAge = testMetadata;
}

private void putRemote(String fullFilePath, byte[] data, int retryCount)
private Optional<String> putRemote(String fullFilePath, byte[] data, int retryCount)
throws SnowflakeSQLException, IOException {
SnowflakeFileTransferMetadataV1 fileTransferMetadataCopy;
if (this.fileTransferMetadataWithAge.fileTransferMetadata.isForOneFile()) {
@@ -166,17 +175,29 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
refreshSnowflakeMetadata(false /* force */);
}

SnowflakeFileTransferAgent.uploadWithoutConnection(
SnowflakeFileTransferConfig.Builder.newInstance()
.setSnowflakeFileTransferMetadata(fileTransferMetadataCopy)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
.build());
if (this.useIcebergFileTransferAgent) {
return Optional.ofNullable(
IcebergFileTransferAgent.uploadWithoutConnection(
fileTransferMetadataCopy,
inStream,
proxyProperties,
clientPrefix,
clientName,
fullFilePath));
} else {
SnowflakeFileTransferAgent.uploadWithoutConnection(
SnowflakeFileTransferConfig.Builder.newInstance()
.setSnowflakeFileTransferMetadata(fileTransferMetadataCopy)
.setUploadStream(inStream)
.setRequireCompress(false)
.setOcspMode(OCSPMode.FAIL_OPEN)
.setStreamingIngestClientKey(this.clientPrefix)
.setStreamingIngestClientName(this.clientName)
.setProxyProperties(this.proxyProperties)
.setDestFileName(fullFilePath)
.build());
return Optional.empty();
}
} catch (Exception e) {
if (retryCount == 0) {
// for the first exception, we always perform a metadata refresh.
Expand All @@ -198,7 +219,7 @@ private void putRemote(String fullFilePath, byte[] data, int retryCount)
maxUploadRetries,
e.getMessage(),
getStackTrace(e));
this.putRemote(fullFilePath, data, retryCount);
return this.putRemote(fullFilePath, data, retryCount);
}
}

@@ -327,12 +348,13 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)
}

/** Upload file to internal stage */
public void put(BlobPath blobPath, byte[] blob) {
public Optional<String> put(BlobPath blobPath, byte[] blob) {
if (this.isLocalFS()) {
putLocal(this.fileTransferMetadataWithAge.localLocation, blobPath.fileRegistrationPath, blob);
return Optional.empty();
} else {
try {
putRemote(blobPath.uploadPath, blob, 0);
return putRemote(blobPath.uploadPath, blob, 0);
} catch (SnowflakeSQLException | IOException e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
@@ -76,6 +76,7 @@ class InternalStageManager implements IStorageManager {
clientName,
clientPrefix,
NO_TABLE_REF,
false /* useIcebergFileTransferAgent */,
response.getStageLocation(),
DEFAULT_MAX_UPLOAD_RETRIES);
} else {
@@ -87,6 +88,7 @@
"testClient",
null /* clientPrefix */,
NO_TABLE_REF,
false /* useIcebergFileTransferAgent */,
(SnowflakeFileTransferMetadataWithAge) null,
DEFAULT_MAX_UPLOAD_RETRIES);
}
@@ -91,7 +91,13 @@ private InternalStage createStageForTable(TableRef tableRef) {

try {
return new InternalStage(
this, clientName, getClientPrefix(), tableRef, locationInfo, DEFAULT_MAX_UPLOAD_RETRIES);
this,
clientName,
getClientPrefix(),
tableRef,
true /* useIcebergFileTransferAgent */,
locationInfo,
DEFAULT_MAX_UPLOAD_RETRIES);
} catch (SFException ex) {
logger.logError(
"ExtVolManager.registerTable for tableRef={} failed with exception={}", tableRef, ex);