diff --git a/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java b/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java index 87f9b53..5bf4a04 100644 --- a/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java +++ b/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.cassandra.core.CassandraTemplate; +import org.springframework.data.cassandra.core.query.CassandraPageRequest; import org.springframework.data.cassandra.core.query.Criteria; import org.springframework.data.cassandra.core.query.Query; import org.springframework.data.domain.PageRequest; @@ -307,41 +308,46 @@ public ItemCollection search(List bbox, maxDate = Instant.parse(datetime); } - Query dbQuery = Query.empty(); - - if (collectionsArray != null) { - dbQuery = dbQuery.and(Criteria.where("collection").in(collectionsArray)).withAllowFiltering(); - } - - if (datetime != null) { - dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate)) - .and(Criteria.where("datetime").gte(minDate)).withAllowFiltering(); - } - if (ids != null && !ids.isEmpty()) { - dbQuery.and(Criteria.where("id").in(ids)); - } - limit = limit == null ? 10 : limit; Pageable pageable = PageRequest.of(0, 1500); + Slice itemPage; try { do { final Pageable currentPageable = pageable; - Query finalDbQuery = dbQuery; - Future> future = executorService.submit(() -> { - Slice itemPage = cassandraTemplate.slice(finalDbQuery.pageRequest(currentPageable), Item.class); - return itemPage.getContent(); - }); + // Build the query, applying the id filter if provided + Query dbQuery = Query.empty().pageRequest(currentPageable); - futures.add(future); + if (collectionsArray != null) { + dbQuery = dbQuery.and(Criteria.where("collection").in(collectionsArray)).withAllowFiltering(); + } + + if (datetime != null) { + dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate)) + .and(Criteria.where("datetime").gte(minDate)) + .withAllowFiltering(); + } + if (ids != null && !ids.isEmpty()) { + dbQuery.and(Criteria.where("id").in(ids)); + } - pageable = cassandraTemplate.slice(dbQuery.pageRequest(pageable), Item.class).hasNext() - ? pageable.next() + if (ids != null && !ids.isEmpty()) { + dbQuery.and(Criteria.where("id").in(ids)); + } + itemPage = cassandraTemplate.slice(dbQuery, Item.class); + + Slice finalItemPage = itemPage; + Future> future = executorService.submit(finalItemPage::getContent); + futures.add(future); + pageable = itemPage.hasNext() + ? ((CassandraPageRequest) finalItemPage.getPageable()).next() : null; } while (pageable != null); + + // Combine the results from all threads for (Future> future : futures) { - allItems.addAll(future.get()); + allItems.addAll(future.get()); // get() will block until the task is done } } catch (InterruptedException | ExecutionException e) {