From c5b08c3e17ac1e491b17aa59e474cddc42ac0e4f Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <github-actions[bot]@users.noreply.github.com>
Date: Thu, 26 Sep 2024 00:29:34 +0000
Subject: [PATCH] separate doc-level monitor query indices for externally
 defined monitors (#1664)

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
(cherry picked from commit abdeca920035414bba0e26b7459a6964cceb399a)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
---
 .../alerting/DocumentLevelMonitorRunner.kt    | 12 +++
 .../alerting/service/DeleteMonitorService.kt  | 96 +++++++++++--------
 .../transport/TransportIndexMonitorAction.kt  | 24 +++--
 .../alerting/util/DocLevelMonitorQueries.kt   | 19 +++-
 .../alerting/MonitorDataSourcesIT.kt          |  7 +-
 .../bwc/AlertingBackwardsCompatibilityIT.kt   |  1 +
 .../resources/mappings/scheduled-jobs.json    |  3 +
 .../SampleRemoteMonitorRestHandler.java       |  3 +
 8 files changed, 113 insertions(+), 52 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index f28b03292..0164aa00e 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -374,6 +374,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
                 // Clean up any queries created by the dry run monitor
                 monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
             }
+
             // TODO: Update the Document as part of the Trigger and return back the trigger action result
             return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults)
         } catch (e: Exception) {
@@ -387,6 +388,17 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
             )
             return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
         } finally {
+            if (monitor.deleteQueryIndexInEveryRun == true &&
+                monitorCtx.docLevelMonitorQueries!!.docLevelQueryIndexExists(monitor.dataSources)
+            ) {
+                val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources)
+                if (!ack) {
+                    logger.error(
+                        "Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! " +
+                            "for monitor ${monitor.id}"
+                    )
+                }
+            }
             val endTime = System.currentTimeMillis()
             totalTimeTakenStat = endTime - startTime
             logger.debug(
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt
index 9d4357a8f..84a2d7937 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt
@@ -94,57 +94,69 @@ object DeleteMonitorService :
 
     private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) {
         try {
-            val metadata = MonitorMetadataService.getMetadata(monitor)
-            metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
+            if (monitor.deleteQueryIndexInEveryRun == false) {
+                val metadata = MonitorMetadataService.getMetadata(monitor)
+                metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
 
-                val indicesExistsResponse: IndicesExistsResponse =
-                    client.suspendUntil {
-                        client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
+                    val indicesExistsResponse: IndicesExistsResponse =
+                        client.suspendUntil {
+                            client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
+                        }
+                    if (indicesExistsResponse.isExists == false) {
+                        return
                     }
-                if (indicesExistsResponse.isExists == false) {
-                    return
-                }
-                // Check if there's any queries from other monitors in this queryIndex,
-                // to avoid unnecessary doc deletion, if we could just delete index completely
-                val searchResponse: SearchResponse = client.suspendUntil {
-                    search(
-                        SearchRequest(queryIndex).source(
-                            SearchSourceBuilder()
-                                .size(0)
-                                .query(
-                                    QueryBuilders.boolQuery().mustNot(
-                                        QueryBuilders.matchQuery("monitor_id", monitor.id)
+                    // Check if there's any queries from other monitors in this queryIndex,
+                    // to avoid unnecessary doc deletion, if we could just delete index completely
+                    val searchResponse: SearchResponse = client.suspendUntil {
+                        search(
+                            SearchRequest(queryIndex).source(
+                                SearchSourceBuilder()
+                                    .size(0)
+                                    .query(
+                                        QueryBuilders.boolQuery().mustNot(
+                                            QueryBuilders.matchQuery("monitor_id", monitor.id)
+                                        )
                                     )
-                                )
-                        ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
-                        it
-                    )
-                }
-                if (searchResponse.hits.totalHits.value == 0L) {
-                    val ack: AcknowledgedResponse = client.suspendUntil {
-                        client.admin().indices().delete(
-                            DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
+                            ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
                             it
                         )
                     }
-                    if (ack.isAcknowledged == false) {
-                        log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
-                    }
-                } else {
-                    // Delete all queries added by this monitor
-                    val response: BulkByScrollResponse = suspendCoroutine { cont ->
-                        DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
-                            .source(queryIndex)
-                            .filter(QueryBuilders.matchQuery("monitor_id", monitor.id))
-                            .refresh(true)
-                            .execute(
-                                object : ActionListener<BulkByScrollResponse> {
-                                    override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
-                                    override fun onFailure(t: Exception) = cont.resumeWithException(t)
-                                }
+                    if (searchResponse.hits.totalHits.value == 0L) {
+                        val ack: AcknowledgedResponse = client.suspendUntil {
+                            client.admin().indices().delete(
+                                DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
+                                it
                             )
+                        }
+                        if (ack.isAcknowledged == false) {
+                            log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
+                        }
+                    } else {
+                        // Delete all queries added by this monitor
+                        val response: BulkByScrollResponse = suspendCoroutine { cont ->
+                            DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
+                                .source(queryIndex)
+                                .filter(QueryBuilders.matchQuery("monitor_id", monitor.id))
+                                .refresh(true)
+                                .execute(
+                                    object : ActionListener<BulkByScrollResponse> {
+                                        override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
+                                        override fun onFailure(t: Exception) = cont.resumeWithException(t)
+                                    }
+                                )
+                        }
                     }
                 }
+            } else {
+                val ack: AcknowledgedResponse = client.suspendUntil {
+                    client.admin().indices().delete(
+                        DeleteIndexRequest(monitor.dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
+                        it
+                    )
+                }
+                if (ack.isAcknowledged == false) {
+                    log.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!")
+                }
             }
         } catch (e: Exception) {
             // we only log the error and don't fail the request because if monitor document has been deleted successfully,
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
index 4b49cbd1d..0a5d60965 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
@@ -548,7 +548,8 @@ class TransportIndexMonitorAction @Inject constructor(
                     if (
                         request.monitor.isMonitorOfStandardType() &&
                         Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) ==
-                        Monitor.MonitorType.DOC_LEVEL_MONITOR
+                        Monitor.MonitorType.DOC_LEVEL_MONITOR &&
+                        request.monitor.deleteQueryIndexInEveryRun == false
                     ) {
                         indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
                     }
@@ -719,13 +720,22 @@ class TransportIndexMonitorAction @Inject constructor(
                     Monitor.MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
                 ) {
                     updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor)
-                    client.suspendUntil<Client, BulkByScrollResponse> {
-                        DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
-                            .source(currentMonitor.dataSources.queryIndex)
-                            .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id))
-                            .execute(it)
+                    if (docLevelMonitorQueries.docLevelQueryIndexExists(currentMonitor.dataSources)) {
+                        client.suspendUntil<Client, BulkByScrollResponse> {
+                            DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
+                                .source(currentMonitor.dataSources.queryIndex)
+                                .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id))
+                                .execute(it)
+                        }
+                    }
+                    if (currentMonitor.deleteQueryIndexInEveryRun == false) {
+                        indexDocLevelMonitorQueries(
+                            request.monitor,
+                            currentMonitor.id,
+                            updatedMetadata,
+                            request.refreshPolicy
+                        )
                     }
-                    indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy)
                     MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
                 }
                 actionListener.onResponse(
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt
index 3a0a11e91..10e144302 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt
@@ -24,6 +24,7 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.action.bulk.BulkRequest
 import org.opensearch.action.bulk.BulkResponse
 import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.support.IndicesOptions
 import org.opensearch.action.support.WriteRequest.RefreshPolicy
 import org.opensearch.action.support.master.AcknowledgedResponse
 import org.opensearch.alerting.MonitorRunnerService.monitorCtx
@@ -181,6 +182,16 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
         }
     }
 
+    suspend fun deleteDocLevelQueryIndex(dataSources: DataSources): Boolean {
+        val ack: AcknowledgedResponse = client.suspendUntil {
+            client.admin().indices().delete(
+                DeleteIndexRequest(dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
+                it
+            )
+        }
+        return ack.isAcknowledged
+    }
+
     fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
         val clusterState = clusterService.state()
         return clusterState.metadata.hasAlias(dataSources.queryIndex)
@@ -434,6 +445,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
                     )
                 )
             indexRequests.add(indexRequest)
+            log.debug("query $query added for execution of monitor $monitorId on index $sourceIndex")
         }
         log.debug("bulk inserting percolate [${queries.size}] queries")
         if (indexRequests.isNotEmpty()) {
@@ -479,7 +491,12 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
         updatedProperties: MutableMap<String, Any>
     ): Pair<AcknowledgedResponse, String> {
         var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id]
-        if (targetQueryIndex == null) {
+        if (
+            targetQueryIndex == null || (
+                targetQueryIndex != monitor.dataSources.queryIndex &&
+                    monitor.deleteQueryIndexInEveryRun == true
+                )
+        ) {
             // queryIndex is alias which will always have only 1 backing index which is writeIndex
             // This is due to a fact that that _rollover API would maintain only single index under alias
             // if you don't add is_write_index setting when creating index initially
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
index 1bef411bd..bb4bfa679 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
@@ -1196,7 +1196,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
             dataSources = DataSources(
                 queryIndex = customQueryIndex,
                 queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
-            )
+            ),
+            owner = "alerting"
         )
         try {
             createMonitor(monitor)
@@ -2381,7 +2382,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
         var monitor = randomDocumentLevelMonitor(
             inputs = listOf(docLevelInput),
-            triggers = listOf(trigger)
+            triggers = listOf(trigger),
+            dataSources = DataSources(),
+            owner = "alerting"
         )
         // This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api
         val docPayload: StringBuilder = StringBuilder(100000)
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt
index 937be869d..76cf22700 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt
@@ -113,6 +113,7 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
         val indexName = "test_bwc_index"
         val bwcMonitorString = """
             {
+              "owner": "alerting",
               "type": "monitor",
               "name": "test_bwc_monitor",
               "enabled": true,
diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json
index 2651c862e..311cc6d84 100644
--- a/core/src/main/resources/mappings/scheduled-jobs.json
+++ b/core/src/main/resources/mappings/scheduled-jobs.json
@@ -293,6 +293,9 @@
             }
           }
         },
+        "delete_query_index_in_every_run": {
+          "type": "boolean"
+        },
         "ui_metadata": {
           "type": "object",
           "enabled": false
diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java
index 0340baa6b..a8f384cb6 100644
--- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java
+++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java
@@ -94,6 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
                                 "id", null)), trigger1Serialized)),
                 Map.of(),
                 new DataSources(),
+                true,
                 "sample-remote-monitor-plugin"
         );
         IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest(
@@ -154,6 +155,7 @@ public void onFailure(Exception e) {
                     List.of(),
                     Map.of(),
                     new DataSources(),
+                    true,
                     "sample-remote-monitor-plugin"
             );
             IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest(
@@ -237,6 +239,7 @@ public void onFailure(Exception e) {
                                     "id", null)), trigger1Serialized)),
                     Map.of(),
                     new DataSources(),
+                    true,
                     "sample-remote-monitor-plugin"
             );
             IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(