Skip to content

Commit

Permalink
fix threads
Browse files Browse the repository at this point in the history
  • Loading branch information
mounaTay committed Aug 28, 2024
1 parent 8d0c06c commit 92a36ae
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions src/main/java/com/datastax/oss/cass_stac/service/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.data.cassandra.core.query.CassandraPageRequest;
import org.springframework.data.cassandra.core.query.Criteria;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.domain.PageRequest;
Expand Down Expand Up @@ -307,41 +308,46 @@ public ItemCollection search(List<Float> bbox,
maxDate = Instant.parse(datetime);
}

Query dbQuery = Query.empty();

if (collectionsArray != null) {
dbQuery = dbQuery.and(Criteria.where("collection").in(collectionsArray)).withAllowFiltering();
}

if (datetime != null) {
dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate))
.and(Criteria.where("datetime").gte(minDate)).withAllowFiltering();
}
if (ids != null && !ids.isEmpty()) {
dbQuery.and(Criteria.where("id").in(ids));
}

limit = limit == null ? 10 : limit;
Pageable pageable = PageRequest.of(0, 1500);
Slice<Item> itemPage;
try {
do {
final Pageable currentPageable = pageable;

Query finalDbQuery = dbQuery;
Future<List<Item>> future = executorService.submit(() -> {
Slice<Item> itemPage = cassandraTemplate.slice(finalDbQuery.pageRequest(currentPageable), Item.class);
return itemPage.getContent();
});
// Build the query, applying the id filter if provided
Query dbQuery = Query.empty().pageRequest(currentPageable);

futures.add(future);
if (collectionsArray != null) {
dbQuery = dbQuery.and(Criteria.where("collection").in(collectionsArray)).withAllowFiltering();
}

if (datetime != null) {
dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate))
.and(Criteria.where("datetime").gte(minDate))
.withAllowFiltering();
}
if (ids != null && !ids.isEmpty()) {
dbQuery.and(Criteria.where("id").in(ids));
}

pageable = cassandraTemplate.slice(dbQuery.pageRequest(pageable), Item.class).hasNext()
? pageable.next()
if (ids != null && !ids.isEmpty()) {
dbQuery.and(Criteria.where("id").in(ids));
}
itemPage = cassandraTemplate.slice(dbQuery, Item.class);

Slice<Item> finalItemPage = itemPage;
Future<List<Item>> future = executorService.submit(finalItemPage::getContent);
futures.add(future);
pageable = itemPage.hasNext()
? ((CassandraPageRequest) finalItemPage.getPageable()).next()
: null;

} while (pageable != null);

// Combine the results from all threads
for (Future<List<Item>> future : futures) {
allItems.addAll(future.get());
allItems.addAll(future.get()); // get() will block until the task is done
}

} catch (InterruptedException | ExecutionException e) {
Expand Down

0 comments on commit 92a36ae

Please sign in to comment.