From e2399cba9f87b860158f2eb0b96244012441f567 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 20 Nov 2023 10:55:11 +0530 Subject: [PATCH] Add tests --- .../msq/exec/DataServerQueryHandler.java | 134 ++-- .../exec/DataServerQueryHandlerFactory.java | 2 +- .../input/table/SegmentsInputSliceReader.java | 2 +- .../BaseLeafFrameProcessorFactory.java | 7 +- .../msq/exec/DataServerQueryHandlerTest.java | 338 +++++----- .../druid/msq/exec/MSQLoadedSegmentTests.java | 626 +++++++++--------- .../druid/msq/test/CalciteMSQTestsHelper.java | 35 +- .../apache/druid/msq/test/MSQTestBase.java | 15 +- 8 files changed, 582 insertions(+), 577 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java index 526a0a099fa0..25408c58cc8a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java @@ -176,13 +176,15 @@ public DataServerQueryResult fetchRowsFromDataServ } } - pendingRequests.addAll(createNextPendingRequests(missingRichSegmentDescriptor, MultiStageQueryContext.getSegmentSources(query.context()))); - - retryCount++; - if (retryCount >= maxRetries) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Unable to fetch results from dataservers in [%d] retries.", retryCount); + pendingRequests = createNextPendingRequests(missingRichSegmentDescriptor, MultiStageQueryContext.getSegmentSources(query.context())); + + if (!pendingRequests.isEmpty()) { + retryCount++; + if (retryCount > maxRetries) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Unable to fetch results from dataservers in [%d] retries.", retryCount); + } } } @@ -209,41 +211,39 @@ private Yielder fetchRowsFromDataServerInternal( .collect(Collectors.toList()); try { - return - RetryUtils.retry( - () -> closer.register(createYielder( - dataServerClient.run( - Queries.withSpecificSegments( - query, - requestDescriptor.getSegments() - .stream() - .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval) - .collect(Collectors.toList()) - ), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)), - throwable -> !(throwable instanceof QueryInterruptedException - && throwable.getCause() instanceof InterruptedException), - PER_SERVER_QUERY_NUM_TRIES - ); - } - catch (QueryInterruptedException e) { - if (e.getCause() instanceof RpcException) { - // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down), - // we would also be unable to fetch the segment. - responseContext.addMissingSegments(segmentDescriptors); - return Yielders.each(Sequences.empty()); - } else { + return RetryUtils.retry( + () -> closer.register(createYielder( + dataServerClient.run( + Queries.withSpecificSegments( + query, + requestDescriptor.getSegments() + .stream() + .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval) + .collect(Collectors.toList()) + ), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)), + throwable -> !(throwable instanceof QueryInterruptedException + && throwable.getCause() instanceof InterruptedException), + PER_SERVER_QUERY_NUM_TRIES + ); + } + catch (QueryInterruptedException e) { + if (e.getCause() instanceof RpcException) { + // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down), + // we would also be unable to fetch the segment. + responseContext.addMissingSegments(segmentDescriptors); + return Yielders.each(Sequences.empty()); + } else { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); + } + } + catch (Exception e) { throw DruidException.forPersona(DruidException.Persona.OPERATOR) .ofCategory(DruidException.Category.RUNTIME_FAILURE) .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); } } - catch (Exception e) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); - } - - } private Yielder createYielder( Sequence sequence, @@ -264,36 +264,44 @@ private List createNextPendingRequests( SegmentSource includeSegmentSource ) { - final Map> serverVsSegmentsMap = new HashMap<>(); - Iterable immutableSegmentLoadInfos = coordinatorClient.fetchServerViewSegments(dataSource, - richSegmentDescriptors.stream() - .map(RichSegmentDescriptor::getFullInterval) - .collect(Collectors.toList()) - ); + Iterable immutableSegmentLoadInfos = + coordinatorClient.fetchServerViewSegments( + dataSource, + richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList()) + ); + + Map segmentVsServerMap = new HashMap<>(); + immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> { + segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo); + }); - for (ImmutableSegmentLoadInfo segmentLoadInfo : immutableSegmentLoadInfos) { - for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) { - if (segmentLoadInfo.getSegment().toDescriptor().equals(richSegmentDescriptor)) { - Set servers = segmentLoadInfo.getServers() - .stream() - .filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes() - .contains(druidServerMetadata.getType())) - .collect(Collectors.toSet()); - if (servers.isEmpty()) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Could not find a server."); - } - - DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); - serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); - SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); - serverVsSegmentsMap.get(druidServerMetadata) - .add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber())); + for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) { + if (!segmentVsServerMap.containsKey(toSegmentDescriptorWithFullInterval(richSegmentDescriptor))) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Could not find a server."); + } + ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(toSegmentDescriptorWithFullInterval(richSegmentDescriptor)); + if (segmentLoadInfo.getSegment().toDescriptor().equals(richSegmentDescriptor)) { + Set servers = segmentLoadInfo.getServers() + .stream() + .filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes() + .contains(druidServerMetadata.getType())) + .collect(Collectors.toSet()); + if (servers.isEmpty()) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Could not find a server."); } + + DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); + serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); + SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); + serverVsSegmentsMap.get(druidServerMetadata) + .add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber())); } } @@ -350,7 +358,7 @@ private List checkSegmentHandoff(List segm } } - private static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor) + static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor) { return new SegmentDescriptor( richSegmentDescriptor.getFullInterval(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java index 839707e6edec..1caed919ef04 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java @@ -60,7 +60,7 @@ public DataServerQueryHandlerFactory( this.queryCancellationExecutor = ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor"); } - public DataServerQueryHandler createLoadedSegmentDataProvider( + public DataServerQueryHandler createDataServerQueryHandler( String dataSource, ChannelCounters channelCounters, DataServerRequestDescriptor dataServerRequestDescriptor diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java index 7815a90d6cdf..648bd95c6c38 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java @@ -119,7 +119,7 @@ private Iterator dataServerIterator( ) { return servedSegments.stream().map( - dataServerRequestDescriptor -> dataServerQueryHandlerFactory.createLoadedSegmentDataProvider( + dataServerRequestDescriptor -> dataServerQueryHandlerFactory.createDataServerQueryHandler( dataSource, channelCounters, dataServerRequestDescriptor diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java index 12c553a7839e..09301bb63d14 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java @@ -144,10 +144,7 @@ public ProcessorsAndChannels makeProcessors( // Function to generate a processor manger for the regular processors, which run after the segmentMapFnProcessor. final Function>, ProcessorManager> processorManagerFn = segmentMapFnList -> { final Function segmentMapFunction = - CollectionUtils.getOnlyElement( - segmentMapFnList, throwable -> - DruidException.defensive("Only one segment map function expected") - ); + CollectionUtils.getOnlyElement(segmentMapFnList, throwable -> DruidException.defensive("Only one segment map function expected")); return createBaseLeafProcessorManagerWithHandoff( stageDefinition, inputSlices, @@ -186,7 +183,7 @@ private ProcessorManager createBaseLeafProcessorManagerWithHandoff final Queue frameWriterFactoryQueue, final Queue channelQueue, final FrameContext frameContext - ) + ) { final BaseLeafFrameProcessorFactory factory = this; // Read all base inputs in separate processors, one per processor. diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java index 461671100ecc..35e7c4665742 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java @@ -21,20 +21,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.discovery.DataServerClient; import org.apache.druid.discovery.DruidServiceTestUtils; import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.querykit.InputNumberDataSource; import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor; @@ -59,7 +57,6 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; -import java.io.IOException; import java.util.List; import static org.apache.druid.query.Druids.newScanQueryBuilder; @@ -75,173 +72,168 @@ @RunWith(MockitoJUnitRunner.class) public class DataServerQueryHandlerTest { -// private static final String DATASOURCE1 = "dataSource1"; -// private static final DruidServerMetadata DRUID_SERVER_1 = new DruidServerMetadata( -// "name1", -// "host1:5050", -// null, -// 100L, -// ServerType.REALTIME, -// "tier1", -// 0 -// ); -// private static final RichSegmentDescriptor SEGMENT_1 = new RichSegmentDescriptor( -// Intervals.of("2003/2004"), -// Intervals.of("2003/2004"), -// "v1", -// 1, -// ImmutableSet.of(DRUID_SERVER_1) -// ); -// private DataServerClient dataServerClient; -// private CoordinatorClient coordinatorClient; -// private ScanResultValue scanResultValue; -// private ScanQuery query; -// private LoadedSegmentDataProvider target; -// -// @Before -// public void setUp() -// { -// dataServerClient = mock(DataServerClient.class); -// coordinatorClient = mock(CoordinatorClient.class); -// scanResultValue = new ScanResultValue( -// null, -// ImmutableList.of(), -// ImmutableList.of( -// ImmutableList.of("abc", "123"), -// ImmutableList.of("ghi", "456"), -// ImmutableList.of("xyz", "789") -// ) -// ); -// query = newScanQueryBuilder() -// .dataSource(new InputNumberDataSource(1)) -// .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003/2004")))) -// .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") -// .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) -// .context(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 1)) -// .build(); -// QueryToolChestWarehouse queryToolChestWarehouse = new MapQueryToolChestWarehouse( -// ImmutableMap., QueryToolChest>builder() -// .put(ScanQuery.class, new ScanQueryQueryToolChest(null, null)) -// .build() -// ); -// target = spy( -// new LoadedSegmentDataProvider( -// DATASOURCE1, -// new ChannelCounters(), -// mock(ServiceClientFactory.class), -// coordinatorClient, -// DruidServiceTestUtils.newJsonMapper(), -// queryToolChestWarehouse, -// Execs.scheduledSingleThreaded("query-cancellation-executor") -// ) -// ); -// doReturn(dataServerClient).when(target).makeDataServerClient(any()); -// } -// -// @Test -// public void testFetchRowsFromServer() throws IOException -// { -// doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient).run(any(), any(), any(), any()); -// -// Pair> dataServerQueryStatusYielderPair = target.fetchRowsFromDataServer( -// query, -// SEGMENT_1, -// ScanQueryFrameProcessor::mappingFunction, -// Closer.create() -// ); -// -// Assert.assertEquals(LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, dataServerQueryStatusYielderPair.lhs); -// List> events = (List>) scanResultValue.getEvents(); -// Yielder yielder = dataServerQueryStatusYielderPair.rhs; -// events.forEach( -// event -> { -// Assert.assertArrayEquals(event.toArray(), yielder.get()); -// yielder.next(null); -// } -// ); -// } -// -// @Test -// public void testHandoff() throws IOException -// { -// doAnswer(invocation -> { -// ResponseContext responseContext = invocation.getArgument(1); -// responseContext.addMissingSegments(ImmutableList.of(SEGMENT_1)); -// return Sequences.empty(); -// }).when(dataServerClient).run(any(), any(), any(), any()); -// doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1); -// -// Pair> dataServerQueryStatusYielderPair = target.fetchRowsFromDataServer( -// query, -// SEGMENT_1, -// ScanQueryFrameProcessor::mappingFunction, -// Closer.create() -// ); -// -// Assert.assertEquals(LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF, dataServerQueryStatusYielderPair.lhs); -// Assert.assertNull(dataServerQueryStatusYielderPair.rhs); -// } -// -// @Test -// public void testServerNotFoundWithoutHandoffShouldThrowException() -// { -// doThrow( -// new QueryInterruptedException(new RpcException("Could not connect to server")) -// ).when(dataServerClient).run(any(), any(), any(), any()); -// -// doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1); -// -// ScanQuery queryWithRetry = query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3)); -// -// Assert.assertThrows(DruidException.class, () -> -// target.fetchRowsFromDataServer( -// queryWithRetry, -// SEGMENT_1, -// ScanQueryFrameProcessor::mappingFunction, -// Closer.create() -// ) -// ); -// -// verify(dataServerClient, times(3)).run(any(), any(), any(), any()); -// } -// -// @Test -// public void testServerNotFoundButHandoffShouldReturnWithStatus() throws IOException -// { -// doThrow( -// new QueryInterruptedException(new RpcException("Could not connect to server")) -// ).when(dataServerClient).run(any(), any(), any(), any()); -// -// doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1); -// -// Pair> dataServerQueryStatusYielderPair = target.fetchRowsFromDataServer( -// query, -// SEGMENT_1, -// ScanQueryFrameProcessor::mappingFunction, -// Closer.create() -// ); -// -// Assert.assertEquals(LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF, dataServerQueryStatusYielderPair.lhs); -// Assert.assertNull(dataServerQueryStatusYielderPair.rhs); -// } -// -// @Test -// public void testQueryFail() -// { -// doAnswer(invocation -> { -// ResponseContext responseContext = invocation.getArgument(1); -// responseContext.addMissingSegments(ImmutableList.of(SEGMENT_1)); -// return Sequences.empty(); -// }).when(dataServerClient).run(any(), any(), any(), any()); -// doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1); -// -// Assert.assertThrows(IOE.class, () -> -// target.fetchRowsFromDataServer( -// query, -// SEGMENT_1, -// ScanQueryFrameProcessor::mappingFunction, -// Closer.create() -// ) -// ); -// } + private static final String DATASOURCE1 = "dataSource1"; + private static final DruidServerMetadata DRUID_SERVER_1 = new DruidServerMetadata( + "name1", + "host1:5050", + null, + 100L, + ServerType.REALTIME, + "tier1", + 0 + ); + private static final RichSegmentDescriptor SEGMENT_1 = new RichSegmentDescriptor( + Intervals.of("2003/2004"), + Intervals.of("2003/2004"), + "v1", + 1 + ); + private DataServerClient dataServerClient; + private CoordinatorClient coordinatorClient; + private ScanResultValue scanResultValue; + private ScanQuery query; + private DataServerQueryHandler target; + + @Before + public void setUp() + { + dataServerClient = mock(DataServerClient.class); + coordinatorClient = mock(CoordinatorClient.class); + scanResultValue = new ScanResultValue( + null, + ImmutableList.of(), + ImmutableList.of( + ImmutableList.of("abc", "123"), + ImmutableList.of("ghi", "456"), + ImmutableList.of("xyz", "789") + ) + ); + query = newScanQueryBuilder() + .dataSource(new InputNumberDataSource(1)) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003/2004")))) + .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 1)) + .build(); + QueryToolChestWarehouse queryToolChestWarehouse = new MapQueryToolChestWarehouse( + ImmutableMap., QueryToolChest>builder() + .put(ScanQuery.class, new ScanQueryQueryToolChest(null, null)) + .build() + ); + target = spy( + new DataServerQueryHandler( + DATASOURCE1, + new ChannelCounters(), + mock(ServiceClientFactory.class), + coordinatorClient, + DruidServiceTestUtils.newJsonMapper(), + queryToolChestWarehouse, + Execs.scheduledSingleThreaded("query-cancellation-executor"), + new DataServerRequestDescriptor(DRUID_SERVER_1, ImmutableList.of(SEGMENT_1)) + ) + ); + doReturn(dataServerClient).when(target).makeDataServerClient(any()); + } + + @Test + public void testFetchRowsFromServer() + { + doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient).run(any(), any(), any(), any()); + + DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( + query, + ScanQueryFrameProcessor::mappingFunction, + Closer.create() + ); + + Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty()); + List> events = (List>) scanResultValue.getEvents(); + Yielder yielder = dataServerQueryResult.getResultsYielders().get(0); + events.forEach( + event -> { + Assert.assertArrayEquals(event.toArray(), yielder.get()); + yielder.next(null); + } + ); + } + + @Test + public void testHandoff() + { + doAnswer(invocation -> { + ResponseContext responseContext = invocation.getArgument(1); + responseContext.addMissingSegments(ImmutableList.of(DataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1))); + return Sequences.empty(); + }).when(dataServerClient).run(any(), any(), any(), any()); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, DataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + + DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( + query, + ScanQueryFrameProcessor::mappingFunction, + Closer.create() + ); + + Assert.assertEquals(ImmutableList.of(SEGMENT_1), dataServerQueryResult.getHandedOffSegments().getDescriptors()); + Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty()); + } + + @Test + public void testServerNotFoundWithoutHandoffShouldThrowException() + { + doThrow( + new QueryInterruptedException(new RpcException("Could not connect to server")) + ).when(dataServerClient).run(any(), any(), any(), any()); + + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, DataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + + ScanQuery queryWithRetry = query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3)); + + Assert.assertThrows(DruidException.class, () -> + target.fetchRowsFromDataServer( + queryWithRetry, + ScanQueryFrameProcessor::mappingFunction, + Closer.create() + ) + ); + + verify(dataServerClient, times(5)).run(any(), any(), any(), any()); + } + + @Test + public void testServerNotFoundButHandoffShouldReturnWithStatus() + { + doThrow( + new QueryInterruptedException(new RpcException("Could not connect to server")) + ).when(dataServerClient).run(any(), any(), any(), any()); + + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, DataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + + DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( + query, + ScanQueryFrameProcessor::mappingFunction, + Closer.create() + ); + + Assert.assertEquals(ImmutableList.of(SEGMENT_1), dataServerQueryResult.getHandedOffSegments().getDescriptors()); + Assert.assertTrue(dataServerQueryResult.getResultsYielders().get(0).isDone()); + } + + @Test + public void testQueryFail() + { + doAnswer(invocation -> { + ResponseContext responseContext = invocation.getArgument(1); + responseContext.addMissingSegments(ImmutableList.of(SEGMENT_1)); + return Sequences.empty(); + }).when(dataServerClient).run(any(), any(), any(), any()); + doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, SEGMENT_1); + + Assert.assertThrows(DruidException.class, () -> + target.fetchRowsFromDataServer( + query, + ScanQueryFrameProcessor::mappingFunction, + Closer.create() + ) + ); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java index c4682bae2345..4acc7699622d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java @@ -25,7 +25,6 @@ import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielders; @@ -55,7 +54,6 @@ import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Map; import static org.mockito.ArgumentMatchers.any; @@ -65,310 +63,322 @@ public class MSQLoadedSegmentTests extends MSQTestBase { -// public static final Map REALTIME_QUERY_CTX = -// ImmutableMap.builder() -// .putAll(DEFAULT_MSQ_CONTEXT) -// .put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.REALTIME.name()) -// .build(); -// public static final DataSegment LOADED_SEGMENT_1 = -// DataSegment.builder() -// .dataSource(CalciteTests.DATASOURCE1) -// .interval(Intervals.of("2003-01-01T00:00:00.000Z/2004-01-01T00:00:00.000Z")) -// .version("1") -// .shardSpec(new LinearShardSpec(0)) -// .size(0) -// .build(); -// -// public static final DruidServerMetadata DATA_SERVER_1 = new DruidServerMetadata( -// "TestDataServer", -// "hostName:9092", -// null, -// 2, -// ServerType.REALTIME, -// "tier1", -// 2 -// ); -// -// @Before -// public void setUp() -// { -// loadedSegmentsMetadata.add(new ImmutableSegmentLoadInfo(LOADED_SEGMENT_1, ImmutableSet.of(DATA_SERVER_1))); -// } -// -// @Test -// public void testSelectWithLoadedSegmentsOnFoo() throws IOException -// { -// RowSignature resultSignature = RowSignature.builder() -// .add("cnt", ColumnType.LONG) -// .add("dim1", ColumnType.STRING) -// .build(); -// -// doReturn( -// Pair.of( -// LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, -// Yielders.each( -// Sequences.simple( -// ImmutableList.of( -// new Object[]{1L, "qwe"}, -// new Object[]{1L, "tyu"} -// ) -// ) -// ) -// ) -// ) -// .when(loadedSegmentDataProvider) -// .fetchRowsFromDataServer(any(), any(), any(), any()); -// -// testSelectQuery() -// .setSql("select cnt, dim1 from foo") -// .setExpectedMSQSpec( -// MSQSpec.builder() -// .query( -// newScanQueryBuilder() -// .dataSource(CalciteTests.DATASOURCE1) -// .intervals(querySegmentSpec(Filtration.eternity())) -// .columns("cnt", "dim1") -// .context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature)) -// .build() -// ) -// .columnMappings(ColumnMappings.identity(resultSignature)) -// .tuningConfig(MSQTuningConfig.defaultConfig()) -// .destination(TaskReportMSQDestination.INSTANCE) -// .build() -// ) -// .setQueryContext(REALTIME_QUERY_CTX) -// .setExpectedRowSignature(resultSignature) -// .setExpectedResultRows(ImmutableList.of( -// new Object[]{1L, ""}, -// new Object[]{1L, "qwe"}, -// new Object[]{1L, "10.1"}, -// new Object[]{1L, "tyu"}, -// new Object[]{1L, "2"}, -// new Object[]{1L, "1"}, -// new Object[]{1L, "def"}, -// new Object[]{1L, "abc"} -// )) -// .verifyResults(); -// } -// -// @Test -// public void testSelectWithLoadedSegmentsOnFooWithOrderBy() throws IOException -// { -// RowSignature resultSignature = RowSignature.builder() -// .add("cnt", ColumnType.LONG) -// .add("dim1", ColumnType.STRING) -// .build(); -// -// doAnswer( -// invocationOnMock -> { -// ScanQuery query = invocationOnMock.getArgument(0); -// ScanQuery.verifyOrderByForNativeExecution(query); -// Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit()); -// return Pair.of( -// LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, -// Yielders.each( -// Sequences.simple( -// ImmutableList.of( -// new Object[]{1L, "qwe"}, -// new Object[]{1L, "tyu"} -// ) -// ) -// ) -// ); -// } -// -// ) -// .when(loadedSegmentDataProvider) -// .fetchRowsFromDataServer(any(), any(), any(), any()); -// -// testSelectQuery() -// .setSql("select cnt, dim1 from foo order by dim1") -// .setExpectedMSQSpec( -// MSQSpec.builder() -// .query( -// newScanQueryBuilder() -// .dataSource(CalciteTests.DATASOURCE1) -// .intervals(querySegmentSpec(Filtration.eternity())) -// .columns("cnt", "dim1") -// .orderBy(ImmutableList.of(new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING))) -// .context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature)) -// .build() -// ) -// .columnMappings(ColumnMappings.identity(resultSignature)) -// .tuningConfig(MSQTuningConfig.defaultConfig()) -// .destination(TaskReportMSQDestination.INSTANCE) -// .build() -// ) -// .setQueryContext(REALTIME_QUERY_CTX) -// .setExpectedRowSignature(resultSignature) -// .setExpectedResultRows(ImmutableList.of( -// new Object[]{1L, ""}, -// new Object[]{1L, "1"}, -// new Object[]{1L, "10.1"}, -// new Object[]{1L, "2"}, -// new Object[]{1L, "abc"}, -// new Object[]{1L, "def"}, -// new Object[]{1L, "qwe"}, -// new Object[]{1L, "tyu"} -// )) -// .verifyResults(); -// } -// -// @Test -// public void testGroupByWithLoadedSegmentsOnFoo() throws IOException -// { -// RowSignature rowSignature = RowSignature.builder() -// .add("cnt", ColumnType.LONG) -// .add("cnt1", ColumnType.LONG) -// .build(); -// -// doReturn( -// Pair.of(LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, -// Yielders.each( -// Sequences.simple( -// ImmutableList.of( -// ResultRow.of(1L, 2L) -// ) -// ) -// ) -// ) -// ) -// .when(loadedSegmentDataProvider) -// .fetchRowsFromDataServer(any(), any(), any(), any()); -// -// testSelectQuery() -// .setSql("select cnt,count(*) as cnt1 from foo group by cnt") -// .setExpectedMSQSpec( -// MSQSpec.builder() -// .query(GroupByQuery.builder() -// .setDataSource(CalciteTests.DATASOURCE1) -// .setInterval(querySegmentSpec(Filtration -// .eternity())) -// .setGranularity(Granularities.ALL) -// .setDimensions(dimensions( -// new DefaultDimensionSpec( -// "cnt", -// "d0", -// ColumnType.LONG -// ) -// )) -// .setAggregatorSpecs(aggregators(new CountAggregatorFactory( -// "a0"))) -// .setContext(REALTIME_QUERY_CTX) -// .build()) -// .columnMappings( -// new ColumnMappings(ImmutableList.of( -// new ColumnMapping("d0", "cnt"), -// new ColumnMapping("a0", "cnt1"))) -// ) -// .tuningConfig(MSQTuningConfig.defaultConfig()) -// .destination(TaskReportMSQDestination.INSTANCE) -// .build() -// ) -// .setQueryContext(REALTIME_QUERY_CTX) -// .setExpectedRowSignature(rowSignature) -// .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 8L})) -// .verifyResults(); -// } -// -// @Test -// public void testGroupByWithOnlyLoadedSegmentsOnFoo() throws IOException -// { -// RowSignature rowSignature = RowSignature.builder() -// .add("cnt", ColumnType.LONG) -// .add("cnt1", ColumnType.LONG) -// .build(); -// -// doReturn( -// Pair.of(LoadedSegmentDataProvider.DataServerQueryStatus.SUCCESS, -// Yielders.each( -// Sequences.simple( -// ImmutableList.of( -// ResultRow.of(1L, 2L))))) -// ).when(loadedSegmentDataProvider) -// .fetchRowsFromDataServer(any(), any(), any(), any()); -// -// testSelectQuery() -// .setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP '2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 00:00:00') group by cnt") -// .setExpectedMSQSpec( -// MSQSpec.builder() -// .query(GroupByQuery.builder() -// .setDataSource(CalciteTests.DATASOURCE1) -// .setInterval(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")) -// .setGranularity(Granularities.ALL) -// .setDimensions(dimensions( -// new DefaultDimensionSpec( -// "cnt", -// "d0", -// ColumnType.LONG -// ) -// )) -// .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")))) -// .setAggregatorSpecs(aggregators(new CountAggregatorFactory( -// "a0"))) -// .setContext(REALTIME_QUERY_CTX) -// .build()) -// .columnMappings( -// new ColumnMappings(ImmutableList.of( -// new ColumnMapping("d0", "cnt"), -// new ColumnMapping("a0", "cnt1"))) -// ) -// .tuningConfig(MSQTuningConfig.defaultConfig()) -// .destination(TaskReportMSQDestination.INSTANCE) -// .build() -// ) -// .setQueryContext(REALTIME_QUERY_CTX) -// .setExpectedRowSignature(rowSignature) -// .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 2L})) -// .verifyResults(); -// } -// -// @Test -// public void testDataServerQueryFailedShouldFail() throws IOException -// { -// RowSignature rowSignature = RowSignature.builder() -// .add("cnt", ColumnType.LONG) -// .add("cnt1", ColumnType.LONG) -// .build(); -// -// doThrow( -// new ISE("Segment could not be found on data server, but segment was not handed off.") -// ) -// .when(loadedSegmentDataProvider) -// .fetchRowsFromDataServer(any(), any(), any(), any()); -// -// testSelectQuery() -// .setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP '2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 00:00:00') group by cnt") -// .setExpectedMSQSpec( -// MSQSpec.builder() -// .query(GroupByQuery.builder() -// .setDataSource(CalciteTests.DATASOURCE1) -// .setInterval(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")) -// .setGranularity(Granularities.ALL) -// .setDimensions(dimensions( -// new DefaultDimensionSpec( -// "cnt", -// "d0", -// ColumnType.LONG -// ) -// )) -// .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")))) -// .setAggregatorSpecs(aggregators(new CountAggregatorFactory( -// "a0"))) -// .setContext(REALTIME_QUERY_CTX) -// .build()) -// .columnMappings( -// new ColumnMappings(ImmutableList.of( -// new ColumnMapping("d0", "cnt"), -// new ColumnMapping("a0", "cnt1"))) -// ) -// .tuningConfig(MSQTuningConfig.defaultConfig()) -// .destination(TaskReportMSQDestination.INSTANCE) -// .build() -// ) -// .setQueryContext(REALTIME_QUERY_CTX) -// .setExpectedRowSignature(rowSignature) -// .setExpectedExecutionErrorMatcher(CoreMatchers.instanceOf(ISE.class)) -// .verifyExecutionError(); -// } + public static final Map REALTIME_QUERY_CTX = + ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.REALTIME.name()) + .build(); + public static final DataSegment LOADED_SEGMENT_1 = + DataSegment.builder() + .dataSource(CalciteTests.DATASOURCE1) + .interval(Intervals.of("2003-01-01T00:00:00.000Z/2004-01-01T00:00:00.000Z")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + public static final DruidServerMetadata DATA_SERVER_1 = new DruidServerMetadata( + "TestDataServer", + "hostName:9092", + null, + 2, + ServerType.REALTIME, + "tier1", + 2 + ); + + @Before + public void setUp() + { + loadedSegmentsMetadata.add(new ImmutableSegmentLoadInfo(LOADED_SEGMENT_1, ImmutableSet.of(DATA_SERVER_1))); + } + + @Test + public void testSelectWithLoadedSegmentsOnFoo() + { + RowSignature resultSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .build(); + + doReturn( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) + ) + )), + ImmutableList.of(), + "foo" + )).when(dataServerQueryHandler) + .fetchRowsFromDataServer(any(), any(), any()); + + testSelectQuery() + .setSql("select cnt, dim1 from foo") + .setExpectedMSQSpec( + MSQSpec.builder() + .query( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("cnt", "dim1") + .context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature)) + .build() + ) + .columnMappings(ColumnMappings.identity(resultSignature)) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(REALTIME_QUERY_CTX) + .setExpectedRowSignature(resultSignature) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, ""}, + new Object[]{1L, "tyu"}, + new Object[]{1L, "10.1"}, + new Object[]{1L, "2"}, + new Object[]{1L, "1"}, + new Object[]{1L, "def"}, + new Object[]{1L, "abc"} + )) + .verifyResults(); + } + + @Test + public void testSelectWithLoadedSegmentsOnFooWithOrderBy() + { + RowSignature resultSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .build(); + + doAnswer( + invocationOnMock -> { + ScanQuery query = invocationOnMock.getArgument(0); + ScanQuery.verifyOrderByForNativeExecution(query); + Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit()); + return new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) + ) + )), + ImmutableList.of(), + "foo" + ); + } + ) + .when(dataServerQueryHandler) + .fetchRowsFromDataServer(any(), any(), any()); + + testSelectQuery() + .setSql("select cnt, dim1 from foo order by dim1") + .setExpectedMSQSpec( + MSQSpec.builder() + .query( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("cnt", "dim1") + .orderBy(ImmutableList.of(new ScanQuery.OrderBy("dim1", ScanQuery.Order.ASCENDING))) + .context(defaultScanQueryContext(REALTIME_QUERY_CTX, resultSignature)) + .build() + ) + .columnMappings(ColumnMappings.identity(resultSignature)) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(REALTIME_QUERY_CTX) + .setExpectedRowSignature(resultSignature) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1L, ""}, + new Object[]{1L, "1"}, + new Object[]{1L, "10.1"}, + new Object[]{1L, "2"}, + new Object[]{1L, "abc"}, + new Object[]{1L, "def"}, + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + )) + .verifyResults(); + } + + @Test + public void testGroupByWithLoadedSegmentsOnFoo() + { + RowSignature rowSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("cnt1", ColumnType.LONG) + .build(); + + doReturn( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + ResultRow.of(1L, 2L) + ) + ) + )), + ImmutableList.of(), + "foo" + ) + ) + .when(dataServerQueryHandler) + .fetchRowsFromDataServer(any(), any(), any()); + + testSelectQuery() + .setSql("select cnt,count(*) as cnt1 from foo group by cnt") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration + .eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions( + new DefaultDimensionSpec( + "cnt", + "d0", + ColumnType.LONG + ) + )) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory( + "a0"))) + .setContext(REALTIME_QUERY_CTX) + .build()) + .columnMappings( + new ColumnMappings(ImmutableList.of( + new ColumnMapping("d0", "cnt"), + new ColumnMapping("a0", "cnt1"))) + ) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(REALTIME_QUERY_CTX) + .setExpectedRowSignature(rowSignature) + .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 8L})) + .verifyResults(); + } + + @Test + public void testGroupByWithOnlyLoadedSegmentsOnFoo() + { + RowSignature rowSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("cnt1", ColumnType.LONG) + .build(); + + doReturn( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + ResultRow.of(1L, 2L) + ) + ) + )), + ImmutableList.of(), + "foo" + ) + ) + .when(dataServerQueryHandler) + .fetchRowsFromDataServer(any(), any(), any()); + + testSelectQuery() + .setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP '2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 00:00:00') group by cnt") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions( + new DefaultDimensionSpec( + "cnt", + "d0", + ColumnType.LONG + ) + )) + .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")))) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory( + "a0"))) + .setContext(REALTIME_QUERY_CTX) + .build()) + .columnMappings( + new ColumnMappings(ImmutableList.of( + new ColumnMapping("d0", "cnt"), + new ColumnMapping("a0", "cnt1"))) + ) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(REALTIME_QUERY_CTX) + .setExpectedRowSignature(rowSignature) + .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 2L})) + .verifyResults(); + } + + @Test + public void testDataServerQueryFailedShouldFail() + { + RowSignature rowSignature = RowSignature.builder() + .add("cnt", ColumnType.LONG) + .add("cnt1", ColumnType.LONG) + .build(); + + doThrow( + new ISE("Segment could not be found on data server, but segment was not handed off.") + ) + .when(dataServerQueryHandler) + .fetchRowsFromDataServer(any(), any(), any()); + + testSelectQuery() + .setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP '2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 00:00:00') group by cnt") + .setExpectedMSQSpec( + MSQSpec.builder() + .query(GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions( + new DefaultDimensionSpec( + "cnt", + "d0", + ColumnType.LONG + ) + )) + .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2003-01-01T00:00:00.000Z/2005-01-01T00:00:00.000Z")))) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory( + "a0"))) + .setContext(REALTIME_QUERY_CTX) + .build()) + .columnMappings( + new ColumnMappings(ImmutableList.of( + new ColumnMapping("d0", "cnt"), + new ColumnMapping("a0", "cnt1"))) + ) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setQueryContext(REALTIME_QUERY_CTX) + .setExpectedRowSignature(rowSignature) + .setExpectedExecutionErrorMatcher(CoreMatchers.instanceOf(ISE.class)) + .verifyExecutionError(); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 769f48fe0bad..a6b5f3ac3a17 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.msq.exec.DataServerQueryHandler; import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; import org.apache.druid.msq.guice.MSQIndexingModule; @@ -83,6 +84,7 @@ import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import javax.annotation.Nullable; import java.io.IOException; @@ -100,7 +102,10 @@ import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS; -import static org.mockito.Mockito.mock; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; /** * Helper class aiding in wiring up the Guice bindings required for MSQ engine to work with the Calcite's tests @@ -170,7 +175,7 @@ public String getFormatString() binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer()); binder.bind(DataSegmentProvider.class) .toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId)); - binder.bind(DataServerQueryHandlerFactory.class).toInstance(getTestLoadedSegmentDataProviderFactory()); + binder.bind(DataServerQueryHandlerFactory.class).toInstance(getTestDataServerQueryHandlerFactory()); GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig(); GroupingEngine groupingEngine = GroupByQueryRunnerTest.makeQueryRunnerFactory( @@ -188,23 +193,17 @@ public String getFormatString() ); } - private static DataServerQueryHandlerFactory getTestLoadedSegmentDataProviderFactory() + private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactory() { -// // Currently, there is no metadata in this test for loaded segments. Therefore, this should not be called. -// // In the future, if this needs to be supported, mocks for LoadedSegmentDataProvider should be added like -// // org.apache.druid.msq.exec.MSQLoadedSegmentTests. -// LoadedSegmentDataProviderFactory mockFactory = Mockito.mock(LoadedSegmentDataProviderFactory.class); -// LoadedSegmentDataProvider loadedSegmentDataProvider = Mockito.mock(LoadedSegmentDataProvider.class); -// try { -// doThrow(new AssertionError("Test does not support loaded segment query")) -// .when(loadedSegmentDataProvider).fetchRowsFromDataServer(any(), any(), any(), any()); -// doReturn(loadedSegmentDataProvider).when(mockFactory).createLoadedSegmentDataProvider(anyString(), any()); -// } -// catch (IOException e) { -// throw new RuntimeException(e); -// } -// return mockFactory; - return mock(DataServerQueryHandlerFactory.class); + // Currently, there is no metadata in this test for loaded segments. Therefore, this should not be called. + // In the future, if this needs to be supported, mocks for LoadedSegmentDataProvider should be added like + // org.apache.druid.msq.exec.MSQLoadedSegmentTests. + DataServerQueryHandlerFactory mockFactory = Mockito.mock(DataServerQueryHandlerFactory.class); + DataServerQueryHandler dataServerQueryHandler = Mockito.mock(DataServerQueryHandler.class); + doThrow(new AssertionError("Test does not support loaded segment query")) + .when(dataServerQueryHandler).fetchRowsFromDataServer(any(), any(), any()); + doReturn(dataServerQueryHandler).when(mockFactory).createDataServerQueryHandler(anyString(), any(), any()); + return mockFactory; } private static Supplier> getSupplierForSegment(SegmentId segmentId) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 9ca04913815b..ce89706dffe3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -427,7 +427,7 @@ public String getFormatString() .toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool"))); binder.bind(DataSegmentProvider.class) .toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId)); - binder.bind(DataServerQueryHandlerFactory.class).toInstance(getTestLoadedSegmentDataProviderFactory()); + binder.bind(DataServerQueryHandlerFactory.class).toInstance(getTestDataServerQueryHandlerFactory()); binder.bind(IndexIO.class).toInstance(indexIO); binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker()); @@ -582,14 +582,13 @@ protected long[] createExpectedFrameArray(int length, int value) return array; } - private DataServerQueryHandlerFactory getTestLoadedSegmentDataProviderFactory() + private DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactory() { -// LoadedSegmentDataProviderFactory mockFactory = Mockito.mock(LoadedSegmentDataProviderFactory.class); -// doReturn(loadedSegmentDataProvider) -// .when(mockFactory) -// .createLoadedSegmentDataProvider(anyString(), any()); -// return mockFactory; - return mock(DataServerQueryHandlerFactory.class); + DataServerQueryHandlerFactory mockFactory = Mockito.mock(DataServerQueryHandlerFactory.class); + doReturn(dataServerQueryHandler) + .when(mockFactory) + .createDataServerQueryHandler(anyString(), any(), any()); + return mockFactory; } @Nonnull