[batch] maybe reduce average JVMJob "connecting to jvm" time #13870
Conversation
Force-pushed from f2145c4 to d550af9
```python
async def recreate_jvm(self, jvm: JVM):
    self._jvms.remove(jvm)
```
I think this is a latent bug; the JVM is still owned by the job when `recreate_jvm` is called, so it won't be in this list. If this ever happened, it would fail.
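A minimal sketch of the failure mode, assuming only that `_jvms` is a plain list as in this diff: `list.remove` raises `ValueError` when the element is absent, so if the job still owns the JVM, this call would throw.

```python
# Hypothetical repro: the JVM was handed to a job, so it is no
# longer in the worker's list when recreate_jvm runs.
jvms = ['jvm-0', 'jvm-1']
borrowed = jvms.pop(0)   # a job takes ownership of 'jvm-0'
jvms.remove(borrowed)    # raises ValueError: list.remove(x): x not in list
```

A guarded `if jvm in self._jvms: self._jvms.remove(jvm)` (or a set with `discard`) would avoid the crash, though the right fix depends on who should own the JVM at that point.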
Force-pushed from 0b6d075 to ba7204c
```diff
@@ -2984,8 +3004,9 @@ async def shutdown(self):
         log.info('Worker.shutdown')
         self._jvm_initializer_task.cancel()
         async with AsyncExitStack() as cleanup:
             for jvm in self._jvms:
```
this seems a bit odd; the JVMs might still be held by jobs, right?
Force-pushed from ba7204c to 7767141
Force-pushed from 7767141 to 92bf566
It takes about 1.3s to start a JVM.
batch/batch/worker/worker.py (Outdated)

```python
n_cores = self._jvm_waiters.get_nowait()
jvmqueue = self._jvms_by_cores[n_cores]
jvmqueue.queue.put_nowait(await JVM.create(global_jvm_index, n_cores, self))
jvmqueue.total += 1
```
I'm having a hard time following this code and why the outer queue of `jvm_waiters` is necessary.
```python
assert self._waiting_for_jvm_with_n_cores.empty()
assert all(jvmpool.full() for jvmpool in self._jvmpools_by_cores.values())
log.info(f'JVMs initialized {self._jvmpools_by_cores}')
```
@jigold Thanks for pushing back! What do you think of it now?
To directly answer your question: one queue (`jvmpool.queue`) is a place for a consumer to borrow a JVM; the other queue (`waiting_for_jvm_with_n_cores`) is a place for a producer to learn that a consumer is waiting. Without `waiting_for_jvm_with_n_cores`, `_initialize_jvms` has no way to be told that someone is waiting for a JVM. `asyncio.Queue` doesn't expose a method like `has_waiters()`.
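A minimal sketch of the two-queue handshake described above; the `JVMPool`/`Worker` scaffolding here is a hypothetical stand-in for the PR's actual classes, with names following the PR where possible. Consumers request a JVM by core count, and the initializer serves waiting consumers before filling the remaining pools.

```python
import asyncio

class JVMPool:
    def __init__(self, capacity: int):
        self.queue: asyncio.Queue = asyncio.Queue()  # warm JVMs ready to borrow
        self.capacity = capacity
        self.total = 0  # JVMs created so far for this core count

    def full(self) -> bool:
        return self.total >= self.capacity

async def create_jvm(n_cores: int) -> str:
    await asyncio.sleep(1.3)  # stand-in for the ~1.3s JVM.create
    return f'jvm-{n_cores}c'

class Worker:
    def __init__(self, capacities: dict):
        self._jvmpools_by_cores = {c: JVMPool(n) for c, n in capacities.items()}
        self._waiting_for_jvm_with_n_cores: asyncio.Queue = asyncio.Queue()

    async def borrow_jvm(self, n_cores: int) -> str:
        pool = self._jvmpools_by_cores[n_cores]
        try:
            return pool.queue.get_nowait()  # fast path: a warm JVM is available
        except asyncio.QueueEmpty:
            # Tell the initializer (the producer) that a consumer is
            # waiting for this size, then block until a JVM arrives.
            # (Returning a borrowed JVM to pool.queue is omitted here.)
            self._waiting_for_jvm_with_n_cores.put_nowait(n_cores)
            return await pool.queue.get()

    async def initialize_jvms(self):
        while not all(p.full() for p in self._jvmpools_by_cores.values()):
            try:
                # Start JVMs that consumers are actively waiting for first ...
                n_cores = self._waiting_for_jvm_with_n_cores.get_nowait()
            except asyncio.QueueEmpty:
                # ... otherwise fill any pool that is not yet at capacity.
                n_cores = next(c for c, p in self._jvmpools_by_cores.items()
                               if not p.full())
            pool = self._jvmpools_by_cores[n_cores]
            pool.queue.put_nowait(await create_jvm(n_cores))  # serially, one at a time
            pool.total += 1
```

For instance, gathering `worker.initialize_jvms()` with a few `worker.borrow_jvm(8)` calls under `asyncio.run` shows the effect: a waiter's request jumps its core size to the front of the startup order.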
Much better! I understand what's going on now. Just to make sure I understand where the performance improvements are: we don't wait for all JVMs to be initialized before accepting JVM jobs, and the queue is FIFO so we reuse the same JVMs that are already warm?
See comment.
No. We have no way to accept only JVM jobs or only Batch jobs, so we either accept all jobs or no jobs. In main, we accept jobs before the JVMs have initialized, but we wait for all JVMs to initialize before giving a JVM to any JVM job. So, concretely, in both main and this PR we accept jobs before the JVMs are ready; however, in this PR we don't wait for all JVMs to initialize before running JVM jobs. There are two improvements in this PR: (1) a JVM job can take a JVM as soon as one with the right number of cores is ready, instead of waiting for every JVM to initialize; (2) JVMs are started serially, prioritizing core counts that jobs are already waiting for, rather than all in parallel.
(2) might sound slower (why start serially when we can start in parallel?), but it appears that 30 JVMs competing for CPU time dramatically slows down the average startup time. In both main and this PR it takes roughly 25s for all JVMs to be ready; however, in this PR, some jobs can start much sooner than 25s because their JVMs are started first.
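To make (2) concrete, here is a hedged sketch, with `start_jvm` as a hypothetical stand-in for `JVM.create`, contrasting the two startup strategies; total wall time is comparable, but the time to the first usable JVM differs materially.

```python
import asyncio

async def start_jvm(i: int) -> str:
    # Stand-in for JVM.create: in reality ~1.3s of largely CPU-bound
    # work, which is why 30 concurrent startups slow each other down.
    await asyncio.sleep(1.3)
    return f'jvm-{i}'

async def start_parallel(n: int) -> list:
    # All n startups contend for the machine's cores; no JVM is
    # usable until near the end of the ~25s window.
    return await asyncio.gather(*(start_jvm(i) for i in range(n)))

async def start_serial(n: int) -> list:
    # One startup at a time: the first JVM is usable after ~1.3s,
    # so early jobs can start long before all n JVMs exist.
    return [await start_jvm(i) for i in range(n)]
```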