Add tests
adarshsanjeev committed Nov 20, 2023
1 parent 2dc6f29 commit e2399cb
Showing 8 changed files with 582 additions and 577 deletions.
@@ -176,13 +176,15 @@ public <RowType, QueryType> DataServerQueryResult<RowType> fetchRowsFromDataServ
         }
       }
 
-      pendingRequests.addAll(createNextPendingRequests(missingRichSegmentDescriptor, MultiStageQueryContext.getSegmentSources(query.context())));
-
-      retryCount++;
-      if (retryCount >= maxRetries) {
-        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                            .ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                            .build("Unable to fetch results from dataservers in [%d] retries.", retryCount);
+      pendingRequests = createNextPendingRequests(missingRichSegmentDescriptor, MultiStageQueryContext.getSegmentSources(query.context()));
+
+      if (!pendingRequests.isEmpty()) {
+        retryCount++;
+        if (retryCount > maxRetries) {
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                              .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                              .build("Unable to fetch results from dataservers in [%d] retries.", retryCount);
+        }
       }
     }

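This hunk changes the retry bookkeeping in two ways: createNextPendingRequests now replaces the pending list rather than appending to it, and the counter only advances when the new list is non-empty, so a pass that resolves every segment no longer consumes a retry (the limit check also moves from >= to >). A minimal, self-contained sketch of that loop shape; the names here (RetryLoopSketch, runBatch, MAX_RETRIES) are stand-ins for illustration, not Druid's actual API:

import java.util.List;

// Sketch only: stand-in names, not Druid's actual API.
public class RetryLoopSketch
{
  private static final int MAX_RETRIES = 5;

  public static void main(String[] args)
  {
    List<String> pendingRequests = List.of("request-1", "request-2");
    int retryCount = 0;
    while (!pendingRequests.isEmpty()) {
      // Run the current batch; collect whatever is still missing afterwards.
      List<String> missing = runBatch(pendingRequests);
      // Replace the list (the old code used addAll, which could re-queue duplicates).
      pendingRequests = missing;
      // Only count a retry when another pass is actually needed.
      if (!pendingRequests.isEmpty()) {
        retryCount++;
        if (retryCount > MAX_RETRIES) {
          throw new IllegalStateException("Unable to fetch results in " + retryCount + " retries.");
        }
      }
    }
  }

  // Stub: pretend everything succeeds on the first pass.
  private static List<String> runBatch(List<String> requests)
  {
    return List.of();
  }
}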
@@ -209,41 +211,39 @@ private <QueryType, RowType> Yielder<RowType> fetchRowsFromDataServerInternal(
         .collect(Collectors.toList());
 
     try {
-      return
-          RetryUtils.retry(
-              () -> closer.register(createYielder(
-                  dataServerClient.run(
-                      Queries.withSpecificSegments(
-                          query,
-                          requestDescriptor.getSegments()
-                                           .stream()
-                                           .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
-                                           .collect(Collectors.toList())
-                      ), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)),
-              throwable -> !(throwable instanceof QueryInterruptedException
-                             && throwable.getCause() instanceof InterruptedException),
-              PER_SERVER_QUERY_NUM_TRIES
-          );
+      return RetryUtils.retry(
+          () -> closer.register(createYielder(
+              dataServerClient.run(
+                  Queries.withSpecificSegments(
+                      query,
+                      requestDescriptor.getSegments()
+                                       .stream()
+                                       .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
+                                       .collect(Collectors.toList())
+                  ), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)),
+          throwable -> !(throwable instanceof QueryInterruptedException
+                         && throwable.getCause() instanceof InterruptedException),
+          PER_SERVER_QUERY_NUM_TRIES
+      );
     }
     catch (QueryInterruptedException e) {
       if (e.getCause() instanceof RpcException) {
         // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down),
         // we would also be unable to fetch the segment.
         responseContext.addMissingSegments(segmentDescriptors);
         return Yielders.each(Sequences.empty());
       } else {
         throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                             .ofCategory(DruidException.Category.RUNTIME_FAILURE)
                             .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation);
       }
     }
     catch (Exception e) {
       throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                           .ofCategory(DruidException.Category.RUNTIME_FAILURE)
                           .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation);
     }
-
   }
 
   private <RowType, QueryType> Yielder<RowType> createYielder(
       Sequence<QueryType> sequence,
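Both versions of fetchRowsFromDataServerInternal share the same failure policy: a QueryInterruptedException whose cause is an RpcException means the data server itself is unreachable (for example, realtime servers scaled down after handoff), so the segments are reported back as missing and an empty yielder is returned instead of failing the query; any other failure surfaces as a DruidException. A self-contained sketch of that cause-unwrapping pattern, using simplified stand-in exception types rather than Druid's:

import java.util.Collections;
import java.util.List;

// Sketch of the "unwrap the cause and branch" pattern; RpcFailure is a
// stand-in for Druid's RpcException, RuntimeException for QueryInterruptedException.
public class CauseBranchSketch
{
  static class RpcFailure extends RuntimeException {}

  static List<String> fetchOrMarkMissing(List<String> segments)
  {
    try {
      return runQuery(segments);
    }
    catch (RuntimeException e) {
      if (e.getCause() instanceof RpcFailure) {
        // The server is gone: report the segments as missing and return an
        // empty result so the caller can re-plan, instead of failing outright.
        System.out.println("Missing segments: " + segments);
        return Collections.emptyList();
      }
      // Anything else is a genuine error.
      throw new IllegalStateException("Exception while fetching rows", e);
    }
  }

  // Stub that always fails with an RPC-level cause.
  private static List<String> runQuery(List<String> segments)
  {
    throw new RuntimeException("query interrupted", new RpcFailure());
  }

  public static void main(String[] args)
  {
    System.out.println(fetchOrMarkMissing(List.of("segment-1")));
  }
}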
Expand All @@ -264,36 +264,44 @@ private List<DataServerRequestDescriptor> createNextPendingRequests(
SegmentSource includeSegmentSource
)
{

final Map<DruidServerMetadata, Set<RichSegmentDescriptor>> serverVsSegmentsMap = new HashMap<>();

Iterable<ImmutableSegmentLoadInfo> immutableSegmentLoadInfos = coordinatorClient.fetchServerViewSegments(dataSource,
richSegmentDescriptors.stream()
.map(RichSegmentDescriptor::getFullInterval)
.collect(Collectors.toList())
);
Iterable<ImmutableSegmentLoadInfo> immutableSegmentLoadInfos =
coordinatorClient.fetchServerViewSegments(
dataSource,
richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList())
);

Map<SegmentDescriptor, ImmutableSegmentLoadInfo> segmentVsServerMap = new HashMap<>();
immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> {
segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo);
});

for (ImmutableSegmentLoadInfo segmentLoadInfo : immutableSegmentLoadInfos) {
for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) {
if (segmentLoadInfo.getSegment().toDescriptor().equals(richSegmentDescriptor)) {
Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
.stream()
.filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes()
.contains(druidServerMetadata.getType()))
.collect(Collectors.toSet());
if (servers.isEmpty()) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Could not find a server.");
}

DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers);
serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>());
SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor();
serverVsSegmentsMap.get(druidServerMetadata)
.add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber()));
for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) {
if (!segmentVsServerMap.containsKey(toSegmentDescriptorWithFullInterval(richSegmentDescriptor))) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Could not find a server.");
}

ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(toSegmentDescriptorWithFullInterval(richSegmentDescriptor));
if (segmentLoadInfo.getSegment().toDescriptor().equals(richSegmentDescriptor)) {
Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
.stream()
.filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes()
.contains(druidServerMetadata.getType()))
.collect(Collectors.toSet());
if (servers.isEmpty()) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Could not find a server.");
}

DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers);
serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>());
SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor();
serverVsSegmentsMap.get(druidServerMetadata)
.add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber()));
}
}

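The createNextPendingRequests rewrite replaces the nested scan, where every ImmutableSegmentLoadInfo was checked against every descriptor, with a single indexing pass: load infos are keyed by SegmentDescriptor in a HashMap, each descriptor is resolved with one constant-time probe, and a descriptor with no entry fails fast with "Could not find a server." A self-contained sketch of that index-then-probe pattern, with a simplified LoadInfo record and plain strings standing in for Druid's types:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the index-then-probe pattern; LoadInfo is a stand-in for
// ImmutableSegmentLoadInfo and plain strings stand in for SegmentDescriptor.
public class SegmentIndexSketch
{
  record LoadInfo(String descriptor, List<String> servers) {}

  public static void main(String[] args)
  {
    List<LoadInfo> loadInfos = List.of(
        new LoadInfo("seg-1", List.of("server-a", "server-b")),
        new LoadInfo("seg-2", List.of("server-c"))
    );

    // One pass to build the index: descriptor -> load info.
    Map<String, LoadInfo> byDescriptor = new HashMap<>();
    loadInfos.forEach(info -> byDescriptor.put(info.descriptor(), info));

    // One O(1) probe per wanted descriptor replaces the old inner loop.
    for (String descriptor : List.of("seg-1", "seg-2")) {
      LoadInfo info = byDescriptor.get(descriptor);
      if (info == null) {
        throw new IllegalStateException("Could not find a server for " + descriptor);
      }
      System.out.println(descriptor + " -> " + info.servers());
    }
  }
}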
@@ -350,7 +358,7 @@ private List<SegmentDescriptor> checkSegmentHandoff(List<SegmentDescriptor> segm
     }
   }
 
-  private static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor)
+  static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor)
   {
     return new SegmentDescriptor(
         richSegmentDescriptor.getFullInterval(),
@@ -60,7 +60,7 @@ public DataServerQueryHandlerFactory(
     this.queryCancellationExecutor = ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor");
   }
 
-  public DataServerQueryHandler createLoadedSegmentDataProvider(
+  public DataServerQueryHandler createDataServerQueryHandler(
       String dataSource,
       ChannelCounters channelCounters,
       DataServerRequestDescriptor dataServerRequestDescriptor
@@ -119,7 +119,7 @@ private Iterator<DataServerQueryHandler> dataServerIterator(
   )
   {
     return servedSegments.stream().map(
-        dataServerRequestDescriptor -> dataServerQueryHandlerFactory.createLoadedSegmentDataProvider(
+        dataServerRequestDescriptor -> dataServerQueryHandlerFactory.createDataServerQueryHandler(
            dataSource,
            channelCounters,
            dataServerRequestDescriptor
@@ -144,10 +144,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
     // Function to generate a processor manager for the regular processors, which run after the segmentMapFnProcessor.
     final Function<List<Function<SegmentReference, SegmentReference>>, ProcessorManager<Object, Long>> processorManagerFn = segmentMapFnList -> {
       final Function<SegmentReference, SegmentReference> segmentMapFunction =
-          CollectionUtils.getOnlyElement(
-              segmentMapFnList, throwable ->
-                  DruidException.defensive("Only one segment map function expected")
-          );
+          CollectionUtils.getOnlyElement(segmentMapFnList, throwable -> DruidException.defensive("Only one segment map function expected"));
       return createBaseLeafProcessorManagerWithHandoff(
           stageDefinition,
           inputSlices,
@@ -186,7 +183,7 @@ private ProcessorManager<Object, Long> createBaseLeafProcessorManagerWithHandoff
       final Queue<FrameWriterFactory> frameWriterFactoryQueue,
       final Queue<WritableFrameChannel> channelQueue,
      final FrameContext frameContext
-    )
+  )
   {
     final BaseLeafFrameProcessorFactory factory = this;
     // Read all base inputs in separate processors, one per processor.
