Skip to content

Commit

Permalink
Merge pull request #32 from Anant/feature/ParallelSearch
Browse files Browse the repository at this point in the history
Feature/parallel search
  • Loading branch information
anomnaco authored Sep 11, 2024
2 parents 2578681 + 6c98079 commit 3aa9462
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ public class AsyncConfig {

@Bean(name = "asyncExecutor")
public TaskExecutor asyncExecutor() {
int corePoolSize = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(corePoolSize * 4);
executor.setQueueCapacity(corePoolSize * 4);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
Expand Down
49 changes: 45 additions & 4 deletions src/main/java/com/datastax/oss/cass_stac/service/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,35 @@ private List<Item> fetchItemsForPartition(String partitionId,
return itemPage;
}

/**
 * Fetches one page of items for a single partition, filtered by an optional
 * collection list and a [minDate, maxDate] datetime window, and returns the
 * result wrapped in an already-completed future so callers can aggregate
 * per-partition fetches with CompletableFuture.allOf.
 *
 * NOTE(review): @Async on a *private* method is silently ignored — Spring's
 * proxy-based AOP can only intercept public methods invoked through the bean
 * proxy, so the original ran every partition fetch sequentially on the caller
 * thread. Made public here; be aware that self-invocation from search() in
 * this same class STILL bypasses the proxy. For true parallelism, move this
 * method into a separate @Service bean (or self-inject the proxied bean).
 *
 * @param partitionId      partition key to query
 * @param bbox             bounding box — currently unused here; presumably
 *                         applied by the caller's post-filtering. TODO confirm.
 * @param minDate          inclusive lower bound on the item "datetime" column
 * @param maxDate          inclusive upper bound on the item "datetime" column
 * @param collectionsArray optional collection names to restrict to; null = all
 * @param pageSize         requested page size — currently unused; the select
 *                         is unbounded. TODO confirm whether a limit is needed.
 * @return completed future holding the items found in this partition
 */
@Async("asyncExecutor")
public CompletableFuture<List<Item>> fetchItemsForPartitionAsync(String partitionId,
                                                                 List<Float> bbox,
                                                                 Instant minDate,
                                                                 Instant maxDate,
                                                                 List<String> collectionsArray,
                                                                 Integer pageSize) {
    // Base query: restrict to the single partition (efficient in Cassandra).
    Query dbQuery = Query.query(Criteria.where("partition_id").is(partitionId));

    // Collection filter is optional; only add it when the caller supplied one.
    if (collectionsArray != null) {
        dbQuery = dbQuery.and(Criteria.where("collection").in(collectionsArray));
    }

    // Inclusive datetime window on both ends.
    dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate))
            .and(Criteria.where("datetime").gte(minDate));

    // Original logged the literal text "dbQuery.toString()" — log the value instead.
    logger.info("Partition " + partitionId + " query: " + dbQuery);

    List<Item> itemPage = cassandraTemplate.select(dbQuery, Item.class);

    // Log only the count — itemPage.toString() could dump an unbounded result
    // set into the log for large partitions.
    logger.info("Partition " + partitionId + " returned " + itemPage.size() + " items");

    return CompletableFuture.completedFuture(itemPage);
}

/**
* search within all items, items that intersect with bbox or a geometry using intersects
* date might be used as well as a filter
Expand Down Expand Up @@ -386,13 +415,25 @@ public ItemCollection search(List<Float> bbox,
List<String> partitionIds = getPartitions(intersects, ids, dateTimes);

assert partitionIds != null;
List<Item> allItems = new ArrayList<>();
List<CompletableFuture<List<Item>>> futures = new ArrayList<>();

for (String partitionId : partitionIds) {
logger.info("partitionId");
logger.info(partitionId);
allItems.addAll(fetchItemsForPartition(partitionId, bbox, Instant.parse(dateTimes.get("minDate")), Instant.parse(dateTimes.get("maxDate")), collectionsArray, limit));
logger.info("Fetching items for partitionId: " + partitionId);
CompletableFuture<List<Item>> future = fetchItemsForPartitionAsync(partitionId, bbox,
Instant.parse(dateTimes.get("minDate")), Instant.parse(dateTimes.get("maxDate")),
collectionsArray, limit);
futures.add(future);
}

CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<Item>> allItemsFuture = allFutures.thenApply(v ->
futures.stream()
.flatMap(future -> future.join().stream())
.collect(Collectors.toList())
);

List<Item> allItems = allItemsFuture.get();

allItems = allItems.stream().filter(_item -> {
boolean valid = true;
if (ids != null) {
Expand Down
4 changes: 2 additions & 2 deletions upload_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
log_file_path = 'process.log'

# Batch size
batch_size = 100
batch_size = 10

# Number of threads
num_threads = 10
num_threads = 64

def log(message):
with open(log_file_path, 'a') as log_file:
Expand Down

0 comments on commit 3aa9462

Please sign in to comment.