Optimize S3 storage writing for MSQ durable storage (#16481)
* Optimise S3 storage writing for MSQ durable storage

* Get rid of static ConcurrentHashMap

* Fix static checks

* Fix tests

* Remove unused constructor parameter chunkValidation + relevant cleanup

* Assert etags as String instead of Integer

* Fix flaky test

* Inject executor service

* Make threadpool size dynamic based on number of cores

* Fix S3StorageDruidModuleTest

* Fix S3StorageConnectorProviderTest

* Fix injection issues

* Add S3UploadConfig to manage maximum number of concurrent chunks dynamically based on chunk size

* Address the minor review comments

* Refactor S3UploadConfig + ExecutorService into S3UploadManager

* Address review comments

* Make updateChunkSizeIfGreater() synchronized instead of recomputeMaxConcurrentNumChunks()

* Address the minor review comments

* Fix intellij-inspections check

* Refactor code to use futures for maxNumConcurrentChunks. Also use executor service with blocking queue for backpressure semantics.

* Update javadoc

* Get rid of cyclic dependency injection between S3UploadManager and S3OutputConfig

* Fix RetryableS3OutputStreamTest

* Remove unnecessary synchronization parts from RetryableS3OutputStream

* Update javadoc

* Add S3UploadManagerTest

* Revert back to S3StorageConnectorProvider extends S3OutputConfig

* Address Karan's review comments

* Address Kashif's review comments

* Change a log message to debug

* Address review comments

* Fix intellij-inspections check

* Fix checkstyle

---------

Co-authored-by: asdf2014 <[email protected]>
Akshat-Jain and asdf2014 authored Jun 7, 2024
1 parent e9f7233 commit 03a38be
Showing 13 changed files with 433 additions and 156 deletions.
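In brief: part uploads move off the writer thread. RetryableS3OutputStream now hands each finished chunk to the new S3UploadManager, which runs uploads on a shared executor and returns futures that close() resolves before completing the multipart upload. Per the commit messages above, the pool is sized from the number of cores and an executor service with a blocking queue provides backpressure semantics. A minimal sketch of that executor pattern follows, with illustrative names (BoundedUploadExecutor is not a class in this commit):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch only: a fixed-size pool whose bounded queue throttles producers,
// so chunks cannot accumulate on local disk faster than they are uploaded.
class BoundedUploadExecutor
{
  private final ExecutorService pool;

  BoundedUploadExecutor(int queueCapacity)
  {
    final int poolSize = Runtime.getRuntime().availableProcessors();
    this.pool = new ThreadPoolExecutor(
        poolSize,
        poolSize,
        0L,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),
        // When the queue is full, the submitting (writer) thread runs the
        // upload itself: a simple form of backpressure.
        new ThreadPoolExecutor.CallerRunsPolicy()
    );
  }

  <T> Future<T> submit(Callable<T> partUpload)
  {
    return pool.submit(partUpload);
  }
}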
@@ -56,23 +56,22 @@ public void createAzureStorageFactoryWithRequiredProperties()
     properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
     properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
     properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
-    StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties);
+    StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties);
 
-    assertInstanceOf(AzureStorageConnectorProvider.class, s3StorageConnectorProvider);
-    assertInstanceOf(AzureStorageConnector.class, s3StorageConnectorProvider.get());
-    assertEquals("container", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getContainer());
-    assertEquals("prefix", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getPrefix());
+    assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider);
+    assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get());
+    assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer());
+    assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix());
     assertEquals(new File("/tmp"),
-                 ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getTempDir());
-
+                 ((AzureStorageConnectorProvider) storageConnectorProvider).getTempDir());
   }
 
   @Test
   public void createAzureStorageFactoryWithMissingPrefix()
   {
 
     final Properties properties = new Properties();
-    properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
+    properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
     properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
     properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
     assertThrows(
@@ -19,16 +19,12 @@
 
 package org.apache.druid.storage.s3.output;
 
-import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.io.CountingOutputStream;
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
@@ -49,6 +45,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -81,20 +78,13 @@ public class RetryableS3OutputStream extends OutputStream
   private final File chunkStorePath;
   private final long chunkSize;
 
-  private final List<PartETag> pushResults = new ArrayList<>();
   private final byte[] singularBuffer = new byte[1];
 
   // metric
   private final Stopwatch pushStopwatch;
 
   private Chunk currentChunk;
   private int nextChunkId = 1; // multipart upload requires partNumber to be in the range between 1 and 10000
-  private int numChunksPushed;
-  /**
-   * Total size of all chunks. This size is updated whenever the chunk is ready for push,
-   * not when {@link #write(byte[], int, int)} is called.
-   */
-  private long resultsSize;
 
   /**
    * A flag indicating whether there was an upload error.
@@ -103,27 +93,28 @@ public class RetryableS3OutputStream extends OutputStream
   private boolean error;
   private boolean closed;
 
-  public RetryableS3OutputStream(
-      S3OutputConfig config,
-      ServerSideEncryptingAmazonS3 s3,
-      String s3Key
-  ) throws IOException
-  {
+  /**
+   * Helper class for calculating maximum number of simultaneous chunks allowed on local disk.
+   */
+  private final S3UploadManager uploadManager;
 
-    this(config, s3, s3Key, true);
-  }
+  /**
+   * A list of futures to allow us to wait for completion of all uploadPart() calls
+   * before hitting {@link ServerSideEncryptingAmazonS3#completeMultipartUpload(CompleteMultipartUploadRequest)}.
+   */
+  private final List<Future<UploadPartResult>> futures = new ArrayList<>();
 
-  @VisibleForTesting
-  protected RetryableS3OutputStream(
+  public RetryableS3OutputStream(
       S3OutputConfig config,
      ServerSideEncryptingAmazonS3 s3,
      String s3Key,
-      boolean chunkValidation
+      S3UploadManager uploadManager
  ) throws IOException
  {
    this.config = config;
    this.s3 = s3;
    this.s3Key = s3Key;
+    this.uploadManager = uploadManager;
 
    final InitiateMultipartUploadResult result;
    try {
@@ -138,9 +129,7 @@ protected RetryableS3OutputStream(
     this.chunkStorePath = new File(config.getTempDir(), uploadId + UUID.randomUUID());
     FileUtils.mkdirp(this.chunkStorePath);
     this.chunkSize = config.getChunkSize();
-    this.pushStopwatch = Stopwatch.createUnstarted();
-    this.pushStopwatch.reset();
-
+    this.pushStopwatch = Stopwatch.createStarted();
     this.currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++)));
   }
 
@@ -172,7 +161,6 @@ public void write(byte[] b, int off, int len) throws IOException
 
     while (remainingBytesToWrite > 0) {
       final int writtenBytes = writeToCurrentChunk(b, offsetToWrite, remainingBytesToWrite);
-
       if (currentChunk.length() >= chunkSize) {
         pushCurrentChunk();
         currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++)));
@@ -199,62 +187,11 @@ private void pushCurrentChunk() throws IOException
   {
     currentChunk.close();
     final Chunk chunk = currentChunk;
-    try {
-      if (chunk.length() > 0) {
-        resultsSize += chunk.length();
-
-        pushStopwatch.start();
-        pushResults.add(push(chunk));
-        pushStopwatch.stop();
-        numChunksPushed++;
-      }
-    }
-    finally {
-      if (!chunk.delete()) {
-        LOG.warn("Failed to delete chunk [%s]", chunk.getAbsolutePath());
-      }
-    }
-  }
-
-  private PartETag push(Chunk chunk) throws IOException
-  {
-    try {
-      return RetryUtils.retry(
-          () -> uploadPartIfPossible(uploadId, config.getBucket(), s3Key, chunk),
-          S3Utils.S3RETRY,
-          config.getMaxRetry()
-      );
-    }
-    catch (AmazonServiceException e) {
-      throw new IOException(e);
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private PartETag uploadPartIfPossible(
-      String uploadId,
-      String bucket,
-      String key,
-      Chunk chunk
-  )
-  {
-    final ObjectMetadata objectMetadata = new ObjectMetadata();
-    objectMetadata.setContentLength(resultsSize);
-    final UploadPartRequest uploadPartRequest = new UploadPartRequest()
-        .withUploadId(uploadId)
-        .withBucketName(bucket)
-        .withKey(key)
-        .withFile(chunk.file)
-        .withPartNumber(chunk.id)
-        .withPartSize(chunk.length());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Pushing chunk [%s] to bucket[%s] and key[%s].", chunk, bucket, key);
-    }
-    UploadPartResult uploadResult = s3.uploadPart(uploadPartRequest);
-    return uploadResult.getPartETag();
+    if (chunk.length() > 0) {
+      futures.add(
+          uploadManager.queueChunkForUpload(s3, s3Key, chunk.id, chunk.file, uploadId, config)
+      );
+    }
   }
 
   @Override
@@ -268,53 +205,68 @@ public void close() throws IOException
 
     // Closeables are closed in LIFO order
     closer.register(() -> {
-      org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
-      LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);
-
+      // This should be emitted as a metric
+      long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length();
       LOG.info(
           "Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
-          numChunksPushed,
-          resultsSize,
+          futures.size(),
+          totalChunkSize,
           pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
       );
     });
 
-    closer.register(() -> {
-      try {
-        if (resultsSize > 0 && isAllPushSucceeded()) {
-          RetryUtils.retry(
-              () -> s3.completeMultipartUpload(
-                  new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults)
-              ),
-              S3Utils.S3RETRY,
-              config.getMaxRetry()
-          );
-        } else {
-          RetryUtils.retry(
-              () -> {
-                s3.cancelMultiPartUpload(new AbortMultipartUploadRequest(config.getBucket(), s3Key, uploadId));
-                return null;
-              },
-              S3Utils.S3RETRY,
-              config.getMaxRetry()
-          );
-        }
-      }
-      catch (Exception e) {
-        throw new IOException(e);
-      }
-    });
+    closer.register(() -> org.apache.commons.io.FileUtils.forceDelete(chunkStorePath));
 
     try (Closer ignored = closer) {
       if (!error) {
         pushCurrentChunk();
+        completeMultipartUpload();
       }
     }
   }
 
-  private boolean isAllPushSucceeded()
+  private void completeMultipartUpload()
   {
-    return !error && !pushResults.isEmpty() && numChunksPushed == pushResults.size();
+    final List<PartETag> pushResults = new ArrayList<>();
+    for (Future<UploadPartResult> future : futures) {
+      if (error) {
+        future.cancel(true);
+      }
+      try {
+        UploadPartResult result = future.get(1, TimeUnit.HOURS);
+        pushResults.add(result.getPartETag());
+      }
+      catch (Exception e) {
+        error = true;
+        LOG.error(e, "Error in uploading part for upload ID [%s]", uploadId);
+      }
+    }
 
+    try {
+      boolean isAllPushSucceeded = !error && !pushResults.isEmpty() && futures.size() == pushResults.size();
+      if (isAllPushSucceeded) {
+        RetryUtils.retry(
+            () -> s3.completeMultipartUpload(
+                new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults)
+            ),
+            S3Utils.S3RETRY,
+            config.getMaxRetry()
+        );
+      } else {
+        RetryUtils.retry(
+            () -> {
+              s3.cancelMultiPartUpload(new AbortMultipartUploadRequest(config.getBucket(), s3Key, uploadId));
+              return null;
+            },
+            S3Utils.S3RETRY,
+            config.getMaxRetry()
+        );
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
@@ -336,16 +288,6 @@ private long length()
       return outputStream.getCount();
     }
 
-    private boolean delete()
-    {
-      return file.delete();
-    }
-
-    private String getAbsolutePath()
-    {
-      return file.getAbsolutePath();
-    }
-
     @Override
     public boolean equals(Object o)
     {
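The new S3UploadManager file is part of this commit but is not rendered in this excerpt. From the call site in pushCurrentChunk() above and the commit messages, here is a rough sketch of the shape of queueChunkForUpload; the field names, pool sizing, retry wiring, and chunk cleanup here are assumptions, not the actual implementation:

package org.apache.druid.storage.s3.output;

import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hedged sketch of S3UploadManager; the real class in this commit may differ.
public class S3UploadManager
{
  private final ExecutorService uploadExecutor =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  public Future<UploadPartResult> queueChunkForUpload(
      ServerSideEncryptingAmazonS3 s3,
      String key,
      int chunkNumber,
      File chunkFile,
      String uploadId,
      S3OutputConfig config
  )
  {
    return uploadExecutor.submit(() -> {
      try {
        final UploadPartRequest request = new UploadPartRequest()
            .withUploadId(uploadId)
            .withBucketName(config.getBucket())
            .withKey(key)
            .withFile(chunkFile)
            .withPartNumber(chunkNumber)
            .withPartSize(chunkFile.length());
        return s3.uploadPart(request);
      }
      finally {
        // The local chunk is no longer needed once the part has been sent.
        chunkFile.delete();
      }
    });
  }
}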
@@ -56,6 +56,9 @@ public class S3ExportStorageProvider implements ExportStorageProvider
   @JacksonInject
   ServerSideEncryptingAmazonS3 s3;
 
+  @JacksonInject
+  S3UploadManager s3UploadManager;
+
   @JsonCreator
   public S3ExportStorageProvider(
       @JsonProperty(value = "bucket", required = true) String bucket,
@@ -90,7 +93,7 @@ public StorageConnector get()
         s3ExportConfig.getChunkSize(),
         s3ExportConfig.getMaxRetry()
     );
-    return new S3StorageConnector(s3OutputConfig, s3);
+    return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager);
   }
 
   @VisibleForTesting
@@ -55,15 +55,17 @@ public class S3StorageConnector extends ChunkingStorageConnector<GetObjectRequest>
 
   private final S3OutputConfig config;
   private final ServerSideEncryptingAmazonS3 s3Client;
+  private final S3UploadManager s3UploadManager;
 
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
   private static final int MAX_NUMBER_OF_LISTINGS = 1000;
 
-  public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
+  public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3, S3UploadManager s3UploadManager)
   {
     this.config = config;
     this.s3Client = serverSideEncryptingAmazonS3;
+    this.s3UploadManager = s3UploadManager;
     Preconditions.checkNotNull(config, "config is null");
     Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
     try {
@@ -153,7 +155,7 @@ public InputStream open(GetObjectRequest object, long offset)
   @Override
   public OutputStream write(String path) throws IOException
   {
-    return new RetryableS3OutputStream(config, s3Client, objectPath(path));
+    return new RetryableS3OutputStream(config, s3Client, objectPath(path), s3UploadManager);
   }
 
   @Override
@@ -44,5 +44,6 @@ public List<? extends Module> getJacksonModules()
   public void configure(Binder binder)
   {
     JsonConfigProvider.bind(binder, "druid.export.storage.s3", S3ExportConfig.class);
+    JsonConfigProvider.bind(binder, "druid.msq.intermediate.storage", S3OutputConfig.class);
   }
 }
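The added binding populates an S3OutputConfig for MSQ durable storage straight from runtime properties under druid.msq.intermediate.storage. For illustration, the property names below mirror the S3OutputConfig fields visible elsewhere in this diff (bucket, prefix, tempDir, chunkSize, maxRetry); the values are examples only:

druid.msq.intermediate.storage.type=s3
druid.msq.intermediate.storage.bucket=my-bucket
druid.msq.intermediate.storage.prefix=msq/durable
druid.msq.intermediate.storage.tempDir=/tmp/msq
druid.msq.intermediate.storage.chunkSize=100MiB
druid.msq.intermediate.storage.maxRetry=3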
@@ -38,6 +38,9 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements StorageConnectorProvider
   @JacksonInject
   ServerSideEncryptingAmazonS3 s3;
 
+  @JacksonInject
+  S3UploadManager s3UploadManager;
+
   @JsonCreator
   public S3StorageConnectorProvider(
       @JsonProperty(value = "bucket", required = true) String bucket,
@@ -53,6 +56,6 @@ public S3StorageConnectorProvider(
   @Override
   public StorageConnector get()
   {
-    return new S3StorageConnector(this, s3);
+    return new S3StorageConnector(this, s3, s3UploadManager);
   }
 }
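Both providers obtain the shared S3UploadManager via @JacksonInject rather than through their JSON properties, so a single manager (and a single upload pool) is shared across connectors. The registration of that injectable value is outside this excerpt; in plain Jackson terms the wiring looks roughly like this (hypothetical setup; Druid actually wires this through Guice):

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

// Make a single S3UploadManager visible to @JacksonInject fields during
// deserialization of provider JSON. Illustrative only, not Druid's module code;
// assumes an existing S3UploadManager instance named uploadManager.
ObjectMapper mapper = new ObjectMapper();
mapper.setInjectableValues(
    new InjectableValues.Std().addValue(S3UploadManager.class, uploadManager)
);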