Skip to content

Commit

Permalink
Fix fetch of task location in SpecificTaskServiceLocator (#16462)
Browse files Browse the repository at this point in the history
* Fix fetch of task location in SpecificTaskServiceLocator

* Resolve future if exception occurs while invoking API

* Remove unused import
  • Loading branch information
kfaraz authored May 20, 2024
1 parent a124c6c commit 15d27f3
Showing 1 changed file with 105 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
Expand All @@ -57,7 +56,6 @@ public class SpecificTaskServiceLocator implements ServiceLocator

private final String taskId;
private final OverlordClient overlordClient;
private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
private final Object lock = new Object();

@GuardedBy("lock")
Expand Down Expand Up @@ -125,60 +123,26 @@ public void onSuccess(final Map<String, TaskStatus> taskStatusMap)
lastUpdateTime = System.currentTimeMillis();

final TaskStatus status = taskStatusMap.get(taskId);

if (status == null) {
// If the task status is unknown, we'll treat it as closed.
lastKnownState = null;
lastKnownLocation = null;
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
// Do not resolve the future just yet, try the fallback API instead
fetchFallbackTaskLocation();
} else {
lastKnownState = status.getStatusCode();
final TaskLocation location;
if (TaskLocation.unknown().equals(status.getLocation())) {
location = locationFetcher.getLocation();
} else {
location = status.getLocation();
}

if (TaskLocation.unknown().equals(location)) {
lastKnownLocation = null;
} else {
lastKnownLocation = new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
}
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}

if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}

@Override
public void onFailure(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
resolvePendingFutureOnException(t);
}
},
MoreExecutors.directExecutor()
Execs.directExecutor()
);

return Futures.nonCancellationPropagating(retVal);
Expand Down Expand Up @@ -209,18 +173,104 @@ public void close()
}
}

private class TaskLocationFetcher
private void resolvePendingFuture(TaskState state, TaskLocation location)
{
TaskLocation getLocation()
{
final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
overlordClient.taskStatus(taskId),
true
);
if (statusResponse == null || statusResponse.getStatus() == null) {
return TaskLocation.unknown();
} else {
return statusResponse.getStatus().getLocation();
synchronized (lock) {
if (pendingFuture != null) {
lastKnownState = state;
lastKnownLocation = location == null ? null : new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);

if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}

private void resolvePendingFutureOnException(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);

// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}

/**
* Invokes the single task status API {@link OverlordClient#taskStatus} if the
* multi-task status API returns an unknown location (this can happen if the
* Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
*/
private void fetchFallbackTaskLocation()
{
synchronized (lock) {
if (pendingFuture != null) {
final ListenableFuture<TaskStatusResponse> taskStatusFuture;
try {
taskStatusFuture = overlordClient.taskStatus(taskId);
}
catch (Exception e) {
resolvePendingFutureOnException(e);
return;
}

pendingFuture.addListener(
() -> {
if (!taskStatusFuture.isDone()) {
// pendingFuture may resolve without taskStatusFuture due to close().
taskStatusFuture.cancel(true);
}
},
Execs.directExecutor()
);

Futures.addCallback(
taskStatusFuture,
new FutureCallback<TaskStatusResponse>()
{
@Override
public void onSuccess(final TaskStatusResponse taskStatusResponse)
{
synchronized (lock) {
if (pendingFuture != null) {
lastUpdateTime = System.currentTimeMillis();

final TaskStatusPlus status = taskStatusResponse.getStatus();
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
resolvePendingFuture(status.getStatusCode(), null);
} else {
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
}
}
}

@Override
public void onFailure(Throwable t)
{
resolvePendingFutureOnException(t);
}
},
Execs.directExecutor()
);
}
}
}
Expand Down

0 comments on commit 15d27f3

Please sign in to comment.