diff --git a/src/main/java/com/datastax/oss/cass_stac/App.java b/src/main/java/com/datastax/oss/cass_stac/App.java index 1b05a2b..08fbd05 100644 --- a/src/main/java/com/datastax/oss/cass_stac/App.java +++ b/src/main/java/com/datastax/oss/cass_stac/App.java @@ -85,6 +85,8 @@ CREATE CUSTOM INDEX IF NOT EXISTS item_properties_timestamp_entries ON item (ent final String item_centroid_create_index_statement = """ CREATE CUSTOM INDEX IF NOT EXISTS item_centroid_ann_idx ON item (centroid) USING 'org.apache.cassandra.index.sai.StorageAttachedIndex' WITH OPTIONS = {'similarity_function': 'euclidean'} """; + final String item_id_idx_create_index_statement = """ + CREATE CUSTOM INDEX IF NOT EXISTS item_id_idx ON item (id) USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'"""; CqlSession cqlSession = cqlSessionBuilder.build(); @@ -106,6 +108,8 @@ CREATE CUSTOM INDEX IF NOT EXISTS item_centroid_ann_idx ON item (centroid) USING log.info("Verification of Item_properties_datetime index is successful"); cqlSession.execute(item_centroid_create_index_statement); log.info("Verification of Item_centroid index is successful"); + cqlSession.execute(item_id_idx_create_index_statement); + log.info("Verification of item_id_idx index is successful"); } catch(Exception ex) { log.error(ex.getLocalizedMessage()); System.exit(1); diff --git a/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java b/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java index 8948efe..87f9b53 100644 --- a/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java +++ b/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java @@ -24,29 +24,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.cassandra.core.CassandraTemplate; -import org.springframework.data.cassandra.core.query.CassandraPageRequest; import org.springframework.data.cassandra.core.query.Criteria; import org.springframework.data.cassandra.core.query.Query; -import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.io.IOException; import java.nio.ByteBuffer; import java.time.Instant; import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.time.format.DateTimeParseException; import java.util.*; +import java.util.concurrent.*; import static java.lang.Integer.MAX_VALUE; import static java.lang.Integer.min; -import java.util.concurrent.CompletableFuture; -import org.springframework.scheduling.annotation.Async; - @Service @RequiredArgsConstructor @@ -82,7 +78,7 @@ public ItemModelResponse getItemById(final String id) { throw new RuntimeException(e.getLocalizedMessage()); } } - + @Async("asyncExecutor") public CompletableFuture getItemsByIdParallel(final String id) { final ItemId itemId = itemIdDao.findById(id) @@ -296,6 +292,8 @@ public ItemCollection search(List bbox, Boolean includeObjects) { List allItems = new ArrayList<>(); + List>> futures = new ArrayList<>(); + ExecutorService executorService = Executors.newFixedThreadPool(20); // Adjust the thread pool size as needed Instant minDate = Instant.EPOCH; Instant maxDate = Instant.now().plusSeconds(3155695200L); @@ -319,28 +317,45 @@ public ItemCollection search(List bbox, dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate)) .and(Criteria.where("datetime").gte(minDate)).withAllowFiltering(); } + if (ids != null && !ids.isEmpty()) { + dbQuery.and(Criteria.where("id").in(ids)); + } limit = limit == null ? 10 : limit; Pageable pageable = PageRequest.of(0, 1500); - Slice itemPage; + try { + do { + final Pageable currentPageable = pageable; - do { - // Fetch a page of items - itemPage = cassandraTemplate.slice(dbQuery.pageRequest(pageable), Item.class); + Query finalDbQuery = dbQuery; + Future> future = executorService.submit(() -> { + Slice itemPage = cassandraTemplate.slice(finalDbQuery.pageRequest(currentPageable), Item.class); + return itemPage.getContent(); + }); - // Add the current page content to the list - allItems.addAll(itemPage.getContent()); + futures.add(future); - // Move to the next page - pageable = itemPage.hasNext() ? itemPage.nextPageable() : null; + pageable = cassandraTemplate.slice(dbQuery.pageRequest(pageable), Item.class).hasNext() + ? pageable.next() + : null; - } while (pageable != null); + } while (pageable != null); + for (Future> future : futures) { + allItems.addAll(future.get()); + } + + } catch (InterruptedException | ExecutionException e) { + logger.error("Error during parallel pagination", e); + Thread.currentThread().interrupt(); + } finally { + executorService.shutdown(); + } allItems = allItems.stream().filter(_item -> { boolean valid = true; - if (ids != null) { - valid = ids.contains(_item.getId().getId()); - } +// if (ids != null) { +// valid = ids.contains(_item.getId().getId()); +// } if (intersects != null) valid = GeometryUtil.fromGeometryByteBuffer(_item.getGeometry())