Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize S3 storage writing for MSQ durable storage #16481

Merged
merged 34 commits into from
Jun 7, 2024

Conversation

Akshat-Jain
Copy link
Contributor

@Akshat-Jain Akshat-Jain commented May 21, 2024

Description

This PR optimises S3 storage writing for MSQ durable storage by uploading the chunks to S3 in a separate threadpool.

Currently, we were creating and uploading a chunk to S3 sequentially, which isn't optimal.

This PR changes the logic such that the creation of chunks still happen in the same processing threadpool, but the uploading of chunks is offloaded to a separate threadpool.

Test Plan

  1. For every operation, I verified the number of rows returned with this PR's changes match the number of rows returned without this PR's changes.
  2. For every operation, I verified the output files and their sizes (in S3) with this PR's changes match the corresponding files uploaded without this PR's changes.

Timing Data

Following section summarizes the query duration comparison for a few different queries. We compare the original duration with the new duration across a few combinations of parameters.

Note:

  1. trips_xaa is one of the sample datasources with 60M rows.
  2. All timing durations mentioned below are averages of 3 data points.
  3. All tasks used the following query context, unless a different value is explicitly mentioned in the Other Notes column:
{
  "maxNumTasks": 5,
  "durableShuffleStorage": true,
  "selectDestination": "durableStorage",
  "rowsPerPage": 10000000
}

Query 1

select trip_id, sum(tip_amount) from "trips_xaa" group by trip_id

Original timings:

Chunk Size Time Other Notes
100 MB 10:31 minutes
5 MB 10:51 minutes
5 MB 9:53 minutes maxNumTasks = 2

New timings:

Chunk Size Threadpool size Max concurrent chunks Time
100 MB 5 5 8:56 minutes
100 MB 10 10 6:58 minutes
5 MB 5 5 10:26 minutes
5 MB 10 10 7:35 minutes
5 MB 30 30 6:52 minutes

Query 2

SELECT sum("commentLength"), "countryName" FROM "wikipedia_s3" group by "countryName" limit 4

Original timings:

Chunk Size Time
100 MB 32 seconds
5 MB 29 seconds

New timings:

Chunk Size Threadpool size Max concurrent chunks Time
100 MB 5 5 31 seconds
5 MB 5 5 28 seconds
5 MB 10 10 28 seconds

Query 3

SELECT "trip_id", sum(tip_amount) FROM "trips_xaa" where "fare_amount" > 100 group by "trip_id", "tip_amount" order by "trip_id"

Original timings:

Chunk Size Time
100 MB 4:52 minutes
5 MB 5:04 minutes

New timings:

Chunk Size Threadpool size Max concurrent chunks Time
100 MB 5 5 4:44 minutes
5 MB 5 5 4:55 minutes
5 MB 10 10 4:30 minutes

Query 4

select trip_id, sum(tip_amount) from "trips_xaa" where __time < TIMESTAMP '2014-04-06 11:15:25' group by trip_id

Original timings:

Chunk Size Time
100 MB 1:56 minutes
5 MB 1:52 minutes

New timings:

Chunk Size Threadpool size Max concurrent chunks Time
100 MB 5 5 1:45 minutes
5 MB 5 5 1:45 minutes
5 MB 10 10 1:44 minutes

Query 5

select * from "trips_xaa" where __time < TIMESTAMP '2013-11-06 11:15:25'

The optimization is much more significant when we have I/O bound queries like this one with durable storage enabled for storing intermediary stage outputs as well as the query results. This query returned 56.8 MB of data.

Original timings:

Chunk Size Time Other Notes
5 MB 2:11 minutes maxNumTasks = 2

New timings:

Chunk Size Threadpool size Max concurrent chunks Time Other Notes
5 MB 10 10 56 seconds maxNumTasks = 2

Query 6

select * from "trips_xaa" where __time < TIMESTAMP '2014-02-06 11:15:25'

This is another I/O bound query like the previous one. This query returned 454 MB of data.

Original timings:

Chunk Size Time Other Notes
5 MB 13:50 minutes maxNumTasks = 2

New timings:

Chunk Size Threadpool size Max concurrent chunks Time Other Notes
5 MB 10 10 5:08 minutes maxNumTasks = 2

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@LakshSingla
Copy link
Contributor

