Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParallelIterable: Queue Size w/ O(1) #11895

Closed
wants to merge 1 commit into from

Conversation

shanielh
Copy link
Contributor

Instead of using ConcurrentLinkedQueue.size() which runs over the Linked Queue
in order to get the size of the queue, manage an AtomicInteger with the size
of the queue.

ConcurrentLinkedQueue.size() documentation states that this method is not
useful for concurrent applications.

Note: I have a JFR dump that shows this method uses 35% CPU utilization, this
is why I think this commit is important.

Instead of using ConcurrentLinkedQueue.size() which runs over the Linked Queue
in order to get the size of the queue, manage an AtomicInteger with the size
of the queue.

ConcurrentLinkedQueue.size() documentation states that this method is not
useful for concurrent applications.
@github-actions github-actions bot added the core label Dec 31, 2024
Copy link
Contributor

@singhpk234 singhpk234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM as well ! Thank you for the fix !

have a JFR dump that shows this method uses 35% CPU utilization, this
is why I think this commit is important

interesting queue must really be huge, do you know what the manifest size / count we are looking at or more details of the table state ?

@shanielh
Copy link
Contributor Author

shanielh commented Jan 1, 2025

LGTM as well ! Thank you for the fix !

have a JFR dump that shows this method uses 35% CPU utilization, this
is why I think this commit is important

interesting queue must really be huge, do you know what the manifest size / count we are looking at or more details of the table state ?

Actually I was using ParallelIterable in order to read multiple parquet files in order to compact them, and to scan manifest files.

Table had 180 manifest files with a lot of files:

select count(*), 
       sum(added_data_files_count), 
       sum(existing_data_files_count), 
       sum(deleted_data_files_count) 
  from schema."table$manifests";
count(*) sum(added_data_files_count) sum(existing_data_files_count) sum(deleted_data_files_count)
180 1826 2703684 6844

@RussellSpitzer
Copy link
Member

I wonder if this is as important if we switch ParallelIterable to use the implementation suggested here #11768 which limits the queue depth significantly and changes the yielding behavior.

I think it's a good perf change here but I do worry about disconnecting the poll/push operations from actually changing the size tracker for the queue. We probably aren't actually going to have any issues here though since we are already check the size as basically random times without regard to ongoing concurrent operations.

@shanielh
Copy link
Contributor Author

shanielh commented Jan 3, 2025

I wonder if this is as important if we switch ParallelIterable to use the implementation suggested here #11768 which limits the queue depth significantly and changes the yielding behavior.

I think it's a good perf change here but I do worry about disconnecting the poll/push operations from actually changing the size tracker for the queue. We probably aren't actually going to have any issues here though since we are already check the size as basically random times without regard to ongoing concurrent operations.

Since we poll the size and it's a concurrent data structure, it doesn't really matter if the size is accurate or not, but eventually it is accurate.

As for #11768, we use a different S3FileIO which uses a different mechanism for InputStream, instead of keeping the connection open against S3, we download chunks of data and store it in the memory (on demand, of course). This way we can use ParallelIterable without having to think on the number of connections against S3. This will increase the cost as you might download a file using multiple GET calls instead of one, but allows you to run long lasting InputStream(s).

@shanielh
Copy link
Contributor Author

@RussellSpitzer, I see that #11768 is closed now, we use the PR in a forked version for over a week now and we've observed no issues, any chance to merge this? BTW, the fix for #11768 added another usage of queue.size() which is highly not recommended for concurrent applications, so the state would be even less optimal now, I can rebase if needed.

@Fokko Fokko requested a review from findepi January 13, 2025 12:41
@RussellSpitzer
Copy link
Member

@RussellSpitzer, I see that #11768 is closed now, we use the PR in a forked version for over a week now and we've observed no issues, any chance to merge this? BTW, the fix for #11768 added another usage of queue.size() which is highly not recommended for concurrent applications, so the state would be even less optimal now, I can rebase if needed.

As I mentioned before, 11768 reduces the number of calls to queue.size() from O(number of elements) to O(number of files) by moving the check out of the hot path so size() should now be drastically reduced in the number of times it is actually called. Because of this I'm not sure this PR is actually required anymore but it still probably makes sense from a best practices prospective. That said I really would like us to work on a full rework (as I note here #11781 (comment) )

@shanielh shanielh closed this Jan 13, 2025
@tbaeg
Copy link

tbaeg commented Jan 13, 2025

I think incremental improvement for the existing implementation (even if slated for rewrite) should be included.

Of note, we cherry-picked commits from #11768 and this commit as we were experiencing deadlocking and high CPU usage. We have been running stable in production for over a week now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants