-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
separate doc-level monitor query indices for externally defined monitors #1664
Changes from all commits
452a51c
b280761
a15d5a1
90620a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -537,7 +537,8 @@ class TransportIndexMonitorAction @Inject constructor( | |||
if ( | ||||
request.monitor.isMonitorOfStandardType() && | ||||
Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == | ||||
Monitor.MonitorType.DOC_LEVEL_MONITOR | ||||
Monitor.MonitorType.DOC_LEVEL_MONITOR && | ||||
request.monitor.deleteQueryIndexInEveryRun == false | ||||
) { | ||||
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) | ||||
} | ||||
|
@@ -702,13 +703,22 @@ class TransportIndexMonitorAction @Inject constructor( | |||
Monitor.MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR | ||||
) { | ||||
updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) | ||||
client.suspendUntil<Client, BulkByScrollResponse> { | ||||
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) | ||||
.source(currentMonitor.dataSources.queryIndex) | ||||
.filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) | ||||
.execute(it) | ||||
if (docLevelMonitorQueries.docLevelQueryIndexExists(currentMonitor.dataSources)) { | ||||
client.suspendUntil<Client, BulkByScrollResponse> { | ||||
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) | ||||
.source(currentMonitor.dataSources.queryIndex) | ||||
.filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) | ||||
.execute(it) | ||||
} | ||||
} | ||||
if (currentMonitor.deleteQueryIndexInEveryRun == false) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alerting/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt Line 459 in b280761
plz change it to error log |
||||
indexDocLevelMonitorQueries( | ||||
request.monitor, | ||||
currentMonitor.id, | ||||
updatedMetadata, | ||||
request.refreshPolicy | ||||
) | ||||
} | ||||
indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) | ||||
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) | ||||
} | ||||
actionListener.onResponse( | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest | |
import org.opensearch.action.bulk.BulkRequest | ||
import org.opensearch.action.bulk.BulkResponse | ||
import org.opensearch.action.index.IndexRequest | ||
import org.opensearch.action.support.IndicesOptions | ||
import org.opensearch.action.support.WriteRequest.RefreshPolicy | ||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
import org.opensearch.alerting.MonitorRunnerService.monitorCtx | ||
|
@@ -181,6 +182,16 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ | |
} | ||
} | ||
|
||
suspend fun deleteDocLevelQueryIndex(dataSources: DataSources): Boolean { | ||
val ack: AcknowledgedResponse = client.suspendUntil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we do exists check before deleting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed it. |
||
client.admin().indices().delete( | ||
DeleteIndexRequest(dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
it | ||
) | ||
} | ||
return ack.isAcknowledged | ||
} | ||
|
||
fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { | ||
val clusterState = clusterService.state() | ||
return clusterState.metadata.hasAlias(dataSources.queryIndex) | ||
|
@@ -434,6 +445,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ | |
) | ||
) | ||
indexRequests.add(indexRequest) | ||
log.debug("query $query added for execution of monitor $monitorId on index $sourceIndex") | ||
} | ||
log.debug("bulk inserting percolate [${queries.size}] queries") | ||
if (indexRequests.isNotEmpty()) { | ||
|
@@ -478,7 +490,12 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ | |
updatedProperties: MutableMap<String, Any> | ||
): Pair<AcknowledgedResponse, String> { | ||
var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] | ||
if (targetQueryIndex == null) { | ||
if ( | ||
targetQueryIndex == null || ( | ||
targetQueryIndex != monitor.dataSources.queryIndex && | ||
monitor.deleteQueryIndexInEveryRun == true | ||
) | ||
) { | ||
// queryIndex is alias which will always have only 1 backing index which is writeIndex | ||
// This is due to a fact that that _rollover API would maintain only single index under alias | ||
// if you don't add is_write_index setting when creating index initially | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1195,7 +1195,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
dataSources = DataSources( | ||
queryIndex = customQueryIndex, | ||
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), | ||
) | ||
), | ||
owner = "alerting" | ||
) | ||
try { | ||
createMonitor(monitor) | ||
|
@@ -2379,7 +2380,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) | ||
var monitor = randomDocumentLevelMonitor( | ||
inputs = listOf(docLevelInput), | ||
triggers = listOf(trigger) | ||
triggers = listOf(trigger), | ||
dataSources = DataSources(), | ||
owner = "alerting" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz add 2 new tests-
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in first test plz also update monitor with flag false to true and do rollover again and this time assert monitor execution succeeeds |
||
) | ||
// This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api | ||
val docPayload: StringBuilder = StringBuilder(100000) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/opensearch-project/alerting/pull/1664/files#diff-58c6e3aa339ebbcc7ee2452426f17164dc91d435c1f24ee55fdc4f06ec1a371eR546
here source to query mapping is updated but in case of new change query index doesnt even get created and wrong value would come up right?
this needs to be handled too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the source to query mapping is dynamically updated during the first monitor run as shown in this logic. https://github.com/opensearch-project/alerting/blob/main/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt#L481
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this method being called for the monitors with dedicated queryindex also?
https://github.com/opensearch-project/alerting/blob/main/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt#L473C25-L473C49
this code snippet will always throw exception right?
because the queryIndex is not an alias anymore?