Were the tests run on a middle manager or an indexer?

@Akshat-Jain
Copy link
Contributor Author

Akshat-Jain commented May 27, 2024

Were the tests run on a middle manager or an indexer?

@LakshSingla On Indexer

@rohangarg
Copy link
Member

@Akshat-Jain : I had some doubts regarding the changes :

  1. Is there an example of how much time does the S3 writes take in real jobs? That can include the fraction of the overall time being consumed as well. Also, maybe that varies for small jobs vs large jobs and the concurrency in the system.
  2. I see that parallelism is introduced via a thread-pool in the system. Does that pool ensure some fairness for each writer? Earlier each writer used to get atleast one thread instantly for writing - what are your thoughts on that requirement?
  3. Also, adding things arbitrarily in thread-pool might mean that the some parts for a writer may have to wait to be uploaded. Is there any mechanism to know a summary of the wait times of the parts? I am asking this since I'm not sure how would a person evaluate the performance of a write in a concurrent system.
  4. I see some thread safe things being done with semaphores in the output stream - is the output stream expected to be thread safe? Or is that done for coordination with the executor among multiple writers. If it for coordination, then should that code reside inside the thread-pool executor somehow? That is also attached to the backpressure mechanism being built per-writer.
  5. I find it weird that we're doing ALL uploads in the output stream using multipart-uploads. And that includes having initiateMultipart call in the output stream's constructor which I personally don't like. Are there any thoughts on improving that by not using multi-part uploads (and rather use plain PUT request) for small uploads?

Please let me know your thoughts, if that's possible! 👍

@Akshat-Jain
Copy link
Contributor Author

@Akshat-Jain : I had some doubts regarding the changes :

  1. Is there an example of how much time does the S3 writes take in real jobs? That can include the fraction of the overall time being consumed as well. Also, maybe that varies for small jobs vs large jobs and the concurrency in the system.
  2. I see that parallelism is introduced via a thread-pool in the system. Does that pool ensure some fairness for each writer? Earlier each writer used to get atleast one thread instantly for writing - what are your thoughts on that requirement?
  3. Also, adding things arbitrarily in thread-pool might mean that the some parts for a writer may have to wait to be uploaded. Is there any mechanism to know a summary of the wait times of the parts? I am asking this since I'm not sure how would a person evaluate the performance of a write in a concurrent system.
  4. I see some thread safe things being done with semaphores in the output stream - is the output stream expected to be thread safe? Or is that done for coordination with the executor among multiple writers. If it for coordination, then should that code reside inside the thread-pool executor somehow? That is also attached to the backpressure mechanism being built per-writer.
  5. I find it weird that we're doing ALL uploads in the output stream using multipart-uploads. And that includes having initiateMultipart call in the output stream's constructor which I personally don't like. Are there any thoughts on improving that by not using multi-part uploads (and rather use plain PUT request) for small uploads?

Please let me know your thoughts, if that's possible! 👍

@rohangarg Thanks for the detailed questions!

  1. I have put some timing data in the PR description, where you can check queries 5 and 6 to get some idea of the degree to which concurrency helps in output heavy queries. With that said, I can work on getting timing data for queries across these scenarios, which should help us deduce how much time was taken in the durable storage part of the overall thing:
    1. MSQ without this PR's code changes + durable storage disabled
    2. MSQ without this PR's code changes + durable storage enabled
    3. MSQ with this PR's code changes + durable storage enabled
  2. I don't think earlier each writer got a thread instantly for writing. The writing was being done by the processing threadpool, but it's possible to have more workers than the processing threadpool size. I haven't tried such a scenario so far, but I think under this scenario, not all writers will get a thread instantly as there are more workers/writers than the size of processing threadpool. To answer your question about fairness in the PR changes, there's no such mechanism. But as I described, this seems like a separate problem than what this PR is trying to address.
  3. That's a good point. I can add logs for wait times of all parts, but I felt it might be too much logging. Will think more about this. Happy to hear any suggestions as well, if you have any! :)
  4. Semaphore was used to make sure we restrict number of files locally to xyz number of files, to restrict disk space usage at any given point in time. Semaphore wasn't needed in the original code since the disk space was automatically restricted as the same thread was deleting the chunk before writing the next chunk.
  5. I think the rationale here is that we don't know in advance how big the overall file would be. Hence we are doing multi-part uploads for everything. cc: @cryptoe to confirm this point though.

