Skip to content

Commit

Permalink
add threads
Browse files Browse the repository at this point in the history
  • Loading branch information
mounaTay committed Aug 28, 2024
1 parent 4d898c6 commit 8d0c06c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
4 changes: 4 additions & 0 deletions src/main/java/com/datastax/oss/cass_stac/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ CREATE CUSTOM INDEX IF NOT EXISTS item_properties_timestamp_entries ON item (ent
final String item_centroid_create_index_statement = """
CREATE CUSTOM INDEX IF NOT EXISTS item_centroid_ann_idx ON item (centroid) USING 'org.apache.cassandra.index.sai.StorageAttachedIndex' WITH OPTIONS = {'similarity_function': 'euclidean'}
""";
final String item_id_idx_create_index_statement = """
CREATE CUSTOM INDEX IF NOT EXISTS item_id_idx ON item (id) USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'""";

CqlSession cqlSession = cqlSessionBuilder.build();

Expand All @@ -106,6 +108,8 @@ CREATE CUSTOM INDEX IF NOT EXISTS item_centroid_ann_idx ON item (centroid) USING
log.info("Verification of Item_properties_datetime index is successful");
cqlSession.execute(item_centroid_create_index_statement);
log.info("Verification of Item_centroid index is successful");
cqlSession.execute(item_id_idx_create_index_statement);
log.info("Verification of item_id_idx index is successful");
} catch(Exception ex) {
log.error(ex.getLocalizedMessage());
System.exit(1);
Expand Down
53 changes: 34 additions & 19 deletions src/main/java/com/datastax/oss/cass_stac/service/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.data.cassandra.core.query.CassandraPageRequest;
import org.springframework.data.cassandra.core.query.Criteria;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.concurrent.*;

import static java.lang.Integer.MAX_VALUE;
import static java.lang.Integer.min;

import java.util.concurrent.CompletableFuture;
import org.springframework.scheduling.annotation.Async;


@Service
@RequiredArgsConstructor
Expand Down Expand Up @@ -82,7 +78,7 @@ public ItemModelResponse getItemById(final String id) {
throw new RuntimeException(e.getLocalizedMessage());
}
}

@Async("asyncExecutor")
public CompletableFuture<ItemModelResponse> getItemsByIdParallel(final String id) {
final ItemId itemId = itemIdDao.findById(id)
Expand Down Expand Up @@ -296,6 +292,8 @@ public ItemCollection search(List<Float> bbox,
Boolean includeObjects) {

List<Item> allItems = new ArrayList<>();
List<Future<List<Item>>> futures = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(20); // Adjust the thread pool size as needed

Instant minDate = Instant.EPOCH;
Instant maxDate = Instant.now().plusSeconds(3155695200L);
Expand All @@ -319,28 +317,45 @@ public ItemCollection search(List<Float> bbox,
dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate))
.and(Criteria.where("datetime").gte(minDate)).withAllowFiltering();
}
if (ids != null && !ids.isEmpty()) {
dbQuery.and(Criteria.where("id").in(ids));
}

limit = limit == null ? 10 : limit;
Pageable pageable = PageRequest.of(0, 1500);
Slice<Item> itemPage;
try {
do {
final Pageable currentPageable = pageable;

do {
// Fetch a page of items
itemPage = cassandraTemplate.slice(dbQuery.pageRequest(pageable), Item.class);
Query finalDbQuery = dbQuery;
Future<List<Item>> future = executorService.submit(() -> {
Slice<Item> itemPage = cassandraTemplate.slice(finalDbQuery.pageRequest(currentPageable), Item.class);
return itemPage.getContent();
});

// Add the current page content to the list
allItems.addAll(itemPage.getContent());
futures.add(future);

// Move to the next page
pageable = itemPage.hasNext() ? itemPage.nextPageable() : null;
pageable = cassandraTemplate.slice(dbQuery.pageRequest(pageable), Item.class).hasNext()
? pageable.next()
: null;

} while (pageable != null);
} while (pageable != null);
for (Future<List<Item>> future : futures) {
allItems.addAll(future.get());
}

} catch (InterruptedException | ExecutionException e) {
logger.error("Error during parallel pagination", e);
Thread.currentThread().interrupt();
} finally {
executorService.shutdown();
}

allItems = allItems.stream().filter(_item -> {
boolean valid = true;
if (ids != null) {
valid = ids.contains(_item.getId().getId());
}
// if (ids != null) {
// valid = ids.contains(_item.getId().getId());
// }

if (intersects != null)
valid = GeometryUtil.fromGeometryByteBuffer(_item.getGeometry())
Expand Down

0 comments on commit 8d0c06c

Please sign in to comment.