Skip to content

Commit

Permalink
Handle and map errors in delete pending segments API (apache#15673)
Browse files Browse the repository at this point in the history
Changes:
- Handle exception in deletePendingSegments API and map to correct HTTP status code
- Clean up exception message using `DruidException`
- Add unit tests
  • Loading branch information
abhishekrb19 authored Jan 15, 2024
1 parent e49a7bb commit 08c01f1
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.apache.druid.indexing.overlord;

import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Comparator;
Expand All @@ -46,23 +47,31 @@ public IndexerMetadataStorageAdapter(

public int deletePendingSegments(String dataSource, Interval deleteInterval)
{
// Check the given interval overlaps the interval(minCreatedDateOfActiveTasks, MAX)
final Optional<DateTime> minCreatedDateOfActiveTasks = taskStorageQueryAdapter
// Find the earliest active task created for the specified datasource; if one exists,
// check if its interval overlaps with the delete interval.
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorageQueryAdapter
.getActiveTaskInfo(dataSource)
.stream()
.map(TaskInfo::getCreatedTime)
.min(Comparator.naturalOrder());
.min(Comparator.comparing(TaskInfo::getCreatedTime));

final Interval activeTaskInterval = new Interval(
minCreatedDateOfActiveTasks.orElse(DateTimes.MAX),
DateTimes.MAX
);
if (earliestActiveTaskOptional.isPresent()) {
final TaskInfo<Task, TaskStatus> earliestActiveTask = earliestActiveTaskOptional.get();
final Interval activeTaskInterval = new Interval(
earliestActiveTask.getCreatedTime(),
DateTimes.MAX
);

Preconditions.checkArgument(
!deleteInterval.overlaps(activeTaskInterval),
"Cannot delete pendingSegments because there is at least one active task created at %s",
activeTaskInterval.getStart()
);
if (deleteInterval.overlaps(activeTaskInterval)) {
throw InvalidInput.exception(
"Cannot delete pendingSegments for datasource[%s] as there is at least one active task[%s] created at[%s] "
+ "that overlaps with the delete interval[%s]. Please retry when there are no active tasks.",
dataSource,
earliestActiveTask.getId(),
activeTaskInterval.getStart(),
deleteInterval
);
}
}

return indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(dataSource, deleteInterval);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void stop()
}

/**
* Returns true if it's the leader and its all services have been properly initialized.
* Returns true if it's the leader and all its services have been initialized.
*/
public boolean isLeader()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -939,10 +939,25 @@ public Response killPendingSegments(
}

if (taskMaster.isLeader()) {
final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build();
try {
final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build();
}
catch (DruidException e) {
return Response.status(e.getStatusCode())
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
catch (Exception e) {
log.warn(e, "Failed to delete pending segments for datasource[%s] and interval[%s].", dataSource, deleteInterval);
return Response.status(Status.INTERNAL_SERVER_ERROR)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
} else {
return Response.status(Status.SERVICE_UNAVAILABLE).build();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(ImmutableMap.of("error", "overlord is not the leader or not initialized yet"))
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,25 @@
package org.apache.druid.indexing.overlord;

import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.List;

public class IndexerMetadataStorageAdapterTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();

private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
Expand Down Expand Up @@ -69,7 +66,7 @@ public void testDeletePendingSegments()
NoopTask.create()
),
new TaskInfo<>(
"id1",
"id2",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
Expand All @@ -93,7 +90,7 @@ public void testDeletePendingSegments()
}

@Test
public void testDeletePendingSegmentsOfRunningTasks()
public void testDeletePendingSegmentsOfOneOverlappingRunningTask()
{
final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
new TaskInfo<>(
Expand All @@ -104,7 +101,7 @@ public void testDeletePendingSegmentsOfRunningTasks()
NoopTask.create()
),
new TaskInfo<>(
"id1",
"id2",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
Expand All @@ -125,8 +122,62 @@ public void testDeletePendingSegmentsOfRunningTasks()
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);

expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
expectedException.expectMessage("Cannot delete pendingSegments because there is at least one active task created");
indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Cannot delete pendingSegments for datasource[dataSource] as there is at least one active task[id1]"
+ " created at[2017-11-01T00:00:00.000Z] that overlaps with the delete "
+ "interval[2017-01-01T00:00:00.000Z/2017-12-01T00:00:00.000Z]. Please retry when there are no active tasks."
)
);
}

@Test
public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks()
{
final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-01"),
TaskStatus.running("id1"),
"dataSource",
NoopTask.create()
),
new TaskInfo<>(
"id2",
DateTimes.of("2017-11-01"),
TaskStatus.running("id2"),
"dataSource",
NoopTask.create()
)
);

EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);

final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
EasyMock
.expect(
indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(
EasyMock.anyString(),
EasyMock.eq(deleteInterval)
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);

MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Cannot delete pendingSegments for datasource[dataSource] as there is at least one active task[id2]"
+ " created at[2017-11-01T00:00:00.000Z] that overlaps with the delete"
+ " interval[2017-01-01T00:00:00.000Z/2018-12-01T00:00:00.000Z]. Please retry when there are no active tasks."
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
Expand Down Expand Up @@ -983,10 +985,136 @@ public void testKillPendingSegments()
authConfig
);

final Map<String, Integer> response = (Map<String, Integer>) overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req)
.getEntity();
Assert.assertEquals(2, response.get("numDeleted").intValue());
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numDeleted", 2), response.getEntity());
}

@Test
public void testKillPendingSegmentsThrowsInvalidInputDruidException()
{
expectAuthorizationTokenCheck();

EasyMock.expect(taskMaster.isLeader()).andReturn(true);
final String exceptionMsg = "Some exception msg";
EasyMock
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andThrow(InvalidInput.exception(exceptionMsg))
.once();

EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);

Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);

Assert.assertEquals(400, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity());
}

@Test
public void testKillPendingSegmentsThrowsDefensiveDruidException()
{
expectAuthorizationTokenCheck();

EasyMock.expect(taskMaster.isLeader()).andReturn(true);
final String exceptionMsg = "An internal defensive exception";
EasyMock
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andThrow(DruidException.defensive(exceptionMsg))
.once();

EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);

Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);

Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity());
}

@Test
public void testKillPendingSegmentsThrowsArbitraryException()
{
expectAuthorizationTokenCheck();

EasyMock.expect(taskMaster.isLeader()).andReturn(true);
final String exceptionMsg = "An unexpected illegal state exception";
EasyMock
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andThrow(new IllegalStateException(exceptionMsg))
.once();

EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);

Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);

Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity());
}

@Test
public void testKillPendingSegmentsToNonLeader()
{
expectAuthorizationTokenCheck();

EasyMock.expect(taskMaster.isLeader()).andReturn(false);

EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);

Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);

Assert.assertEquals(503, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", "overlord is not the leader or not initialized yet"), response.getEntity());
}

@Test
Expand Down

0 comments on commit 08c01f1

Please sign in to comment.