Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Possible Race Condition #3269

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,31 @@ private[netty] object NettyConnectionPool {
}

/**
* Refreshes the idle timeout handler on the channel pipeline.
* Attempts to refresh the idle timeout handler on the channel pipeline.
* @return
* true if the handler was successfully refreshed prior to the channel being
* closed
* closed and the channel is still open.
*
* true if the `timeout` is None and the channel is still open.
*
* false of the channel is closed.
*/
private def refreshIdleTimeoutHandler(
channel: JChannel,
timeout: Duration,
): Boolean = {
channel
.pipeline()
.replace(
Names.ReadTimeoutHandler,
Names.ReadTimeoutHandler,
new ReadTimeoutHandler(timeout.toMillis, TimeUnit.MILLISECONDS),
)
channel.isOpen
timeout: Option[Duration],
)(implicit trace: Trace): Task[Boolean] = {
ZIO.attempt {
timeout.map { timeout =>
channel
.pipeline()
.replace(
Names.ReadTimeoutHandler,
Names.ReadTimeoutHandler,
new ReadTimeoutHandler(timeout.toMillis, TimeUnit.MILLISECONDS),
)
}
}.when(channel.isOpen)
.as(channel.isOpen)
Comment on lines +158 to +169
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This "breaks" the current logic for retrying channels from the pool until we succeed. I think a better solution would be to keep the existing code as is and wrap it in a try-catch block so that we don't fail requests when this happens

  private def refreshIdleTimeoutHandler(
    channel: JChannel,
    timeout: Duration,
  ): Boolean = {
  try {
    channel
      .pipeline()
      .replace(
        Names.ReadTimeoutHandler,
        Names.ReadTimeoutHandler,
        new ReadTimeoutHandler(timeout.toMillis, TimeUnit.MILLISECONDS),
      )
    channel.isOpen
  } catch {
    case _: NoSuchElementException => false
  }

}

private final class ReadTimeoutErrorHandler(nettyRuntime: NettyRuntime)(implicit trace: Trace)
Expand Down Expand Up @@ -256,10 +264,12 @@ private[netty] object NettyConnectionPool {
// We retry a few times hoping to obtain an open channel
// NOTE: We need to release the channel before retrying, so that it can be closed and removed from the pool
// We do that in a forked fiber so that we don't "block" the current fiber while the new resource is obtained
if (channel.isOpen && idleTimeout.fold(true)(refreshIdleTimeoutHandler(channel, _)))
ZIO.succeed(channel)
else
invalidate(channel) *> release.forkDaemon *> ZIO.fail(None)

refreshIdleTimeoutHandler(channel, idleTimeout).flatMap {
case true => ZIO.succeed(channel)
case false => invalidate(channel) *> release.forkDaemon *> ZIO.fail(None)
}

}
.retry(retrySchedule(key))
.catchAll {
Expand Down
Loading