Hope these answer your questions. Happy to discuss further on any of the above points if needed, thanks! :)

Copy link
Contributor

@LakshSingla LakshSingla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have commented a few readability concerns with the latest set of changes.

a. I am curious as to how it performs on a non-local cluster.
b. As I understand, the PR introduces concurrency, and also modifies the chunk size. I wonder what the improvement will be if we fine tune the chunk size, and not have concurrency in the job. So measuring the time of the jobs like:
Original changes + 100 MB chunk size
Original changes + 20 MB chunk size
Original changes + 5 MB chunk size
New changes

Though the latter can be pretty tedious, so original v/s new is what I am primarily concerned about

@LakshSingla
Copy link
Contributor

LakshSingla commented May 28, 2024

idea of the degree to which concurrency helps in output heavy queries

What is meant by output heavy query?

but it's possible to have more workers than the processing threadpool size.

In which scenario is this possible?

@Akshat-Jain
Copy link
Contributor Author

idea of the degree to which concurrency helps in output heavy queries

What is meant by output heavy query?

but it's possible to have more workers than the processing threadpool size.

In which scenario is this possible?

@LakshSingla

  1. By output heavy query, I meant queries where we have a lot of data to upload to durable storage, and the processing time isn't a lot. Example: select * from huge_datasource
  2. I think our processing threadpool size is fixed based on number of cores of the system. For example, for my system it's 10 processing threads. But I can provide maxNumTasks higher than that? Or do we have a pre-flight check to not allow that? (I should try it out once)

@rohangarg
Copy link
Member

I have put some timing data in the PR description, where you can check queries 5 and 6 to get some idea of the degree to which concurrency helps in output heavy queries.

Thanks, I checked earlier but I couldn't understand their timings much. For instance, were you able to reason about why the serial upload is taking ~ 8 minutes for 450MBs? Also, the chunk size in those queries seemed a bit de-generate to me.

I don't think earlier each writer got a thread instantly for writing. The writing was being done by the processing threadpool, but it's possible to have more workers than the processing threadpool size. I haven't tried such a scenario so far, but I think under this scenario, not all writers will get a thread instantly as there are more workers/writers than the size of processing threadpool. To answer your question about fairness in the PR changes, there's no such mechanism. But as I described, this seems like a separate problem than what this PR is trying to address.

I was actually meaning that the threads which are running and are writing to the output stream currently are able to write it to S3 too on the same thread, which might not be the case which this change and the writes might have to wait. Are we solving that scenario in this change? Further, that brings me to another point I wanted to raise - have we considered supporting parallel uploads via the execution framework (maybe an operator to fan out writes) instead of doing it on a per storage basis? If so, can you please share the reasons for not choosing that? One more positive if it can be done via execution framework is that it will then automatically support all storage connectors.

That's a good point. I can add logs for wait times of all parts, but I felt it might be too much logging. Will think more about this. Happy to hear any suggestions as well, if you have any! :)

You can create summary statistics over the wait times and post them in the logs - hopefully that's good enough :)

Semaphore was used to make sure we restrict number of files locally to xyz number of files, to restrict disk space usage at any given point in time. Semaphore wasn't needed in the original code since the disk space was automatically restricted as the same thread was deleting the chunk before writing the next chunk.

Yes, it makes sense to restrict the number of concurrent files. But I would do that in the executor related code itself instead of leaking it into the output stream.

I think the rationale here is that we don't know in advance how big the overall file would be. Hence we are doing multi-part uploads for everything.

You can determine that the file is small when not even one local chunk is fully filled, and the output stream is closed. And once you've determined that a file is small, direct PUT can be used over multi-part upload.

@Akshat-Jain
Copy link
Contributor Author

Akshat-Jain commented May 28, 2024

@rohangarg

Thanks, I checked earlier but I couldn't understand their timings much. For instance, were you able to reason about why the serial upload is taking ~ 8 minutes for 450MBs?

The serial upload is uploading 5 MBs one-by-one, hence takes an extremely long time.

Also, the chunk size in those queries seemed a bit de-generate to me.

