Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Conflict Detection in Parallelization by Considering Slots to Reduce False Positives #7923

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
6 changes: 2 additions & 4 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1806,10 +1806,8 @@ public BesuControllerBuilder setupControllerBuilder() {
if (DataStorageFormat.BONSAI.equals(getDataStorageConfiguration().getDataStorageFormat())) {
final DiffBasedSubStorageConfiguration subStorageConfiguration =
getDataStorageConfiguration().getDiffBasedSubStorageConfiguration();
if (subStorageConfiguration.getLimitTrieLogsEnabled()) {
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
return besuControllerBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.requests.ProcessRequestContext;
import org.hyperledger.besu.ethereum.mainnet.requests.RequestProcessorCoordinator;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
Expand Down Expand Up @@ -102,6 +103,26 @@ public BlockProcessingResult processBlock(
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
return processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}

protected BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater,
final PreprocessingFunction preprocessingBlockFunction) {
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
long currentBlobGasUsed = 0;
Expand All @@ -127,7 +148,7 @@ public BlockProcessingResult processBlock(
.orElse(Wei.ZERO);

final Optional<PreprocessingContext> preProcessingContext =
runBlockPreProcessing(
preprocessingBlockFunction.run(
worldState,
privateMetadataUpdater,
blockHeader,
Expand Down Expand Up @@ -246,17 +267,6 @@ public BlockProcessingResult processBlock(
Optional.of(new BlockProcessingOutputs(worldState, receipts, maybeRequests)));
}

protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}

protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
final MutableWorldState worldState,
Expand Down Expand Up @@ -309,5 +319,30 @@ abstract boolean rewardCoinbase(
final boolean skipZeroBlockRewards);

public interface PreprocessingContext {}
;

public interface PreprocessingFunction {
Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice);

class NoPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.BlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
Expand All @@ -37,8 +41,13 @@
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainnetParallelBlockProcessor extends MainnetBlockProcessor {

private static final Logger LOG = LoggerFactory.getLogger(MainnetParallelBlockProcessor.class);

private final Optional<MetricsSystem> metricsSystem;
private final Optional<Counter> confirmedParallelizedTransactionCounter;
private final Optional<Counter> conflictingButCachedTransactionCounter;
Expand Down Expand Up @@ -78,34 +87,6 @@ public MainnetParallelBlockProcessor(
"Counter for the number of conflicted transactions during block processing"));
}

@Override
protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}

@Override
protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
Expand All @@ -126,7 +107,7 @@ protected TransactionProcessingResult getTransactionProcessingResult(
(ParallelizedPreProcessingContext) preProcessingContext.get();
transactionProcessingResult =
parallelizedPreProcessingContext
.getParallelizedConcurrentTransactionProcessor()
.parallelizedConcurrentTransactionProcessor()
.applyParallelizedTransactionResult(
worldState,
miningBeneficiary,
Expand Down Expand Up @@ -154,21 +135,48 @@ protected TransactionProcessingResult getTransactionProcessingResult(
}
}

static class ParallelizedPreProcessingContext implements PreprocessingContext {
final ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor;

public ParallelizedPreProcessingContext(
final ParallelizedConcurrentTransactionProcessor
parallelizedConcurrentTransactionProcessor) {
this.parallelizedConcurrentTransactionProcessor = parallelizedConcurrentTransactionProcessor;
}

public ParallelizedConcurrentTransactionProcessor
getParallelizedConcurrentTransactionProcessor() {
return parallelizedConcurrentTransactionProcessor;
@Override
public BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
final BlockProcessingResult blockProcessingResult =
super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new ParallelTransactionPreprocessing());
if (blockProcessingResult.isFailed()) {
// Fallback to non-parallel processing if there is a block processing exception .
LOG.info(
matkt marked this conversation as resolved.
Show resolved Hide resolved
"Block processing failed. Falling back to non-parallel processing for block #{} ({})",
blockHeader.getNumber(),
blockHeader.getBlockHash());
return super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}
return blockProcessingResult;
}

record ParallelizedPreProcessingContext(
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor)
implements PreprocessingContext {}

public static class ParallelBlockProcessorBuilder
implements ProtocolSpecBuilder.BlockProcessorBuilder {

Expand Down Expand Up @@ -196,4 +204,35 @@ public BlockProcessor apply(
metricsSystem);
}
}

class ParallelTransactionPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashOperation.BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}
}
}
Loading
Loading