diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 7346ee6722..b6643f3209 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -23,6 +23,7 @@ public enum Key { SQL_SLOWLOG("plugins.sql.slowlog"), SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"), SQL_DELETE_ENABLED("plugins.sql.delete.enabled"), + SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"), /** PPL Settings. */ PPL_ENABLED("plugins.ppl.enabled"), diff --git a/docs/dev/opensearch-pagination.md b/docs/dev/opensearch-pagination.md index 4982b13d7f..1919af30fe 100644 --- a/docs/dev/opensearch-pagination.md +++ b/docs/dev/opensearch-pagination.md @@ -477,4 +477,44 @@ Response: } +``` + +#### plugins.sql.pagination.api + +This setting controls whether the SQL search queries in OpenSearch use Point-In-Time (PIT) with search_after or the traditional scroll mechanism for fetching paginated results. + +- Default Value: true +- Possible Values: true or false +- When set to true, the search query in the background uses PIT with search_after instead of scroll to retrieve paginated results. The Cursor Id returned to the user will encode relevant pagination query-related information, which will be used to fetch the subsequent pages of results. +- This setting is node-level. +- This setting can be updated dynamically. + +Example: + +``` +>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings -d '{ + "transient" : { + "plugins.sql.pagination.api" : "true" + } +}' +``` + +Response: + +``` +{ + "acknowledged" : true, + "persistent" : { }, + "transient" : { + "plugins" : { + "sql" : { + "pagination" : { + "api" : "true" + } + } + } + } +} + + ``` diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 6b24e41f87..48e83eab9a 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -196,6 +196,50 @@ Result set:: Note: the legacy settings of ``opendistro.sql.cursor.keep_alive`` is deprecated, it will fallback to the new settings if you request an update with the legacy name. +plugins.sql.pagination.api +================================ + +Description +----------- + +This setting controls whether the SQL search queries in OpenSearch use Point-In-Time (PIT) with search_after or the traditional scroll mechanism for fetching paginated results. + +1. Default Value: true +2. Possible Values: true or false +3. When set to true, the search query in the background uses PIT with search_after instead of scroll to retrieve paginated results. The Cursor Id returned to the user will encode relevant pagination query-related information, which will be used to fetch the subsequent pages of results. +4. This setting is node-level. +5. This setting can be updated dynamically. + + +Example +------- + +You can update the setting with a new value like this. + +SQL query:: + + >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings -d '{ + "transient" : { + "plugins.sql.pagination.api" : "true" + } + }' + +Result set:: + + { + "acknowledged" : true, + "persistent" : { }, + "transient" : { + "plugins" : { + "sql" : { + "pagination" : { + "api" : "true" + } + } + } + } + } + plugins.query.size_limit =========================== diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/JoinIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/JoinIT.java index 8019454b77..8c2ea96474 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/JoinIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/JoinIT.java @@ -288,6 +288,8 @@ public void hintMultiSearchCanRunFewTimesNL() throws IOException { Assert.assertThat(hits.length(), equalTo(42)); } + // TODO: Fix joinWithGeoIntersectNL test when SQL_PAGINATION_API_SEARCH_AFTER is true + @Ignore @Test public void joinWithGeoIntersectNL() throws IOException { @@ -455,7 +457,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderGT() throws IOExcep "SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a" + " JOIN %s d on a.age < d.age WHERE (d.firstname = 'Lynn' OR d.firstname =" + " 'Obrien') AND a.firstname = 'Mcgee'", - TEST_INDEX_PEOPLE, + TEST_INDEX_PEOPLE2, TEST_INDEX_ACCOUNT); JSONObject result = executeQuery(query); @@ -501,7 +503,7 @@ public void joinParseCheckSelectedFieldsSplitNLConditionOrderLT() throws IOExcep "SELECT /*! USE_NL*/ a.firstname, a.lastname, a.gender, d.firstname, d.age FROM %s a" + " JOIN %s d on a.age > d.age WHERE (d.firstname = 'Sandoval' OR d.firstname =" + " 'Hewitt') AND a.firstname = 'Fulton'", - TEST_INDEX_PEOPLE, + TEST_INDEX_PEOPLE2, TEST_INDEX_ACCOUNT); JSONObject result = executeQuery(query); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java index 62a6d63ef7..2b80575e1e 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java @@ -5,13 +5,96 @@ package org.opensearch.sql.legacy.executor; +import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; +import static org.opensearch.search.sort.SortOrder.ASC; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import java.io.IOException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.sql.legacy.domain.Select; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; +import org.opensearch.sql.legacy.pit.PointInTimeHandler; + +/** Executor for search requests with pagination. */ +public abstract class ElasticHitsExecutor { + protected static final Logger LOG = LogManager.getLogger(); + protected PointInTimeHandler pit; + protected Client client; + + /** + * Executes search request + * + * @throws IOException If an input or output exception occurred + * @throws SqlParseException If parsing exception occurred + */ + protected abstract void run() throws IOException, SqlParseException; + + /** + * Get search hits after execution + * + * @return Search hits + */ + protected abstract SearchHits getHits(); + + /** + * Get response for search request with pit/scroll + * + * @param request search request + * @param select sql select + * @param size fetch size + * @param previousResponse response for previous request + * @param pit point in time + * @return search response for subsequent request + */ + public SearchResponse getResponseWithHits( + SearchRequestBuilder request, + Select select, + int size, + SearchResponse previousResponse, + PointInTimeHandler pit) { + // Set Size + request.setSize(size); + SearchResponse responseWithHits; -/** Created by Eliran on 21/8/2016. */ -public interface ElasticHitsExecutor { - void run() throws IOException, SqlParseException; + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + // Set sort field for search_after + boolean ordered = select.isOrderdSelect(); + if (!ordered) { + request.addSort(DOC_FIELD_NAME, ASC); + } + // Set PIT + request.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + // from and size is alternate method to paginate result. + // If select has from clause, search after is not required. + if (previousResponse != null && select.getFrom().isEmpty()) { + request.searchAfter(previousResponse.getHits().getSortFields()); + } + responseWithHits = request.get(); + } else { + // Set scroll + TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE); + if (previousResponse != null) { + responseWithHits = + client + .prepareSearchScroll(previousResponse.getScrollId()) + .setScroll(keepAlive) + .execute() + .actionGet(); + } else { + request.setScroll(keepAlive); + responseWithHits = request.get(); + } + } - SearchHits getHits(); + return responseWithHits; + } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java index f0ffafc470..061868c9b5 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor.join; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -12,15 +14,12 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import java.util.stream.Stream; import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; -import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.document.DocumentField; -import org.opensearch.common.unit.TimeValue; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.mapper.MapperService; @@ -28,11 +27,11 @@ import org.opensearch.rest.RestChannel; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.sort.FieldSortBuilder; -import org.opensearch.search.sort.SortOrder; import org.opensearch.sql.legacy.domain.Field; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder; import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder; import org.opensearch.sql.legacy.query.join.JoinRequestBuilder; @@ -41,16 +40,16 @@ import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder; /** Created by Eliran on 15/9/2015. */ -public abstract class ElasticJoinExecutor implements ElasticHitsExecutor { - private static final Logger LOG = LogManager.getLogger(); +public abstract class ElasticJoinExecutor extends ElasticHitsExecutor { protected List results; // Keep list to avoid copy to new array in SearchHits protected MetaSearchResult metaResults; protected final int MAX_RESULTS_ON_ONE_FETCH = 10000; private Set aliasesOnReturn; private boolean allFieldsReturn; + protected String[] indices; - protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) { + protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) { metaResults = new MetaSearchResult(); aliasesOnReturn = new HashSet<>(); List firstTableReturnedField = requestBuilder.getFirstTable().getReturnedFields(); @@ -58,6 +57,8 @@ protected ElasticJoinExecutor(JoinRequestBuilder requestBuilder) { allFieldsReturn = (firstTableReturnedField == null || firstTableReturnedField.size() == 0) && (secondTableReturnedField == null || secondTableReturnedField.size() == 0); + indices = getIndices(requestBuilder); + this.client = client; } public void sendResponse(RestChannel channel) throws IOException { @@ -85,10 +86,22 @@ public void sendResponse(RestChannel channel) throws IOException { } public void run() throws IOException, SqlParseException { - long timeBefore = System.currentTimeMillis(); - results = innerRun(); - long joinTimeInMilli = System.currentTimeMillis() - timeBefore; - this.metaResults.setTookImMilli(joinTimeInMilli); + try { + long timeBefore = System.currentTimeMillis(); + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit = new PointInTimeHandlerImpl(client, indices); + pit.create(); + } + results = innerRun(); + long joinTimeInMilli = System.currentTimeMillis() - timeBefore; + this.metaResults.setTookImMilli(joinTimeInMilli); + } catch (Exception e) { + LOG.error("Failed during join query run.", e); + } finally { + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit.delete(); + } + } } protected abstract List innerRun() throws IOException, SqlParseException; @@ -103,7 +116,7 @@ public SearchHits getHits() { public static ElasticJoinExecutor createJoinExecutor( Client client, SqlElasticRequestBuilder requestBuilder) { if (requestBuilder instanceof HashJoinQueryPlanRequestBuilder) { - return new QueryPlanElasticExecutor((HashJoinQueryPlanRequestBuilder) requestBuilder); + return new QueryPlanElasticExecutor(client, (HashJoinQueryPlanRequestBuilder) requestBuilder); } else if (requestBuilder instanceof HashJoinElasticRequestBuilder) { HashJoinElasticRequestBuilder hashJoin = (HashJoinElasticRequestBuilder) requestBuilder; return new HashJoinElasticExecutor(client, hashJoin); @@ -256,23 +269,22 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) { this.metaResults.updateTimeOut(searchResponse.isTimedOut()); } - protected SearchResponse scrollOneTimeWithMax( - Client client, TableInJoinRequestBuilder tableRequest) { - SearchRequestBuilder scrollRequest = - tableRequest - .getRequestBuilder() - .setScroll(new TimeValue(60000)) - .setSize(MAX_RESULTS_ON_ONE_FETCH); - boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect(); - if (!ordered) { - scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); - } - SearchResponse responseWithHits = scrollRequest.get(); - // on ordered select - not using SCAN , elastic returns hits on first scroll - // es5.0 elastic always return docs on scan - // if(!ordered) - // responseWithHits = client.prepareSearchScroll(responseWithHits.getScrollId()) - // .setScroll(new TimeValue(600000)).get(); - return responseWithHits; + public SearchResponse getResponseWithHits( + TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) { + + return getResponseWithHits( + tableRequest.getRequestBuilder(), + tableRequest.getOriginalSelect(), + size, + previousResponse, + pit); + } + + public String[] getIndices(JoinRequestBuilder joinRequestBuilder) { + return Stream.concat( + Stream.of(joinRequestBuilder.getFirstTable().getOriginalSelect().getIndexArr()), + Stream.of(joinRequestBuilder.getSecondTable().getOriginalSelect().getIndexArr())) + .distinct() + .toArray(String[]::new); } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/HashJoinElasticExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/HashJoinElasticExecutor.java index 06a913205d..0e33ab9eef 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/HashJoinElasticExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/HashJoinElasticExecutor.java @@ -20,7 +20,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.document.DocumentField; -import org.opensearch.common.unit.TimeValue; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilders; @@ -36,16 +35,13 @@ /** Created by Eliran on 22/8/2015. */ public class HashJoinElasticExecutor extends ElasticJoinExecutor { private HashJoinElasticRequestBuilder requestBuilder; - - private Client client; private boolean useQueryTermsFilterOptimization = false; private final int MAX_RESULTS_FOR_FIRST_TABLE = 100000; HashJoinComparisonStructure hashJoinComparisonStructure; private Set alreadyMatched; public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requestBuilder) { - super(requestBuilder); - this.client = client; + super(client, requestBuilder); this.requestBuilder = requestBuilder; this.useQueryTermsFilterOptimization = requestBuilder.isUseTermFiltersOptimization(); this.hashJoinComparisonStructure = @@ -54,7 +50,6 @@ public HashJoinElasticExecutor(Client client, HashJoinElasticRequestBuilder requ } public List innerRun() throws IOException, SqlParseException { - Map>> optimizationTermsFilterStructure = initOptimizationStructure(); @@ -124,16 +119,12 @@ private List createCombinedResults(TableInJoinRequestBuilder secondTa Integer hintLimit = secondTableRequest.getHintLimit(); SearchResponse searchResponse; boolean finishedScrolling; + if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) { - searchResponse = secondTableRequest.getRequestBuilder().setSize(hintLimit).get(); + searchResponse = getResponseWithHits(secondTableRequest, hintLimit, null); finishedScrolling = true; } else { - searchResponse = - secondTableRequest - .getRequestBuilder() - .setScroll(new TimeValue(60000)) - .setSize(MAX_RESULTS_ON_ONE_FETCH) - .get(); + searchResponse = getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, null); // es5.0 no need to scroll again! // searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()) // .setScroll(new TimeValue(600000)).get(); @@ -214,11 +205,7 @@ private List createCombinedResults(TableInJoinRequestBuilder secondTa if (secondTableHits.length > 0 && (hintLimit == null || fetchedSoFarFromSecondTable >= hintLimit)) { searchResponse = - client - .prepareSearchScroll(searchResponse.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits(secondTableRequest, MAX_RESULTS_ON_ONE_FETCH, searchResponse); } else { break; } @@ -292,12 +279,13 @@ private List fetchAllHits(TableInJoinRequestBuilder tableInJoinReques private List scrollTillLimit( TableInJoinRequestBuilder tableInJoinRequest, Integer hintLimit) { - SearchResponse scrollResp = scrollOneTimeWithMax(client, tableInJoinRequest); + SearchResponse response = + getResponseWithHits(tableInJoinRequest, MAX_RESULTS_ON_ONE_FETCH, null); - updateMetaSearchResults(scrollResp); + updateMetaSearchResults(response); List hitsWithScan = new ArrayList<>(); int curentNumOfResults = 0; - SearchHit[] hits = scrollResp.getHits().getHits(); + SearchHit[] hits = response.getHits().getHits(); if (hintLimit == null) { hintLimit = MAX_RESULTS_FOR_FIRST_TABLE; @@ -311,13 +299,8 @@ private List scrollTillLimit( System.out.println("too many results for first table, stoping at:" + curentNumOfResults); break; } - scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); - hits = scrollResp.getHits().getHits(); + response = getResponseWithHits(tableInJoinRequest, MAX_RESULTS_FOR_FIRST_TABLE, response); + hits = response.getHits().getHits(); } return hitsWithScan; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/NestedLoopsElasticExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/NestedLoopsElasticExecutor.java index 56c5f96af5..9356a0058e 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/NestedLoopsElasticExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/NestedLoopsElasticExecutor.java @@ -18,7 +18,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.document.DocumentField; -import org.opensearch.common.unit.TimeValue; import org.opensearch.index.mapper.MapperService; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -39,11 +38,9 @@ public class NestedLoopsElasticExecutor extends ElasticJoinExecutor { private static final Logger LOG = LogManager.getLogger(); private final NestedLoopsElasticRequestBuilder nestedLoopsRequest; - private final Client client; public NestedLoopsElasticExecutor(Client client, NestedLoopsElasticRequestBuilder nestedLoops) { - super(nestedLoops); - this.client = client; + super(client, nestedLoops); this.nestedLoopsRequest = nestedLoops; } @@ -111,11 +108,26 @@ protected List innerRun() throws SqlParseException { if (!BackOffRetryStrategy.isHealthy()) { throw new IllegalStateException("Memory circuit is broken"); } - firstTableResponse = - client - .prepareSearchScroll(firstTableResponse.getScrollId()) - .setScroll(new TimeValue(600000)) - .get(); + /* Fetching next result page. + Using scroll api - only scrollId from previous response is required for scroll request. + Using pit with search_after - we need to recreate search request along with pitId and + sort fields from previous response. + Here we are finding required size for recreating search request with pit and search after. + Conditions for size are similar as firstFetch(). + In case of scroll, this size will be ignored and size from first request will be used. + */ + Integer hintLimit = nestedLoopsRequest.getFirstTable().getHintLimit(); + if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) { + firstTableResponse = + getResponseWithHits( + nestedLoopsRequest.getFirstTable(), hintLimit, firstTableResponse); + } else { + firstTableResponse = + getResponseWithHits( + nestedLoopsRequest.getFirstTable(), + MAX_RESULTS_ON_ONE_FETCH, + firstTableResponse); + } } else { finishedWithFirstTable = true; } @@ -287,12 +299,11 @@ private FetchWithScrollResponse firstFetch(TableInJoinRequestBuilder tableReques boolean needScrollForFirstTable = false; SearchResponse responseWithHits; if (hintLimit != null && hintLimit < MAX_RESULTS_ON_ONE_FETCH) { - responseWithHits = tableRequest.getRequestBuilder().setSize(hintLimit).get(); needScrollForFirstTable = false; } else { // scroll request with max. - responseWithHits = scrollOneTimeWithMax(client, tableRequest); + responseWithHits = getResponseWithHits(tableRequest, MAX_RESULTS_ON_ONE_FETCH, null); if (responseWithHits.getHits().getTotalHits() != null && responseWithHits.getHits().getTotalHits().value < MAX_RESULTS_ON_ONE_FETCH) { needScrollForFirstTable = true; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/QueryPlanElasticExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/QueryPlanElasticExecutor.java index f4b2f5421d..d8e9d41376 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/QueryPlanElasticExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/QueryPlanElasticExecutor.java @@ -6,6 +6,7 @@ package org.opensearch.sql.legacy.executor.join; import java.util.List; +import org.opensearch.client.Client; import org.opensearch.search.SearchHit; import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder; import org.opensearch.sql.legacy.query.planner.core.QueryPlanner; @@ -19,8 +20,8 @@ class QueryPlanElasticExecutor extends ElasticJoinExecutor { private final QueryPlanner queryPlanner; - QueryPlanElasticExecutor(HashJoinQueryPlanRequestBuilder request) { - super(request); + QueryPlanElasticExecutor(Client client, HashJoinQueryPlanRequestBuilder request) { + super(client, request); this.queryPlanner = request.plan(); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java index 03e16424e7..f58b25e821 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor.multi; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -18,7 +20,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.document.DocumentField; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.ArrayUtils; import org.opensearch.index.mapper.MapperService; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -28,16 +30,16 @@ import org.opensearch.sql.legacy.domain.Where; import org.opensearch.sql.legacy.domain.hints.Hint; import org.opensearch.sql.legacy.domain.hints.HintType; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; -import org.opensearch.sql.legacy.executor.join.ElasticUtils; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.DefaultQueryAction; import org.opensearch.sql.legacy.query.multi.MultiQueryRequestBuilder; import org.opensearch.sql.legacy.utils.Util; /** Created by Eliran on 26/8/2016. */ -public class MinusExecutor implements ElasticHitsExecutor { - private Client client; +public class MinusExecutor extends ElasticHitsExecutor { private MultiQueryRequestBuilder builder; private SearchHits minusHits; private boolean useTermsOptimization; @@ -63,45 +65,63 @@ public MinusExecutor(Client client, MultiQueryRequestBuilder builder) { @Override public void run() throws SqlParseException { - if (this.useTermsOptimization && this.fieldsOrderFirstTable.length != 1) { - throw new SqlParseException( - "Terms optimization failed: terms optimization for minus execution is supported with one" - + " field"); - } - if (this.useTermsOptimization && !this.useScrolling) { - throw new SqlParseException( - "Terms optimization failed: using scrolling is required for terms optimization"); - } - if (!this.useScrolling || !this.useTermsOptimization) { - Set comperableHitResults; - if (!this.useScrolling) { - // 1. get results from first search , put in set - // 2. get reults from second search - // 2.1 for each result remove from set - comperableHitResults = simpleOneTimeQueryEach(); + try { + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit = + new PointInTimeHandlerImpl( + client, + ArrayUtils.concat( + builder.getOriginalSelect(true).getIndexArr(), + builder.getOriginalSelect(false).getIndexArr())); + pit.create(); + } + + if (this.useTermsOptimization && this.fieldsOrderFirstTable.length != 1) { + throw new SqlParseException( + "Terms optimization failed: terms optimization for minus execution is supported with" + + " one field"); + } + if (this.useTermsOptimization && !this.useScrolling) { + throw new SqlParseException( + "Terms optimization failed: using scrolling is required for terms optimization"); + } + if (!this.useScrolling || !this.useTermsOptimization) { + Set comperableHitResults; + if (!this.useScrolling) { + // 1. get results from first search , put in set + // 2. get reults from second search + // 2.1 for each result remove from set + comperableHitResults = simpleOneTimeQueryEach(); + } else { + // if scrolling + // 1. get all results in scrolls (till some limit) . put on set + // 2. scroll on second table + // 3. on each scroll result remove items from set + comperableHitResults = runWithScrollings(); + } + fillMinusHitsFromResults(comperableHitResults); + return; } else { - // if scrolling - // 1. get all results in scrolls (till some limit) . put on set - // 2. scroll on second table - // 3. on each scroll result remove items from set - comperableHitResults = runWithScrollings(); + // if scrolling and optimization + // 0. save the original second table where , init set + // 1. on each scroll on first table , create miniSet + // 1.1 build where from all results (terms filter) , and run query + // 1.1.1 on each result remove from miniSet + // 1.1.2 add all results left from miniset to bigset + Select firstSelect = this.builder.getOriginalSelect(true); + MinusOneFieldAndOptimizationResult optimizationResult = + runWithScrollingAndAddFilter(fieldsOrderFirstTable[0], fieldsOrderSecondTable[0]); + String fieldName = getFieldName(firstSelect.getFields().get(0)); + Set results = optimizationResult.getFieldValues(); + SearchHit someHit = optimizationResult.getSomeHit(); + fillMinusHitsFromOneField(fieldName, results, someHit); + } + } catch (Exception e) { + LOG.error("Failed during multi query run.", e); + } finally { + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit.delete(); } - fillMinusHitsFromResults(comperableHitResults); - return; - } else { - // if scrolling and optimization - // 0. save the original second table where , init set - // 1. on each scroll on first table , create miniSet - // 1.1 build where from all results (terms filter) , and run query - // 1.1.1 on each result remove from miniSet - // 1.1.2 add all results left from miniset to bigset - Select firstSelect = this.builder.getOriginalSelect(true); - MinusOneFieldAndOptimizationResult optimizationResult = - runWithScrollingAndAddFilter(fieldsOrderFirstTable[0], fieldsOrderSecondTable[0]); - String fieldName = getFieldName(firstSelect.getFields().get(0)); - Set results = optimizationResult.getFieldValues(); - SearchHit someHit = optimizationResult.getSomeHit(); - fillMinusHitsFromOneField(fieldName, results, someHit); } } @@ -187,11 +207,12 @@ private void fillMinusHitsFromResults(Set comperableHitResu private Set runWithScrollings() { SearchResponse scrollResp = - ElasticUtils.scrollOneTimeWithHits( - this.client, - this.builder.getFirstSearchRequest(), + getResponseWithHits( + builder.getFirstSearchRequest(), builder.getOriginalSelect(true), - this.maxDocsToFetchOnEachScrollShard); + maxDocsToFetchOnEachScrollShard, + null, + pit); Set results = new HashSet<>(); SearchHit[] hits = scrollResp.getHits().getHits(); @@ -199,7 +220,6 @@ private Set runWithScrollings() { return new HashSet<>(); } int totalDocsFetchedFromFirstTable = 0; - // fetch from first table . fill set. while (hits != null && hits.length != 0) { totalDocsFetchedFromFirstTable += hits.length; @@ -208,19 +228,21 @@ private Set runWithScrollings() { break; } scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + builder.getFirstSearchRequest(), + builder.getOriginalSelect(true), + maxDocsToFetchOnEachScrollShard, + scrollResp, + pit); hits = scrollResp.getHits().getHits(); } scrollResp = - ElasticUtils.scrollOneTimeWithHits( - this.client, + getResponseWithHits( this.builder.getSecondSearchRequest(), builder.getOriginalSelect(false), - this.maxDocsToFetchOnEachScrollShard); + this.maxDocsToFetchOnEachScrollShard, + null, + pit); hits = scrollResp.getHits().getHits(); if (hits == null || hits.length == 0) { @@ -234,11 +256,12 @@ private Set runWithScrollings() { break; } scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + builder.getSecondSearchRequest(), + builder.getOriginalSelect(false), + maxDocsToFetchOnEachScrollShard, + scrollResp, + pit); hits = scrollResp.getHits().getHits(); } @@ -303,11 +326,12 @@ private boolean checkIfOnlyOneField(Select firstSelect, Select secondSelect) { private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( String firstFieldName, String secondFieldName) throws SqlParseException { SearchResponse scrollResp = - ElasticUtils.scrollOneTimeWithHits( - this.client, - this.builder.getFirstSearchRequest(), + getResponseWithHits( + builder.getFirstSearchRequest(), builder.getOriginalSelect(true), - this.maxDocsToFetchOnEachScrollShard); + maxDocsToFetchOnEachScrollShard, + null, + pit); Set results = new HashSet<>(); int currentNumOfResults = 0; SearchHit[] hits = scrollResp.getHits().getHits(); @@ -335,14 +359,16 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( break; } SearchResponse responseForSecondTable = - ElasticUtils.scrollOneTimeWithHits( - this.client, + getResponseWithHits( queryAction.getRequestBuilder(), secondQuerySelect, - this.maxDocsToFetchOnEachScrollShard); + this.maxDocsToFetchOnEachScrollShard, + null, + pit); SearchHits secondQuerySearchHits = responseForSecondTable.getHits(); SearchHit[] secondQueryHits = secondQuerySearchHits.getHits(); + while (secondQueryHits.length > 0) { totalDocsFetchedFromSecondTable += secondQueryHits.length; removeValuesFromSetAccordingToHits(secondFieldName, currentSetFromResults, secondQueryHits); @@ -350,11 +376,12 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( break; } responseForSecondTable = - client - .prepareSearchScroll(responseForSecondTable.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + queryAction.getRequestBuilder(), + secondQuerySelect, + maxDocsToFetchOnEachScrollShard, + responseForSecondTable, + pit); secondQueryHits = responseForSecondTable.getHits().getHits(); } results.addAll(currentSetFromResults); @@ -363,13 +390,13 @@ private MinusOneFieldAndOptimizationResult runWithScrollingAndAddFilter( "too many results for first table, stoping at:" + totalDocsFetchedFromFirstTable); break; } - scrollResp = - client - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(new TimeValue(600000)) - .execute() - .actionGet(); + getResponseWithHits( + builder.getFirstSearchRequest(), + builder.getOriginalSelect(true), + maxDocsToFetchOnEachScrollShard, + scrollResp, + pit); hits = scrollResp.getHits().getHits(); } return new MinusOneFieldAndOptimizationResult(results, someHit); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java index 6b8b64c4e8..375c40a5c1 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/UnionExecutor.java @@ -23,11 +23,10 @@ import org.opensearch.sql.legacy.utils.Util; /** Created by Eliran on 21/8/2016. */ -public class UnionExecutor implements ElasticHitsExecutor { +public class UnionExecutor extends ElasticHitsExecutor { private MultiQueryRequestBuilder multiQueryBuilder; private SearchHits results; - private Client client; private int currentId; public UnionExecutor(Client client, MultiQueryRequestBuilder builder) { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java new file mode 100644 index 0000000000..66339cc70a --- /dev/null +++ b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.pit; + +/** Point In Time */ +public interface PointInTimeHandler { + /** Create Point In Time */ + void create(); + + /** Delete Point In Time */ + void delete(); + + /** Get Point In Time Identifier */ + String getPitId(); +} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java new file mode 100644 index 0000000000..64535749e8 --- /dev/null +++ b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.pit; + +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; + +import lombok.Getter; +import lombok.Setter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; + +/** Handler for Point In Time */ +public class PointInTimeHandlerImpl implements PointInTimeHandler { + private Client client; + private String[] indices; + @Getter @Setter private String pitId; + private static final Logger LOG = LogManager.getLogger(); + + /** + * Constructor for class + * + * @param client OpenSearch client + * @param indices list of indices + */ + public PointInTimeHandlerImpl(Client client, String[] indices) { + this.client = client; + this.indices = indices; + } + + /** Create PIT for given indices */ + @Override + public void create() { + CreatePitRequest createPitRequest = + new CreatePitRequest( + LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices); + client.createPit( + createPitRequest, + new ActionListener<>() { + @Override + public void onResponse(CreatePitResponse createPitResponse) { + pitId = createPitResponse.getId(); + LOG.info("Created Point In Time {} successfully.", pitId); + } + + @Override + public void onFailure(Exception e) { + LOG.error("Error occurred while creating PIT", e); + } + }); + } + + /** Delete PIT */ + @Override + public void delete() { + DeletePitRequest deletePitRequest = new DeletePitRequest(pitId); + client.deletePits( + deletePitRequest, + new ActionListener<>() { + @Override + public void onResponse(DeletePitResponse deletePitResponse) { + LOG.info( + "Delete Point In Time {} status: {}", + pitId, + deletePitResponse.status().getStatus()); + } + + @Override + public void onFailure(Exception e) { + LOG.error("Error occurred while deleting PIT", e); + } + }); + } +} diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java new file mode 100644 index 0000000000..42f1af4563 --- /dev/null +++ b/legacy/src/test/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImplTest.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.pit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; + +import java.util.concurrent.CompletableFuture; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; + +public class PointInTimeHandlerImplTest { + + @Mock private Client mockClient; + private String[] indices = {"index1", "index2"}; + private PointInTimeHandlerImpl pointInTimeHandlerImpl; + @Captor private ArgumentCaptor> listenerCaptor; + private final String PIT_ID = "testId"; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + pointInTimeHandlerImpl = new PointInTimeHandlerImpl(mockClient, indices); + } + + @Test + public void testCreate() { + when(LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(new TimeValue(10000)); + + CreatePitResponse mockCreatePitResponse = mock(CreatePitResponse.class); + when(mockCreatePitResponse.getId()).thenReturn(PIT_ID); + + CompletableFuture completableFuture = + CompletableFuture.completedFuture(mockCreatePitResponse); + + doAnswer( + invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onResponse(mockCreatePitResponse); + return completableFuture; + }) + .when(mockClient) + .createPit(any(), any()); + + pointInTimeHandlerImpl.create(); + + assertEquals(PIT_ID, pointInTimeHandlerImpl.getPitId()); + } + + @Test + public void testDelete() { + DeletePitResponse mockedResponse = mock(DeletePitResponse.class); + RestStatus mockRestStatus = mock(RestStatus.class); + when(mockedResponse.status()).thenReturn(mockRestStatus); + when(mockedResponse.status().getStatus()).thenReturn(200); + pointInTimeHandlerImpl.setPitId(PIT_ID); + pointInTimeHandlerImpl.delete(); + verify(mockClient).deletePits(any(), listenerCaptor.capture()); + listenerCaptor.getValue().onResponse(mockedResponse); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 475a584623..494b906b55 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -71,6 +71,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting SQL_PAGINATION_API_SEARCH_AFTER_SETTING = + Setting.boolSetting( + Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting PPL_ENABLED_SETTING = Setting.boolSetting( Key.PPL_ENABLED.getKeyValue(), @@ -237,6 +244,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SQL_DELETE_ENABLED, SQL_DELETE_ENABLED_SETTING, new Updater(Key.SQL_DELETE_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.SQL_PAGINATION_API_SEARCH_AFTER, + SQL_PAGINATION_API_SEARCH_AFTER_SETTING, + new Updater(Key.SQL_PAGINATION_API_SEARCH_AFTER)); register( settingBuilder, clusterSettings, @@ -397,6 +410,7 @@ public static List> pluginSettings() { .add(SQL_SLOWLOG_SETTING) .add(SQL_CURSOR_KEEP_ALIVE_SETTING) .add(SQL_DELETE_ENABLED_SETTING) + .add(SQL_PAGINATION_API_SEARCH_AFTER_SETTING) .add(PPL_ENABLED_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) .add(QUERY_SIZE_LIMIT_SETTING)