So, I had to reduce the chunk size to lowest possible value to be able to simulate upload of a large number of chunks, as I can't do it otherwise due to limited system resources. But it shouldn't affect the comparison/improvement trend, hence we can still rely on this data to get some idea on the degree of savings.

I was actually meaning that the threads which are running and are writing to the output stream currently are able to write it to S3 too on the same thread, which might not be the case which this change and the writes might have to wait. Are we solving that scenario in this change?

Actually it's the opposite. Previously the writes to the output stream (local file) had to wait for the previous chunk write+upload to finish. Now, the write doesn't have to wait for the previous uploads to finish (there's a backpressure semantic to restrict too many writes, but that's different from the current point). Please let me know if this answers the question, happy to clarify this further!

have we considered supporting parallel uploads via the execution framework (maybe an operator to fan out writes) instead of doing it on a per storage basis?

No, we hadn't discussed any approach like that. cc: @cryptoe

But I would do that in the executor related code itself instead of leaking it into the output stream.

Can you please clarify what do you mean by "leaking it into the output stream"? Sorry, I didn't get this part.

You can determine that the file is small when not even one local chunk is fully filled, and the output stream is closed. And once you've determined that a file is small, direct PUT can be used over multi-part upload.

But this would work only when there's a single chunk. How about when there are 5 chunks, for example? We'd have to wait for the last chunk to be processed before we can make the decision that the total number of chunks * chunkSize is small enough to go with PUT approach (and we would have to hold on to processing the initial 4 chunks until then as well). Also, supporting the PUT flow along with the multipart approach doesn't seem ideal to me, it would be one extra thing to maintain for not much benefits. That's my opinion on this atleast. Happy to discuss it further.

@rohangarg
Copy link
Member

The serial upload is uploading 5 MBs one-by-one, hence takes an extremely long time.

Ok, although the bandwidth comes out to be around < 1 MB/sec with this calculation. So, I'm not sure about realistic nature of the test.

Actually it's the opposite. Previously the writes to the output stream (local file) had to wait for the previous chunk write+upload to finish. Now, the write doesn't have to wait for the previous uploads to finish (there's a backpressure semantic to restrict too many writes, but that's different from the current point). Please let me know if this answers the question, happy to clarify this further!

Removing backpressure from the situation, previously if you have written a chunk, you'll upload it and wait for the upload to finish to start writing another chunk - and all that will happen without any extra wait time. Now, if you've written a chunk, you'll put it in an executor which might be consumed by the parts of other writers and you'll wait in the queue for your upload to be initiated. My question was that is the new behavior expected and fine?

Can you please clarify what do you mean by "leaking it into the output stream"? Sorry, I didn't get this part.

I meant that the code for semaphores directly in the output stream seems more related to the behavior of the output stream rather than a property of the uploading mechanism.

But this would work only when there's a single chunk. How about when there are 5 chunks, for example? We'd have to wait for the last chunk to be processed before we can make the decision that the total number of chunks * chunkSize is small enough to go with PUT approach (and we would have to hold on to processing the initial 4 chunks until then as well). Also, supporting the PUT flow along with the multipart approach doesn't seem ideal to me, it would be one extra thing to maintain for not much benefits. That's my opinion on this atleast. Happy to discuss it further.

Yes, this would only work for a single chunk. I've never recommended to extend it any further than a single chunk. Further, a multi-part request would involve 3 API calls (which are chargeable) for small uploads as compared to a PUT call which only makes 1 API call. Also, I think the multi-part API was intended more to be used as a resilient mechanism for large file uploads. So, it feels like unnecessary to me for small files. In any case, it was a suggestion for optimization and you can feel free to ignore it if you want.

@Akshat-Jain
Copy link
Contributor Author

@rohangarg

Ok, although the bandwidth comes out to be around < 1 MB/sec with this calculation. So, I'm not sure about realistic nature of the test.

The data I mentioned was average of 3+ readings. I can do it again (once all review comments are addressed and everyone is okay with the implementation).

Now, if you've written a chunk, you'll put it in an executor which might be consumed by the parts of other writers and you'll wait in the queue for your upload to be initiated. My question was that is the new behavior expected and fine?

Yes, the upload will happen sooner (despite the potential wait) than it would have happened in the original sequential code.

