Skip to content

Commit

Permalink
refactor mget method improved readability and efficiency (#3061)
Browse files Browse the repository at this point in the history
* refactor mget method improved readability and efficiency

* refactor mget method improved readability and efficiency
  • Loading branch information
ori0o0p authored Jan 6, 2025
1 parent ce92e6a commit 38a30dc
Showing 1 changed file with 10 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,45 +284,32 @@ public Flux<KeyValue<K, V>> mget(K... keys) {

@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {

List<K> keyList = LettuceLists.newList(keys);
Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);

if (partitioned.size() < 2) {
return super.mget(keyList);
}

List<Publisher<KeyValue<K, V>>> publishers = new ArrayList<>();

for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
publishers.add(super.mget(entry.getValue()));
}

Flux<KeyValue<K, V>> fluxes = Flux.mergeSequential(publishers);
List<Publisher<KeyValue<K, V>>> publishers = partitioned.values().stream().map(super::mget)
.collect(Collectors.toList());

Mono<List<KeyValue<K, V>>> map = fluxes.collectList().map(vs -> {

KeyValue<K, V>[] values = new KeyValue[vs.size()];
return Flux.mergeSequential(publishers).collectList().map(results -> {
KeyValue<K, V>[] values = new KeyValue[keyList.size()];
int offset = 0;
for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {

for (List<K> partitionKeys : partitioned.values()) {
for (int i = 0; i < keyList.size(); i++) {

int index = entry.getValue().indexOf(keyList.get(i));
if (index == -1) {
continue;
int index = partitionKeys.indexOf(keyList.get(i));
if (index != -1) {
values[i] = results.get(offset + index);
}

values[i] = vs.get(offset + index);
}

offset += entry.getValue().size();
offset += partitionKeys.size();
}

return Arrays.asList(values);
});

return map.flatMapIterable(keyValues -> keyValues);
}).flatMapMany(Flux::fromIterable);
}

@Override
Expand Down

0 comments on commit 38a30dc

Please sign in to comment.