From 70eaa699090ca38864e5efe064d3ed35d43f6597 Mon Sep 17 00:00:00 2001 From: anomnaco Date: Wed, 11 Sep 2024 08:51:07 -0400 Subject: [PATCH 1/3] Get Items per PartitionId in parallel --- .../oss/cass_stac/service/ItemService.java | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java b/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java index 5228d35..f81f284 100644 --- a/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java +++ b/src/main/java/com/datastax/oss/cass_stac/service/ItemService.java @@ -348,6 +348,35 @@ private List fetchItemsForPartition(String partitionId, return itemPage; } + @Async("asyncExecutor") + private CompletableFuture> fetchItemsForPartitionAsync(String partitionId, + List bbox, + Instant minDate, + Instant maxDate, + List collectionsArray, + Integer pageSize) { + List itemPage; + + Query dbQuery = Query.query(Criteria.where("partition_id").is(partitionId)) +; + if (collectionsArray != null) { + dbQuery = dbQuery.and(Criteria.where("collection").in(collectionsArray)); + } + + dbQuery = dbQuery.and(Criteria.where("datetime").lte(maxDate)) + .and(Criteria.where("datetime").gte(minDate)); + + logger.info("dbQuery.toString()"); + logger.info(dbQuery.toString()); + itemPage = cassandraTemplate.select(dbQuery, Item.class); + + logger.info("itemPage.toString()"); + logger.info(itemPage.toString()); + + + return CompletableFuture.completedFuture(itemPage); + } + /** * search within all items, items that intersect with bbox or a geometry using intersects * date might be used as well as a filter @@ -386,13 +415,25 @@ public ItemCollection search(List bbox, List partitionIds = getPartitions(intersects, ids, dateTimes); assert partitionIds != null; - List allItems = new ArrayList<>(); + List>> futures = new ArrayList<>(); + for (String partitionId : partitionIds) { - logger.info("partitionId"); - logger.info(partitionId); - allItems.addAll(fetchItemsForPartition(partitionId, bbox, Instant.parse(dateTimes.get("minDate")), Instant.parse(dateTimes.get("maxDate")), collectionsArray, limit)); + logger.info("Fetching items for partitionId: " + partitionId); + CompletableFuture> future = fetchItemsForPartitionAsync(partitionId, bbox, + Instant.parse(dateTimes.get("minDate")), Instant.parse(dateTimes.get("maxDate")), + collectionsArray, limit); + futures.add(future); } + CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + CompletableFuture> allItemsFuture = allFutures.thenApply(v -> + futures.stream() + .flatMap(future -> future.join().stream()) + .collect(Collectors.toList()) + ); + + List allItems = allItemsFuture.get(); + allItems = allItems.stream().filter(_item -> { boolean valid = true; if (ids != null) { From b1f4ec77a3412a9ae70ba49ba87f18ce948cdce0 Mon Sep 17 00:00:00 2001 From: anomnaco Date: Wed, 11 Sep 2024 08:53:52 -0400 Subject: [PATCH 2/3] Async config for 5 to 10 threads --- .../java/com/datastax/oss/cass_stac/config/AsyncConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java b/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java index c7cdd01..9515f11 100644 --- a/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java +++ b/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java @@ -13,8 +13,8 @@ public class AsyncConfig { @Bean(name = "asyncExecutor") public TaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(10); - executor.setMaxPoolSize(50); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncThread-"); executor.initialize(); From 6c980791c1e175af25684f7a58de8e11d35042b4 Mon Sep 17 00:00:00 2001 From: Rahul Singh Date: Wed, 11 Sep 2024 15:16:26 +0000 Subject: [PATCH 3/3] changing threads based on CPU --- .../com/datastax/oss/cass_stac/config/AsyncConfig.java | 7 ++++--- upload_batch.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java b/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java index 9515f11..2c19516 100644 --- a/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java +++ b/src/main/java/com/datastax/oss/cass_stac/config/AsyncConfig.java @@ -12,10 +12,11 @@ public class AsyncConfig { @Bean(name = "asyncExecutor") public TaskExecutor asyncExecutor() { + int corePoolSize = Runtime.getRuntime().availableProcessors(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(5); - executor.setMaxPoolSize(10); - executor.setQueueCapacity(100); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(corePoolSize * 4); + executor.setQueueCapacity(corePoolSize * 4); executor.setThreadNamePrefix("AsyncThread-"); executor.initialize(); return executor; diff --git a/upload_batch.py b/upload_batch.py index 71c6b5f..9de2c39 100755 --- a/upload_batch.py +++ b/upload_batch.py @@ -26,10 +26,10 @@ log_file_path = 'process.log' # Batch size -batch_size = 100 +batch_size = 10 # Number of threads -num_threads = 10 +num_threads = 64 def log(message): with open(log_file_path, 'a') as log_file: