Skip to content

Commit

Permalink
Concurrency optimization for graph native loading update
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Ramadurai <[email protected]>
  • Loading branch information
Gankris96 authored and Ganesh Ramadurai committed Jan 23, 2025
1 parent 1c4a7ca commit 4dc8449
Show file tree
Hide file tree
Showing 7 changed files with 454 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add binary index support for Lucene engine. (#2292)[https://github.com/opensearch-project/k-NN/pull/2292]
- Add expand_nested_docs Parameter support to NMSLIB engine (#2331)[https://github.com/opensearch-project/k-NN/pull/2331]
- Add cosine similarity support for faiss engine (#2376)[https://github.com/opensearch-project/k-NN/pull/2376]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265)[https://github.com/opensearch-project/k-NN/pull/2345]

### Enhancements
- Introduced a writing layer in native engines that relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290)[https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
* Manages native memory allocations made by JNI.
Expand All @@ -56,6 +58,7 @@ public class NativeMemoryCacheManager implements Closeable {

private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ConcurrentHashMap<String, ReentrantLock> indexLocks = new ConcurrentHashMap<>();
private final ExecutorService executor;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;
Expand Down Expand Up @@ -345,7 +348,22 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC

// Cache Miss
// Evict before put
// open the graph file before proceeding to load the graph into memory
ReentrantLock indexFileLock = indexLocks.computeIfAbsent(key, k -> new ReentrantLock());
indexFileLock.lock();
nativeMemoryEntryContext.openVectorIndex();
indexFileLock.unlock();
if (!indexFileLock.hasQueuedThreads()) {
indexLocks.remove(key, indexFileLock);
}
synchronized (this) {
// recheck if another thread already loaded this entry into the cache
result = cache.getIfPresent(key);
if (result != null) {
accessRecencyQueue.remove(key);
accessRecencyQueue.addLast(key);
return result;
}
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
Iterator<String> lruIterator = accessRecencyQueue.iterator();
while (lruIterator.hasNext()
Expand All @@ -367,7 +385,15 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
return result;
}
} else {
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
// open graphFile before load
try (nativeMemoryEntryContext) {
String key = nativeMemoryEntryContext.getKey();
ReentrantLock indexFileLock = indexLocks.computeIfAbsent(key, k -> new ReentrantLock());
indexFileLock.lock();
nativeMemoryEntryContext.openVectorIndex();
indexFileLock.unlock();
return cache.get(key, nativeMemoryEntryContext::load);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
package org.opensearch.knn.index.memory;

import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.store.IndexInputWithBuffer;

import java.io.IOException;
import java.util.Map;
Expand All @@ -26,7 +30,7 @@
/**
* Encapsulates all information needed to load a component into native memory.
*/
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> {
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> implements AutoCloseable {

protected final String key;

Expand Down Expand Up @@ -55,13 +59,27 @@ public String getKey() {
*/
public abstract Integer calculateSizeInKB();

/**
 * Opens the graph file by opening the corresponding indexInput so
 * that it is available for graph loading.
 * <p>
 * This default implementation is a no-op (empty body). Subclasses that
 * actually read a file, such as {@code IndexEntryContext}, override it.
 */

public void openVectorIndex() {}

/**
 * Provides the capability to close the closeable objects in the {@link NativeMemoryEntryContext}.
 * <p>
 * This default implementation is a no-op (empty body); subclasses holding
 * resources override it. The signature declares no checked exception, so the
 * context can be used in a try-with-resources statement without a catch clause.
 */
@Override
public void close() {}

/**
* Loads entry into memory.
*
* @return NativeMemoryAllocation associated with NativeMemoryEntryContext
*/
public abstract T load() throws IOException;

@Log4j2
public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.IndexAllocation> {

@Getter
Expand All @@ -75,6 +93,17 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
@Getter
private final String modelId;

@Getter
private boolean indexGraphFileOpened = false;
@Getter
private int indexSizeKb;

@Getter
private IndexInput readStream;

@Getter
IndexInputWithBuffer indexInputWithBuffer;

/**
* Constructor
*
Expand Down Expand Up @@ -131,10 +160,61 @@ public Integer calculateSizeInKB() {
}
}

/**
 * Opens the vector index file referenced by this context's cache key and keeps the
 * resulting index input (and its buffered wrapper) on this context so that a later
 * {@link #load()} can hand it to the native engine.
 * <p>
 * Calling this method when the graph file is already open is a no-op.
 *
 * @throws IllegalStateException if the cache key does not contain a vector file name
 * @throws RuntimeException      if the file length cannot be read or the index input cannot be
 *                               opened or positioned; the underlying {@link IOException} is
 *                               attached as the cause
 */
@Override
public void openVectorIndex() {
    // if graph file is already opened for index, do nothing
    if (isIndexGraphFileOpened()) {
        return;
    }
    // Extract vector file name from the given cache key.
    // Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==
    final String cacheKey = this.getKey();
    final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey);
    if (vectorFileName == null) {
        throw new IllegalStateException(
            "Invalid cache key was given. The key [" + cacheKey + "] does not contain the corresponding vector file name."
        );
    }

    // Prepare for opening index input from directory.
    final Directory directory = this.getDirectory();

    // Try to open an index input then pass it down to native engine for loading an index.
    try {
        indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);
        readStream = directory.openInput(vectorFileName, IOContext.READONCE);
        readStream.seek(0);
        indexInputWithBuffer = new IndexInputWithBuffer(readStream);
        indexGraphFileOpened = true;
        log.debug("[KNN] NativeMemoryCacheManager openVectorIndex successful");
    } catch (IOException e) {
        // Do not leave a half-opened stream behind: release it here and clear the field so that
        // a later close() does not operate on a stream that never became usable.
        if (readStream != null) {
            try {
                readStream.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            readStream = null;
        }
        // Keep the original IOException as the cause so the root failure is not lost.
        throw new RuntimeException("Failed to openVectorIndex the index " + openSearchIndexName, e);
    }
}

/**
 * Loads the index through the configured load strategy.
 * <p>
 * The graph file must already have been opened via {@link #openVectorIndex()}.
 *
 * @return the allocation produced by the load strategy
 * @throws IOException           declared by the overridden method
 * @throws IllegalStateException if the graph file has not been opened
 */
@Override
public NativeMemoryAllocation.IndexAllocation load() throws IOException {
    final boolean graphFileReady = isIndexGraphFileOpened();
    if (graphFileReady) {
        return indexLoadStrategy.load(this);
    }
    throw new IllegalStateException("Index graph file is not open");
}

/**
 * Closes the index input held by this context, if there is one, and marks the
 * graph file as no longer open.
 *
 * @throws RuntimeException wrapping the {@link IOException} raised while closing the stream
 */
@Override
public void close() {
    // Nothing was ever opened, so there is nothing to release.
    if (readStream == null) {
        return;
    }
    try {
        readStream.close();
        // Only reached when the stream closed without error.
        indexGraphFileOpened = false;
    } catch (IOException e) {
        final String failureMessage = "Exception while closing the indexInput index ["
            + openSearchIndexName
            + "] for loading the graph file.";
        throw new RuntimeException(failureMessage, e);
    }
}
}

public static class TrainingDataEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.TrainingDataAllocation> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.store.IndexInputWithBuffer;
import org.opensearch.knn.index.util.IndexUtil;
import org.opensearch.knn.jni.JNIService;
import org.opensearch.knn.index.engine.KNNEngine;
Expand Down Expand Up @@ -88,10 +85,16 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde
final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);

// Try to open an index input then pass it down to native engine for loading an index.
try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) {
final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);

// openVectorIndex takes care of opening the indexInput file
if (!indexEntryContext.isIndexGraphFileOpened()) {
throw new IllegalStateException("Index [" + indexEntryContext.getOpenSearchIndexName() + "] is not preloaded");
}
try (indexEntryContext) {
final long indexAddress = JNIService.loadIndex(
indexEntryContext.indexInputWithBuffer,
indexEntryContext.getParameters(),
knnEngine
);
return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName);
}
}
Expand Down
Loading

0 comments on commit 4dc8449

Please sign in to comment.