Skip to content

Commit

Permalink
fix: 11561 Added isReconnectContext flag to handle node removal properly in the reconnect context. (#11591)
Browse files Browse the repository at this point in the history

Signed-off-by: Ivan Malygin <[email protected]>
  • Loading branch information
imalygin authored Feb 16, 2024
1 parent 25efdb3 commit 6d5348f
Show file tree
Hide file tree
Showing 13 changed files with 523 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,16 @@ public void saveRecords(
final long lastLeafPath,
final Stream<VirtualHashRecord> pathHashRecordsToUpdate,
final Stream<VirtualLeafRecord<K, V>> leafRecordsToAddOrUpdate,
final Stream<VirtualLeafRecord<K, V>> leafRecordsToDelete) {
final Stream<VirtualLeafRecord<K, V>> leafRecordsToDelete,
final boolean isReconnectContext) {
try {
delegate.saveRecords(
firstLeafPath,
lastLeafPath,
pathHashRecordsToUpdate,
leafRecordsToAddOrUpdate,
leafRecordsToDelete);
leafRecordsToDelete,
isReconnectContext);
} catch (final Exception e) {
exceptionSink.set(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ void resumeCompaction() throws IOException {
* @param leafRecordsToAddOrUpdate stream of new leaf nodes and updated leaf nodes
* @param leafRecordsToDelete stream of leaf nodes to delete. The leaf record's key and path
* have to be populated; all other data can be null.
* @param isReconnectContext if true, the method is called in the context of a reconnect
* @throws IOException If there was a problem saving changes to data source
*/
@Override
Expand All @@ -501,7 +502,8 @@ public void saveRecords(
final long lastLeafPath,
final Stream<VirtualHashRecord> hashRecordsToUpdate,
final Stream<VirtualLeafRecord<K, V>> leafRecordsToAddOrUpdate,
final Stream<VirtualLeafRecord<K, V>> leafRecordsToDelete)
final Stream<VirtualLeafRecord<K, V>> leafRecordsToDelete,
final boolean isReconnectContext)
throws IOException {
try {
validLeafPathRange = new KeyRange(firstLeafPath, lastLeafPath);
Expand All @@ -522,7 +524,8 @@ public void saveRecords(
}

// we might as well do this in the archive thread rather than leaving it waiting
writeLeavesToPathToKeyValue(firstLeafPath, lastLeafPath, leafRecordsToAddOrUpdate, leafRecordsToDelete);
writeLeavesToPathToKeyValue(
firstLeafPath, lastLeafPath, leafRecordsToAddOrUpdate, leafRecordsToDelete, isReconnectContext);
// wait for the other threads in the rare case they are not finished yet. We need to
// have all writing
// done before we return as when we return the state version we are writing is deleted
Expand Down Expand Up @@ -610,18 +613,7 @@ public VirtualLeafRecord<K, V> loadLeafRecord(final K key) throws IOException {
// Go ahead and lookup the value.
VirtualLeafRecord<K, V> leafRecord = pathToKeyValue.get(path);

// FUTURE WORK: once the reconnect key leak bug is fixed, this block should be removed
if (!leafRecord.getKey().equals(key)) {
if (database.getConfig().reconnectKeyLeakMitigationEnabled()) {
logger.warn(MERKLE_DB.getMarker(), "leaked key {} encountered, mitigation is enabled", key);
return null;
} else {
logger.error(
EXCEPTION.getMarker(),
"leaked key {} encountered, mitigation is disabled, expect problems",
key);
}
}
assert leafRecord != null && leafRecord.getKey().equals(key);

if (leafRecordCache != null) {
// No synchronization is needed here, see the comment above
Expand Down Expand Up @@ -1170,7 +1162,8 @@ private void writeLeavesToPathToKeyValue(
final long firstLeafPath,
final long lastLeafPath,
final Stream<VirtualLeafRecord<K, V>> dirtyLeaves,
final Stream<VirtualLeafRecord<K, V>> deletedLeaves)
final Stream<VirtualLeafRecord<K, V>> deletedLeaves,
boolean isReconnect)
throws IOException {
if ((dirtyLeaves == null) || (firstLeafPath <= 0)) {
// nothing to do
Expand Down Expand Up @@ -1218,7 +1211,11 @@ private void writeLeavesToPathToKeyValue(
// dirtyLeaves stream above
if (isLongKeyMode) {
final long key = ((VirtualLongKey) leafRecord.getKey()).getKeyAsLong();
longKeyToPath.putIfEqual(key, path, INVALID_PATH);
if (isReconnect) {
longKeyToPath.putIfEqual(key, path, INVALID_PATH);
} else {
longKeyToPath.put(key, INVALID_PATH);
}
} else {
objectKeyToPath.deleteIfEqual(leafRecord.getKey(), path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@
package com.swirlds.merkledb;

import static com.swirlds.common.test.fixtures.AssertionUtils.assertEventuallyEquals;
import static com.swirlds.merkledb.MerkleDbDataSourceTest.createDataSource;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.swirlds.merkledb.test.fixtures.ExampleByteArrayVirtualValue;
import com.swirlds.merkledb.test.fixtures.TestType;
import com.swirlds.virtualmap.VirtualLongKey;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -39,62 +35,62 @@ class DataSourceValidatorTest {
@TempDir
private Path tempDir;

private MerkleDbDataSource<VirtualLongKey, ExampleByteArrayVirtualValue> dataSource;
private int count;
private DataSourceValidator<VirtualLongKey, ExampleByteArrayVirtualValue> validator;

@BeforeEach
public void setUp() throws IOException {
public void setUp() {
count = 10_000;
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
// create db

dataSource = createDataSource(tempDir, "createAndCheckInternalNodeHashes", TestType.fixed_fixed, count, 0);
validator = new DataSourceValidator<>(dataSource);

// check db count
assertEventuallyEquals(
1L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected only 1 db");
}

@Test
void testValidateValidDataSource() throws IOException {
// create some node hashes
dataSource.saveRecords(
count,
count * 2L,
IntStream.range(0, count).mapToObj(MerkleDbDataSourceTest::createVirtualInternalRecord),
IntStream.range(count, count * 2 + 1)
.mapToObj(i -> TestType.fixed_fixed.dataType().createVirtualLeafRecord(i)),
Stream.empty());
MerkleDbDataSourceTest.createAndApplyDataSource(
tempDir, "createAndCheckInternalNodeHashes", TestType.fixed_fixed, count, 0, dataSource -> {
// check db count
assertEventuallyEquals(
1L,
MerkleDbDataSource::getCountOfOpenDatabases,
Duration.ofSeconds(1),
"Expected only 1 db");

final var validator = new DataSourceValidator<>(dataSource);
// create some node hashes
dataSource.saveRecords(
count,
count * 2L,
IntStream.range(0, count).mapToObj(MerkleDbDataSourceTest::createVirtualInternalRecord),
IntStream.range(count, count * 2 + 1)
.mapToObj(
i -> TestType.fixed_fixed.dataType().createVirtualLeafRecord(i)),
Stream.empty());

assertTrue(validator.validate());
assertTrue(validator.validate());
});
}

@Test
void testValidateInvalidDataSource() throws IOException {
// check db count
// create some node hashes
dataSource.saveRecords(
count,
count * 2L,
IntStream.range(0, count).mapToObj(MerkleDbDataSourceTest::createVirtualInternalRecord),
// leaves are missing
Stream.empty(),
Stream.empty());

assertFalse(validator.validate());
}

@AfterEach
public void cleanup() throws IOException {
if (dataSource != null) {
dataSource.close();
// check db count
assertEventuallyEquals(
0L, MerkleDbDataSource::getCountOfOpenDatabases, Duration.ofSeconds(1), "Expected no open dbs");
}
MerkleDbDataSourceTest.createAndApplyDataSource(
tempDir, "createAndCheckInternalNodeHashes", TestType.fixed_fixed, count, 0, dataSource -> {
// check db count
assertEventuallyEquals(
1L,
MerkleDbDataSource::getCountOfOpenDatabases,
Duration.ofSeconds(1),
"Expected only 1 db");
final var validator = new DataSourceValidator<>(dataSource);
// create some node hashes
dataSource.saveRecords(
count,
count * 2L,
IntStream.range(0, count).mapToObj(MerkleDbDataSourceTest::createVirtualInternalRecord),
// leaves are missing
Stream.empty(),
Stream.empty());
assertFalse(validator.validate());
});
}
}
Loading

0 comments on commit 6d5348f

Please sign in to comment.