Skip to content

Commit

Permalink
rename and add more UTs
Browse files Browse the repository at this point in the history
Signed-off-by: xinyual <[email protected]>
  • Loading branch information
xinyual committed Nov 21, 2024
1 parent b4adbc7 commit b963587
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -264,19 +266,9 @@ private void checkAgentBeforeDeleteModel(String modelId, ActionListener<Boolean>
if (searchHits.length == 0) {
actionListener.onResponse(true);
} else {
List<String> relatedAgents = new ArrayList<>();
for (SearchHit hit : searchHits) {
relatedAgents.add(hit.getId());
}
actionListener
.onFailure(
new OpenSearchStatusException(
searchHits.length
+ " agents are still using this model, please delete or update the agents first: "
+ Arrays.toString(relatedAgents.toArray(new String[0])),
RestStatus.CONFLICT
)
);
String errorMessage = formatAgentErrorMessage(searchHits);

actionListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.CONFLICT));
}

}, e -> {
Expand Down Expand Up @@ -305,9 +297,13 @@ private void checkIngestPipelineBeforeDeleteModel(String modelId, ActionListener
actionListener
.onFailure(
new OpenSearchStatusException(
allDependentPipelineIds.size()
+ " ingest pipelines are still using this model, please delete or update the pipelines first: "
+ Arrays.toString(allDependentPipelineIds.toArray(new String[0])),
String
.format(
Locale.ROOT,
"%d ingest pipelines are still using this model, please delete or update the pipelines first: %s",
allDependentPipelineIds.size(),
Arrays.toString(allDependentPipelineIds.toArray(new String[0]))
),
RestStatus.CONFLICT
)
);
Expand Down Expand Up @@ -335,9 +331,13 @@ private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener
actionListener
.onFailure(
new OpenSearchStatusException(
allDependentPipelineIds.size()
+ " search pipelines are still using this model, please delete or update the pipelines first: "
+ Arrays.toString(allDependentPipelineIds.toArray(new String[0])),
String
.format(
Locale.ROOT,
"%d search pipelines are still using this model, please delete or update the pipelines first: %s",
allDependentPipelineIds.size(),
Arrays.toString(allDependentPipelineIds.toArray(new String[0]))
),
RestStatus.CONFLICT
)
);
Expand All @@ -353,24 +353,23 @@ private void checkSearchPipelineBeforeDeleteModel(String modelId, ActionListener
private void checkDownstreamTaskBeforeDeleteModel(String modelId, Boolean isHidden, ActionListener<DeleteResponse> actionListener) {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicBoolean noneBlocked = new AtomicBoolean(true);
List<String> errorMessages = new ArrayList<>();
CopyOnWriteArrayList<String> errorMessages = new CopyOnWriteArrayList<>();
ActionListener<Boolean> countDownActionListener = ActionListener.wrap(b -> {
countDownLatch.countDown();
noneBlocked.compareAndSet(true, b);
if (countDownLatch.getCount() == 0) {
if (noneBlocked.get()) {
deleteModel(modelId, isHidden, actionListener);
} else {
actionListener.onFailure(new OpenSearchStatusException(String.join(",", errorMessages), RestStatus.CONFLICT));
actionListener.onFailure(new OpenSearchStatusException(String.join(". ", errorMessages), RestStatus.CONFLICT));
}
}
}, e -> {
countDownLatch.countDown();
noneBlocked.compareAndSet(true, false);
// actionListener.onFailure(e);
errorMessages.add(e.getMessage());
if (countDownLatch.getCount() == 0) {
actionListener.onFailure(new OpenSearchStatusException(String.join(",", errorMessages), RestStatus.CONFLICT));
actionListener.onFailure(new OpenSearchStatusException(String.join(". ", errorMessages), RestStatus.CONFLICT));
}

});
Expand Down Expand Up @@ -487,19 +486,19 @@ private <T> List<String> findDependentPipelines(
List<String> dependentPipelineConfigurations = new ArrayList<>();
for (T pipelineConfiguration : pipelineConfigurations) {
Map<String, Object> config = getConfigFunction.apply(pipelineConfiguration);
if (searchThroughConfig(config, candidateModelId, "")) {
if (searchThroughConfig(config, candidateModelId)) {
dependentPipelineConfigurations.add(getIdFunction.apply(pipelineConfiguration));
}
}
return dependentPipelineConfigurations;
}

// This method is to go through the pipeline configs and only when the key is model id and value is
// 1. String and equal to candidate id 2. A list of String containing candidate id We will return True. Otherwise False
private Boolean searchThroughConfig(Object searchCandidate, String candidateId, String targetModelKey) {
// This method is to go through the pipeline configs and he configuration is a map of string to objects.
// Objects can be a list or a map. we will search exhaustively through the configuration for any match of the candidateId.
private Boolean searchThroughConfig(Object searchCandidate, String candidateId) {
// Use a stack to store the elements to be processed
Deque<Pair<String, Object>> stack = new ArrayDeque<>();
stack.push(Pair.of(targetModelKey, searchCandidate));
stack.push(Pair.of("", searchCandidate));

while (!stack.isEmpty()) {
// Pop an item from the stack
Expand Down Expand Up @@ -531,6 +530,29 @@ private Boolean searchThroughConfig(Object searchCandidate, String candidateId,
return false;
}

private String formatAgentErrorMessage(SearchHit[] hits) {
boolean isHidden = false;
for (SearchHit hit : hits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
isHidden = isHidden || Boolean.parseBoolean((String) sourceAsMap.getOrDefault(IS_HIDDEN_FIELD, false));
}
if (isHidden) {
return String
.format(Locale.ROOT, "%d agents are still using this model, please delete or update the agents first", hits.length);
}
List<String> agentIds = new ArrayList<>();
for (SearchHit hit : hits) {
agentIds.add(hit.getId());
}
return String
.format(
Locale.ROOT,
"%d agents are still using this model, please delete or update the agents first: %s",
hits.length,
Arrays.toString(agentIds.toArray(new String[0]))
);
}

// this method is only to stub static method.
@VisibleForTesting
boolean isSuperAdminUserWrapper(ClusterService clusterService, Client client) {
Expand Down
Loading

0 comments on commit b963587

Please sign in to comment.