diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index a496909f67e..5d750a7cd3c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -145,7 +145,7 @@ public void addLocation(long ledgerId, long entryId, long location) throws IOExc batch.close(); } - public Batch newBatch() { + public Batch newBatch() throws IOException { return locationsDb.newBatch(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java index 8e18148c085..100903b66ee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java @@ -122,7 +122,7 @@ default void compact() throws IOException {} * * @return */ - CloseableIterator keys(); + CloseableIterator keys() throws IOException; /** * Get an iterator over to scan sequentially through all the keys within a @@ -134,13 +134,13 @@ default void compact() throws IOException {} * the lastKey in the range (not included) * */ - CloseableIterator keys(byte[] firstKey, byte[] lastKey); + CloseableIterator keys(byte[] firstKey, byte[] lastKey) throws IOException; /** * Return an iterator object that can be used to sequentially scan through all * the entries in the database. */ - CloseableIterator> iterator(); + CloseableIterator> iterator() throws IOException; /** * Commit all pending write to durable storage. @@ -163,7 +163,7 @@ interface CloseableIterator extends Closeable { T next() throws IOException; } - Batch newBatch(); + Batch newBatch() throws IOException; /** * Interface for a batch to be written in the storage. @@ -175,7 +175,7 @@ public interface Batch extends Closeable { void deleteRange(byte[] beginKey, byte[] endKey) throws IOException; - void clear(); + void clear() throws IOException; void flush() throws IOException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java index a77a0a18f7c..45f5b6983e5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.rocksdb.BlockBasedTableConfig; @@ -74,6 +75,10 @@ public class KeyValueStorageRocksDB implements KeyValueStorage { new KeyValueStorageRocksDB(defaultBasePath, subPath, dbConfigType, conf); private final RocksDB db; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + private volatile boolean closed = false; private RocksObject options; private List columnFamilyDescriptors; @@ -285,41 +290,57 @@ private RocksDB initializeRocksDBWithBookieConf(String basePath, String subPath, @Override public void close() throws IOException { - db.close(); - if (cache != null) { - cache.close(); - } - if (options != null) { - options.close(); - } - optionSync.close(); - optionDontSync.close(); - optionCache.close(); - optionDontCache.close(); - emptyBatch.close(); + writeLock.lock(); + try { + closed = true; + db.close(); + if (cache != null) { + cache.close(); + } + if (options != null) { + options.close(); + } + optionSync.close(); + optionDontSync.close(); + optionCache.close(); + optionDontCache.close(); + emptyBatch.close(); + } finally { + writeLock.unlock(); + } } @Override public void put(byte[] key, byte[] value) throws IOException { + readLock.lock(); try { + throwIfClosed(); db.put(optionDontSync, key, value); } catch (RocksDBException e) { throw new IOException("Error in RocksDB put", e); + } finally { + readLock.unlock(); } } @Override public byte[] get(byte[] key) throws IOException { + readLock.lock(); try { + throwIfClosed(); return db.get(key); } catch (RocksDBException e) { throw new IOException("Error in RocksDB get", e); + } finally { + readLock.unlock(); } } @Override public int get(byte[] key, byte[] value) throws IOException { + readLock.lock(); try { + throwIfClosed(); int res = db.get(key, value); if (res == RocksDB.NOT_FOUND) { return -1; @@ -330,19 +351,27 @@ public int get(byte[] key, byte[] value) throws IOException { } } catch (RocksDBException e) { throw new IOException("Error in RocksDB get", e); + } finally { + readLock.unlock(); } } @Override @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE") public Entry getFloor(byte[] key) throws IOException { - try (Slice upperBound = new Slice(key); + readLock.lock(); + try { + throwIfClosed(); + try (Slice upperBound = new Slice(key); ReadOptions option = new ReadOptions(optionCache).setIterateUpperBound(upperBound); RocksIterator iterator = db.newIterator(option)) { - iterator.seekToLast(); - if (iterator.isValid()) { - return new EntryWrapper(iterator.key(), iterator.value()); + iterator.seekToLast(); + if (iterator.isValid()) { + return new EntryWrapper(iterator.key(), iterator.value()); + } } + } finally { + readLock.unlock(); } return null; } @@ -350,24 +379,34 @@ public Entry getFloor(byte[] key) throws IOException { @Override @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE") public Entry getCeil(byte[] key) throws IOException { - try (RocksIterator iterator = db.newIterator(optionCache)) { - // Position the iterator on the record whose key is >= to the supplied key - iterator.seek(key); - - if (iterator.isValid()) { - return new EntryWrapper(iterator.key(), iterator.value()); - } else { - return null; + readLock.lock(); + try { + throwIfClosed(); + try (RocksIterator iterator = db.newIterator(optionCache)) { + // Position the iterator on the record whose key is >= to the supplied key + iterator.seek(key); + + if (iterator.isValid()) { + return new EntryWrapper(iterator.key(), iterator.value()); + } else { + return null; + } } + } finally { + readLock.unlock(); } } @Override public void delete(byte[] key) throws IOException { + readLock.lock(); try { + throwIfClosed(); db.delete(optionDontSync, key); } catch (RocksDBException e) { throw new IOException("Error in RocksDB delete", e); + } finally { + readLock.unlock(); } } @@ -378,16 +417,22 @@ public String getDBPath() { @Override public void compact(byte[] firstKey, byte[] lastKey) throws IOException { + readLock.lock(); try { + throwIfClosed(); db.compactRange(firstKey, lastKey); } catch (RocksDBException e) { throw new IOException("Error in RocksDB compact", e); + } finally { + readLock.unlock(); } } @Override public void compact() throws IOException { + readLock.lock(); try { + throwIfClosed(); final long start = System.currentTimeMillis(); final int oriRocksDBFileCount = db.getLiveFilesMetaData().size(); final long oriRocksDBSize = getRocksDBSize(); @@ -403,6 +448,8 @@ public void compact() throws IOException { db.getName(), end - start, oriRocksDBSize - rocksDBSize, rocksDBFileCount, rocksDBSize); } catch (RocksDBException e) { throw new IOException("Error in RocksDB compact", e); + } finally { + readLock.unlock(); } } @@ -417,109 +464,196 @@ private long getRocksDBSize() { @Override public void sync() throws IOException { + readLock.lock(); try { + throwIfClosed(); db.write(optionSync, emptyBatch); } catch (RocksDBException e) { throw new IOException(e); + } finally { + readLock.unlock(); } } @Override - public CloseableIterator keys() { - final RocksIterator iterator = db.newIterator(optionCache); - iterator.seekToFirst(); - - return new CloseableIterator() { - @Override - public boolean hasNext() { - return iterator.isValid(); - } - - @Override - public byte[] next() { - checkState(iterator.isValid()); - byte[] key = iterator.key(); - iterator.next(); - return key; - } - - @Override - public void close() { - iterator.close(); - } - }; + public CloseableIterator keys() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + final RocksIterator iterator = db.newIterator(optionCache); + iterator.seekToFirst(); + + return new CloseableIterator() { + @Override + public boolean hasNext() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + return iterator.isValid(); + } finally { + readLock.unlock(); + } + } + + @Override + public byte[] next() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + checkState(iterator.isValid()); + byte[] key = iterator.key(); + iterator.next(); + return key; + } finally { + readLock.unlock(); + } + } + + @Override + public void close() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + iterator.close(); + } finally { + readLock.unlock(); + } + } + }; + } finally { + readLock.unlock(); + } } @Override - public CloseableIterator keys(byte[] firstKey, byte[] lastKey) { - final Slice upperBound = new Slice(lastKey); - final ReadOptions option = new ReadOptions(optionCache).setIterateUpperBound(upperBound); - final RocksIterator iterator = db.newIterator(option); - iterator.seek(firstKey); - - return new CloseableIterator() { - @Override - public boolean hasNext() { - return iterator.isValid(); - } - - @Override - public byte[] next() { - checkState(iterator.isValid()); - byte[] key = iterator.key(); - iterator.next(); - return key; - } - - @Override - public void close() { - iterator.close(); - option.close(); - upperBound.close(); - } - }; + public CloseableIterator keys(byte[] firstKey, byte[] lastKey) throws IOException { + readLock.lock(); + try { + throwIfClosed(); + final Slice upperBound = new Slice(lastKey); + final ReadOptions option = new ReadOptions(optionCache).setIterateUpperBound(upperBound); + final RocksIterator iterator = db.newIterator(option); + iterator.seek(firstKey); + + return new CloseableIterator() { + @Override + public boolean hasNext() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + return iterator.isValid(); + } finally { + readLock.unlock(); + } + } + + @Override + public byte[] next() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + checkState(iterator.isValid()); + byte[] key = iterator.key(); + iterator.next(); + return key; + } finally { + readLock.unlock(); + } + } + + @Override + public void close() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + iterator.close(); + option.close(); + upperBound.close(); + } finally { + readLock.unlock(); + } + + } + }; + } finally { + readLock.unlock(); + } } @Override - public CloseableIterator> iterator() { - final RocksIterator iterator = db.newIterator(optionDontCache); - iterator.seekToFirst(); - final EntryWrapper entryWrapper = new EntryWrapper(); - - return new CloseableIterator>() { - @Override - public boolean hasNext() { - return iterator.isValid(); - } - - @Override - public Entry next() { - checkState(iterator.isValid()); - entryWrapper.key = iterator.key(); - entryWrapper.value = iterator.value(); - iterator.next(); - return entryWrapper; - } - - @Override - public void close() { - iterator.close(); - } - }; + public CloseableIterator> iterator() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + final RocksIterator iterator = db.newIterator(optionDontCache); + iterator.seekToFirst(); + final EntryWrapper entryWrapper = new EntryWrapper(); + + return new CloseableIterator>() { + @Override + public boolean hasNext() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + return iterator.isValid(); + } finally { + readLock.unlock(); + } + } + + @Override + public Entry next() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + checkState(iterator.isValid()); + entryWrapper.key = iterator.key(); + entryWrapper.value = iterator.value(); + iterator.next(); + return entryWrapper; + } finally { + readLock.unlock(); + } + } + + @Override + public void close() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + iterator.close(); + } finally { + readLock.unlock(); + } + } + }; + } finally { + readLock.unlock(); + } } @Override public long count() throws IOException { + readLock.lock(); try { + throwIfClosed(); return db.getLongProperty("rocksdb.estimate-num-keys"); } catch (RocksDBException e) { throw new IOException("Error in getting records count", e); + } finally { + readLock.unlock(); } } @Override - public Batch newBatch() { - return new RocksDBBatch(writeBatchMaxSize); + public Batch newBatch() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + return new RocksDBBatch(writeBatchMaxSize); + } finally { + readLock.unlock(); + } } private class RocksDBBatch implements Batch { @@ -532,44 +666,68 @@ private class RocksDBBatch implements Batch { } @Override - public void close() { - writeBatch.close(); - batchCount = 0; + public void close() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + writeBatch.close(); + batchCount = 0; + } finally { + readLock.unlock(); + } } @Override public void put(byte[] key, byte[] value) throws IOException { + readLock.lock(); try { + throwIfClosed(); writeBatch.put(key, value); countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); + } finally { + readLock.unlock(); } } @Override public void remove(byte[] key) throws IOException { + readLock.lock(); try { + throwIfClosed(); writeBatch.delete(key); countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); + } finally { + readLock.unlock(); } } @Override - public void clear() { - writeBatch.clear(); - batchCount = 0; + public void clear() throws IOException { + readLock.lock(); + try { + throwIfClosed(); + writeBatch.clear(); + batchCount = 0; + } finally { + readLock.unlock(); + } } @Override public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException { + readLock.lock(); try { + throwIfClosed(); writeBatch.deleteRange(beginKey, endKey); countBatchAndFlushIfNeeded(); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); + } finally { + readLock.unlock(); } } @@ -587,10 +745,14 @@ public int batchCount() { @Override public void flush() throws IOException { + readLock.lock(); try { + throwIfClosed(); db.write(optionSync, writeBatch); } catch (RocksDBException e) { throw new IOException("Failed to flush RocksDB batch", e); + } finally { + readLock.unlock(); } } } @@ -639,5 +801,11 @@ RocksObject getOptions() { return options; } + private void throwIfClosed() throws IOException { + if (closed) { + throw new IOException("Attempted to use KeyValueStorageRocksDB after it was closed"); + } + } + private static final Logger log = LoggerFactory.getLogger(KeyValueStorageRocksDB.class); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java index bfb8d0d43fd..5632d55b7e4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java @@ -131,8 +131,9 @@ public void put(long entryLogId, EntryLogMetadata entryLogMeta) throws EntryLogM @Override public void forEach(BiConsumer action) throws EntryLogMetadataMapException { throwIfClosed(); - CloseableIterator> iterator = metadataMapDB.iterator(); + CloseableIterator> iterator = null; try { + iterator = metadataMapDB.iterator(); while (iterator.hasNext()) { if (isClosed.get()) { break; @@ -151,7 +152,9 @@ public void forEach(BiConsumer action) throws EntryLogMe throw new EntryLogMetadataMapException(e); } finally { try { - iterator.close(); + if (iterator != null) { + iterator.close(); + } } catch (IOException e) { log.error("Failed to close entry-log metadata-map rocksDB iterator {}", e.getMessage(), e); }