Skip to content

Commit

Permalink
Merge pull request #35 from Anant/feature/error-handling
Browse files Browse the repository at this point in the history
move filter inside future stream
  • Loading branch information
anomnaco authored Sep 13, 2024
2 parents c8de6f0 + 4b687cc commit 655b0aa
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 62 deletions.
6 changes: 1 addition & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/misc.xml
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
.idea
*.iws
*.iml
*.ipr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ The bounding box is provided as four or six numbers, depending on whether the co
try {
ItemCollection response = itemService.search(
bbox,
GeoJsonParser.parseGeometry(intersects),
intersects != null ? GeoJsonParser.parseGeometry(intersects) : null,
datetime,
limit,
ids,
Expand Down
106 changes: 50 additions & 56 deletions src/main/java/com/datastax/oss/cass_stac/service/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public Map<String, String> parseDatetime(String datetime) {

if (datetime.contains("/")) {
String[] parts = datetime.split("/");
Instant rangeCenter = Instant.parse("2023-01-01T00:00:00.00Z");
Instant rangeCenter = Instant.parse("2023-01-01T00:00:00.00Z");

result.put("minDate", parts[0].equals("..") ? rangeCenter.minusSeconds(15770000L).toString() : parts[0]);
result.put("minOffsetDate", parts[0].equals("..") ? rangeCenter.minusSeconds(15770000L).toString() : parts[0]);
Expand All @@ -289,10 +289,12 @@ public Map<String, String> parseDatetime(String datetime) {
public List<String> getPartitions(Geometry intersects, List<String> ids, Map<String, String> dateTimes) {
final GeoTimePartition partitioner = new GeoTimePartition();
List<String> partitions = new ArrayList<>();
if (intersects != null && ids == null)
if (intersects == null && (ids == null || ids.isEmpty()))
throw new RuntimeException("Filter on either ids or intersects should be passed to search");
if (ids == null)
partitions = switch (intersects.getGeometryType()) {
case "Point":
yield partitioner.getGeoTimePartitionsForPoint(intersects.getCentroid(),
yield partitioner.getGeoTimePartitionsForPoint(intersects.getCentroid(),
OffsetDateTime.parse(dateTimes.get("minOffsetDate")),
OffsetDateTime.parse(dateTimes.get("maxOffsetDate")));
case "Polygon":
Expand All @@ -302,7 +304,7 @@ public List<String> getPartitions(Geometry intersects, List<String> ids, Map<Str
default:
throw new IllegalStateException("Unexpected value: " + intersects.getGeometryType());
};
else if (ids != null) {
else if (!ids.isEmpty()) {
partitions = new ArrayList<>(ids.stream().map(id -> {
final ItemId itemId = itemIdDao.findById(id).orElse(null);
if (itemId != null) {
Expand All @@ -318,8 +320,7 @@ private List<Item> fetchItemsForPartition(String partitionId,
List<Float> bbox,
Instant minDate,
Instant maxDate,
List<String> collectionsArray,
Integer pageSize) {
List<String> collectionsArray) {
List<Item> itemPage;

Query dbQuery = Query.query(Criteria.where("partition_id").is(partitionId));
Expand All @@ -338,12 +339,11 @@ private List<Item> fetchItemsForPartition(String partitionId,
}

@Async("asyncExecutor")
private CompletableFuture<List<Item>> fetchItemsForPartitionAsync(String partitionId,
List<Float> bbox,
Instant minDate,
Instant maxDate,
List<String> collectionsArray,
Integer pageSize) {
protected CompletableFuture<List<Item>> fetchItemsForPartitionAsync(String partitionId,
List<Float> bbox,
Instant minDate,
Instant maxDate,
List<String> collectionsArray) {
List<Item> itemPage;

Query dbQuery = Query.query(Criteria.where("partition_id").is(partitionId));
Expand Down Expand Up @@ -391,64 +391,58 @@ public ItemCollection search(List<Float> bbox,

limit = limit == null ? 10 : limit;

if (datetime == null)
throw new RuntimeException("datetime is required to filter out data");

Map<String, String> dateTimes = parseDatetime(datetime);
List<String> partitionIds = getPartitions(intersects, ids, dateTimes);

assert partitionIds != null;
List<CompletableFuture<List<Item>>> futures = new ArrayList<>();

for (String partitionId : partitionIds) {
logger.info("Fetching items for partitionId: " + partitionId);
logger.info("Fetching items for partitionId: {}", partitionId);
CompletableFuture<List<Item>> future = fetchItemsForPartitionAsync(partitionId, bbox,
Instant.parse(dateTimes.get("minDate")), Instant.parse(dateTimes.get("maxDate")),
collectionsArray, limit);
collectionsArray);
futures.add(future);
}

CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<Item>> allItemsFuture = allFutures.thenApply(v ->
futures.stream()
.flatMap(future -> future.join().stream())
.collect(Collectors.toList())
CompletableFuture<List<Item>> allItemsFuture = allFutures.thenApply(v ->
futures.stream()
.flatMap(future -> future.join().stream())
.filter(_item -> {
boolean valid = true;
if (ids != null) {
valid = ids.contains(_item.getId().getId());
}

if (bbox != null) {
ItemDto itemDto;
try {
itemDto = convertItemToDto(_item);
} catch (IOException e) {
throw new RuntimeException(e);
}
valid = BboxIntersects(itemDto.getBbox(), bbox);
}

if (query != null) {
QueryEvaluator evaluator = new QueryEvaluator();
Map<String, Object> additionalAttributes;
JsonNode attributes;
try {
attributes = objectMapper.readValue(_item.getProperties(), JsonNode.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
additionalAttributes = objectMapper.convertValue(attributes, new TypeReference<>() {
});
valid = evaluator.evaluate(query, additionalAttributes);
}
return valid;
})
.collect(Collectors.toList())
);

List<Item> allItems = allItemsFuture.get();

allItems = allItems.stream().filter(_item -> {
boolean valid = true;
if (ids != null) {
valid = ids.contains(_item.getId().getId());
}

if (bbox != null) {
ItemDto itemDto;
try {
itemDto = convertItemToDto(_item);
} catch (IOException e) {
throw new RuntimeException(e);
}
valid = BboxIntersects(itemDto.getBbox(), bbox);
}

if (query != null) {
QueryEvaluator evaluator = new QueryEvaluator();
Map<String, Object> additionalAttributes;
JsonNode attributes;
try {
attributes = objectMapper.readValue(_item.getProperties(), JsonNode.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
additionalAttributes = objectMapper.convertValue(attributes, new TypeReference<>() {
});
valid = evaluator.evaluate(query, additionalAttributes);
}
return valid;
}).toList();

List<Item> allItems = allItemsFuture.get();

if (sort != null && !sort.isEmpty()) {
allItems = SortUtils.sortItems(allItems, sort);
Expand Down

0 comments on commit 655b0aa

Please sign in to comment.