Skip to content

Commit

Permalink
Fix OverlordClient to read reports as a concrete ReportMap (#16226)
Browse files Browse the repository at this point in the history
Follow up to #16217 

Changes:
- Update `OverlordClient.getReportAsMap()` to return `TaskReport.ReportMap`
- Move the following classes to `org.apache.druid.indexer.report` in the `druid-processing` module
  - `TaskReport`
  - `KillTaskReport`
  - `IngestionStatsAndErrorsTaskReport`
  - `TaskContextReport`
  - `TaskReportFileWriter`
  - `SingleFileTaskReportFileWriter`
  - `TaskReportSerdeTest`
- Remove `MsqOverlordResourceTestClient` as it had only one method
which is already present in `OverlordResourceTestClient` itself
  • Loading branch information
kfaraz authored Apr 15, 2024
1 parent 041d0bf commit 81d7b6e
Show file tree
Hide file tree
Showing 78 changed files with 714 additions and 569 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskTest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.google.inject.Key;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.msq.indexing.client;

import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;

@JsonTypeName(MSQTaskReport.REPORT_KEY)
public class MSQTaskReport implements TaskReport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,11 @@ private Optional<ResultSetInformation> getResultSetInformation(
)
{
if (sqlStatementState == SqlStatementState.SUCCESS) {
Map<String, Object> payload =
MSQTaskReportPayload msqTaskReportPayload =
SqlStatementResourceHelper.getPayload(contactOverlord(
overlordClient.taskReportAsMap(queryId),
queryId
));
MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(payload, MSQTaskReportPayload.class);
Optional<List<PageInformation>> pageList = SqlStatementResourceHelper.populatePageList(
msqTaskReportPayload,
msqDestination
Expand Down Expand Up @@ -607,7 +606,8 @@ private Optional<SqlStatementResult> getStatementStatus(
taskResponse,
statusPlus,
sqlStatementState,
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId),
jsonMapper
);
} else {
Optional<List<ColumnNameAndTypes>> signature = SqlStatementResourceHelper.getSignature(msqControllerTask);
Expand Down Expand Up @@ -735,8 +735,9 @@ private Optional<Yielder<Object[]>> getResultYielder(
);
}

MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class);
MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
);

if (msqTaskReportPayload.getResults().getResultYielder() == null) {
results = Optional.empty();
Expand All @@ -746,8 +747,9 @@ private Optional<Yielder<Object[]>> getResultYielder(

} else if (msqControllerTask.getQuerySpec().getDestination() instanceof DurableStorageMSQDestination) {

MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)), MSQTaskReportPayload.class);
MSQTaskReportPayload msqTaskReportPayload = SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
);

List<PageInformation> pages =
SqlStatementResourceHelper.populatePageList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
Expand All @@ -45,7 +47,10 @@
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.report.MSQStagesReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.sql.SqlStatementState;
Expand Down Expand Up @@ -243,12 +248,13 @@ public static Optional<SqlStatementResult> getExceptionPayload(
TaskStatusResponse taskResponse,
TaskStatusPlus statusPlus,
SqlStatementState sqlStatementState,
Map<String, Object> msqPayload
TaskReport.ReportMap msqPayload,
ObjectMapper jsonMapper
)
{
Map<String, Object> exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
Map<String, Object> exception = getMap(exceptionDetails, "error");
if (exceptionDetails == null || exception == null) {
final MSQErrorReport exceptionDetails = getQueryExceptionDetails(getPayload(msqPayload));
final MSQFault fault = exceptionDetails == null ? null : exceptionDetails.getFault();
if (exceptionDetails == null || fault == null) {
return Optional.of(new SqlStatementResult(
queryId,
sqlStatementState,
Expand All @@ -258,18 +264,15 @@ public static Optional<SqlStatementResult> getExceptionPayload(
null,
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("%s", taskResponse.getStatus().getErrorMsg()).toErrorResponse()
.build("%s", taskResponse.getStatus().getErrorMsg())
.toErrorResponse()
));
}

final String errorMessage = String.valueOf(exception.getOrDefault("errorMessage", statusPlus.getErrorMsg()));
exception.remove("errorMessage");
String errorCode = String.valueOf(exception.getOrDefault("errorCode", "unknown"));
exception.remove("errorCode");
Map<String, String> stringException = new HashMap<>();
for (Map.Entry<String, Object> exceptionKeys : exception.entrySet()) {
stringException.put(exceptionKeys.getKey(), String.valueOf(exceptionKeys.getValue()));
}
final String errorMessage = fault.getErrorMessage() == null ? statusPlus.getErrorMsg() : fault.getErrorMessage();
final String errorCode = fault.getErrorCode() == null ? "unknown" : fault.getErrorCode();

final Map<String, String> exceptionContext = buildExceptionContext(fault, jsonMapper);
return Optional.of(new SqlStatementResult(
queryId,
sqlStatementState,
Expand All @@ -285,7 +288,7 @@ protected DruidException makeException(DruidException.DruidExceptionBuilder bob)
DruidException ex = bob.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build(errorMessage);
ex.withContext(stringException);
ex.withContext(exceptionContext);
return ex;
}
}).toErrorResponse()
Expand Down Expand Up @@ -361,22 +364,42 @@ public static MSQStagesReport.Stage getFinalStage(MSQTaskReportPayload msqTaskRe
return null;
}

public static Map<String, Object> getQueryExceptionDetails(Map<String, Object> payload)
@Nullable
private static MSQErrorReport getQueryExceptionDetails(MSQTaskReportPayload payload)
{
return getMap(getMap(payload, "status"), "errorReport");
return payload == null ? null : payload.getStatus().getErrorReport();
}

public static Map<String, Object> getMap(Map<String, Object> map, String key)
@Nullable
public static MSQTaskReportPayload getPayload(TaskReport.ReportMap reportMap)
{
if (map == null) {
if (reportMap == null) {
return null;
}
return (Map<String, Object>) map.get(key);

Optional<MSQTaskReport> report = reportMap.findReport("multiStageQuery");
return report.map(MSQTaskReport::getPayload).orElse(null);
}

public static Map<String, Object> getPayload(Map<String, Object> results)
private static Map<String, String> buildExceptionContext(MSQFault fault, ObjectMapper mapper)
{
Map<String, Object> msqReport = getMap(results, "multiStageQuery");
return getMap(msqReport, "payload");
try {
final Map<String, Object> msqFaultAsMap = new HashMap<>(
mapper.readValue(
mapper.writeValueAsBytes(fault),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
)
);
msqFaultAsMap.remove("errorCode");
msqFaultAsMap.remove("errorMessage");

final Map<String, String> exceptionContext = new HashMap<>();
msqFaultAsMap.forEach((key, value) -> exceptionContext.put(key, String.valueOf(value)));

return exceptionContext;
}
catch (Exception e) {
throw DruidException.defensive("Could not read MSQFault[%s] as a map: [%s]", fault, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.apache.druid.msq.indexing.client;

import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.indexing.MSQControllerTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.msq.indexing.report;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -28,8 +27,8 @@
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
Expand Down Expand Up @@ -243,9 +242,7 @@ public void testWriteTaskReport() throws Exception

final TaskReport.ReportMap reportMap = mapper.readValue(
reportFile,
new TypeReference<TaskReport.ReportMap>()
{
}
TaskReport.ReportMap.class
);

final MSQTaskReport report2 = (MSQTaskReport) reportMap.get(MSQTaskReport.REPORT_KEY);
Expand Down
Loading

0 comments on commit 81d7b6e

Please sign in to comment.