I meant that the code for semaphores directly in the output stream seems more related to the behavior of the output stream rather than a property of the uploading mechanism.

We don't want the main threads to be blocked by the upload threads (except for the last chunk where we have to complete the multipart upload). Hence, the main thread cannot release the semaphore post finish of upload(), if that's what the suggestion was.

Yes, this would only work for a single chunk.

Cool, I agree then. The only counter point I have would be the maintainability angle. I'll discuss it with Karan offline if it makes sense to go for it. Either way, this can go in a separate follow-up PR, if it has to. Hope that works!

@Akshat-Jain Akshat-Jain requested review from kfaraz and LakshSingla June 3, 2024 11:41
chunkSize = Math.max(chunkSize, s3ExportConfig.getChunkSize().getBytes());
}

return (int) (S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES / chunkSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the numerator be the overall disk size available for keeping chunks?
The value used here S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES is 5gb which just seems to be the maximum object size limit imposed by S3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfaraz I was trying to do it similar to how it's done in WorkerStorageParameters. IIUC, in WorkerStorageParameters, we have a hard-coded size of 1 GB (MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES = 1_000_000_000L;) which is used instead of computing the overall available disk space.

Also, I don't think we should use the overall disk size available for this calculation, and should rather have a hard-limit? I chose 5 GB as that's the maximum allowed chunk size by S3. In the default case, chunk size is 100 MB, so this means around 50 chunks allowed on disk at once, which seems fine?

Thoughts? cc: @cryptoe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5GB doesn't seem like a bad default but limiting it to 5GB even when we might have much more disk available seems weird. Also, if 5GB is the limit we want to use, we should define a separate constant rather than reuse the constant for max size.

we should use the overall disk size available for this calculation

Just to clarify, I have not suggested using the entire disk space for storing chunks, rather the portion actually allocated for this purpose. That allocation may either be a hard-coded value, a fixed percentage of the overall disk space or even a config.

In any case, that amount of disk space should be passed as an argument to the computeMaxNumChunksOnDisk method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfaraz I had a discussion with Karan about it. The conclusion was that we don't need to block this PR for this review comment, and a follow-up task could be to add a metric for gathering data on how long are the processing threads actually blocked on this constraint. We could then tune this accordingly based on the gathered data.
Hope that works!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. 👍🏻

Copy link
Contributor

@kfaraz kfaraz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the comments, @Akshat-Jain !

I have left a few more comments to clean up the tests but they need not block this PR and maybe addressed in a follow-up.

@Akshat-Jain
Copy link
Contributor Author

@kfaraz Thank you for the extremely exhaustive review comments, got to learn a LOT from them throughout this PR! 😄

@kfaraz
Copy link
Contributor

kfaraz commented Jun 6, 2024

Happy to help, @Akshat-Jain ! 🙂
Thank you for the changes and for the discussion!

@cryptoe cryptoe merged commit 03a38be into apache:master Jun 7, 2024
87 checks passed
@abhishekrb19
Copy link
Contributor

@Akshat-Jain, while looking into some test failures on one of my PRs, I noticed that the test org.apache.druid.storage.s3.output.RetryableS3OutputStreamTest.testWriteSmallBufferShouldSucceed sometimes fails. Maven will retry flakey tests up to 3 times. Please see the failed job and the screenshot below: https://github.com/apache/druid/actions/runs/9620286940/job/26538536109.

CleanShot 2024-06-21 at 19 36 58@2x

It's also reproducible easily in the IDE: try running this test with "Repeat Until Failure" configuration. I haven't looked into what's causing the flakiness, but it seems like there's an ordering mismatch in the uploaded S3 chunks. I'm not sure if there's a race condition in the upload executor added in this PR, but I thought I'd mention it in case this is a legitimate issue.

kfaraz pushed a commit that referenced this pull request Jun 24, 2024
As part of #16481, we have started uploading the chunks in parallel.
That means that it's not necessary for the part that finished uploading last
to be less than or equal to the chunkSize (as the final part could've been uploaded earlier).

This made a test in RetryableS3OutputStreamTest flaky where we were
asserting that the final part should be smaller than chunk size.

This commit fixes the test, and also adds another test where the file size
is such that all chunk sizes would be of equal size.
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants