Skip to content

Commit

Permalink
Merge branch 'main' into bug/issue-8135/engine_api_milestones_zero
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Lehrner <[email protected]>
  • Loading branch information
daniellehrner committed Jan 22, 2025
2 parents b1e32b4 + 519323f commit 3d5e5f1
Show file tree
Hide file tree
Showing 25 changed files with 517 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ public BesuController build() {
syncState,
transactionPoolConfiguration,
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
miningConfiguration);
miningConfiguration,
syncConfig.isPeerTaskSystemEnabled());

final List<PeerValidator> peerValidators =
createPeerValidators(protocolSchedule, peerTaskExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public void setUp() {
syncState,
txPoolConfig,
new BlobCache(),
MiningConfiguration.newDefault());
MiningConfiguration.newDefault(),
false);

serviceImpl =
new BesuEventsImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected BlockParameter blockParameter(final JsonRpcRequestContext request) {
return request.getRequiredParameter(0, BlockParameter.class);
} catch (JsonRpcParameterException e) {
throw new InvalidJsonRpcParameters(
"Invalid block parameter (index 0)", RpcErrorType.INVALID_BLOCK_PARAMS, e);
"Invalid block parameter (index 0)", RpcErrorType.INVALID_BLOCK_NUMBER_PARAMS, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,19 @@ private static FlatTrace.Context handleHalt(
traceFrameBuilder = currentContext.getBuilder();
}
traceFrameBuilder.error(
traceFrame.getExceptionalHaltReason().map(ExceptionalHaltReason::getDescription));
traceFrame
.getExceptionalHaltReason()
.map(
exceptionalHaltReason -> {
if (exceptionalHaltReason
.name()
.equals(ExceptionalHaltReason.INVALID_OPERATION.name())) {
return ExceptionalHaltReason.INVALID_OPERATION.getDescription();
} else {
return exceptionalHaltReason.getDescription();
}
}));

if (currentContext != null) {
final Action.Builder actionBuilder = traceFrameBuilder.getActionBuilder();
actionBuilder.value(Quantity.create(traceFrame.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ public void getBlockByNumberForInvalidBlockParameter() throws Exception {
// Check general format of result
final String respBody = resp.body().string();
final JsonObject json = new JsonObject(respBody);
final RpcErrorType expectedError = RpcErrorType.INVALID_BLOCK_PARAMS;
final RpcErrorType expectedError = RpcErrorType.INVALID_BLOCK_NUMBER_PARAMS;
testHelper.assertValidJsonRpcError(
json, id, expectedError.getCode(), expectedError.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,17 @@ public void returnsCorrectMethodName() {
@Test
public void exceptionWhenNoParamsSupplied() {
assertThatThrownBy(() -> method.response(requestWithParams()))
.isInstanceOf(InvalidJsonRpcParameters.class);
.isInstanceOf(InvalidJsonRpcParameters.class)
.hasFieldOrPropertyWithValue("rpcErrorType", RpcErrorType.INVALID_BLOCK_NUMBER_PARAMS);
verifyNoMoreInteractions(blockchainQueries);
}

@Test
public void exceptionWhenNoNumberSupplied() {
assertThatThrownBy(() -> method.response(requestWithParams("false")))
.isInstanceOf(InvalidJsonRpcParameters.class);
.isInstanceOf(InvalidJsonRpcParameters.class)
.hasFieldOrPropertyWithValue("rpcErrorType", RpcErrorType.INVALID_BLOCK_NUMBER_PARAMS);

verifyNoMoreInteractions(blockchainQueries);
}

Expand All @@ -129,7 +132,8 @@ public void exceptionWhenNoBoolSupplied() {
public void exceptionWhenNumberParamInvalid() {
assertThatThrownBy(() -> method.response(requestWithParams("invalid", "true")))
.isInstanceOf(InvalidJsonRpcParameters.class)
.hasMessage("Invalid block parameter (index 0)");
.hasMessage("Invalid block parameter (index 0)")
.hasFieldOrPropertyWithValue("rpcErrorType", RpcErrorType.INVALID_BLOCK_NUMBER_PARAMS);
verifyNoMoreInteractions(blockchainQueries);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);

if (responseMessageData == null) {
throw new InvalidPeerTaskResponseException();
}

result = peerTask.processResponse(responseMessageData);
} finally {
inflightRequestCountForThisTaskClass.decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
import org.hyperledger.besu.ethereum.eth.messages.GetPooledTransactionsMessage;
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.List;
import java.util.function.Predicate;

public class GetPooledTransactionsFromPeerTask implements PeerTask<List<Transaction>> {

private final List<Hash> hashes;

public GetPooledTransactionsFromPeerTask(final List<Hash> hashes) {
this.hashes = hashes.stream().distinct().toList();
}

@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
public MessageData getRequestMessage() {
return GetPooledTransactionsMessage.create(hashes);
}

@Override
public List<Transaction> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
final PooledTransactionsMessage pooledTransactionsMessage =
PooledTransactionsMessage.readFrom(messageData);
final List<Transaction> responseTransactions = pooledTransactionsMessage.transactions();
if (responseTransactions.size() > hashes.size()) {
throw new InvalidPeerTaskResponseException(
"Response transaction count does not match request hash count");
}
return responseTransactions;
}

@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (peer) -> true;
}

@Override
public PeerTaskValidationResponse validateResult(final List<Transaction> result) {
if (!result.stream().allMatch((t) -> hashes.contains(t.getHash()))) {
return PeerTaskValidationResponse.RESULTS_DO_NOT_MATCH_QUERY;
}
return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
}

public List<Hash> getHashes() {
return hashes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
Expand All @@ -28,6 +30,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

import com.google.common.collect.EvictingQueue;
Expand All @@ -49,6 +52,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcher {
private final ScheduledFuture<?> scheduledFuture;
private final EthPeer peer;
private final Queue<Hash> txAnnounces;
private final boolean isPeerTaskSystemEnabled;

public BufferedGetPooledTransactionsFromPeerFetcher(
final EthContext ethContext,
Expand All @@ -57,7 +61,8 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
final TransactionPool transactionPool,
final PeerTransactionTracker transactionTracker,
final TransactionPoolMetrics metrics,
final String metricLabel) {
final String metricLabel,
final boolean isPeerTaskSystemEnabled) {
this.ethContext = ethContext;
this.scheduledFuture = scheduledFuture;
this.peer = peer;
Expand All @@ -67,6 +72,7 @@ public BufferedGetPooledTransactionsFromPeerFetcher(
this.metricLabel = metricLabel;
this.txAnnounces =
Queues.synchronizedQueue(EvictingQueue.create(DEFAULT_MAX_PENDING_TRANSACTIONS));
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
}

public ScheduledFuture<?> getScheduledFuture() {
Expand All @@ -76,27 +82,53 @@ public ScheduledFuture<?> getScheduledFuture() {
public void requestTransactions() {
List<Hash> txHashesAnnounced;
while (!(txHashesAnnounced = getTxHashesAnnounced()).isEmpty()) {
final GetPooledTransactionsFromPeerTask task =
GetPooledTransactionsFromPeerTask.forHashes(
ethContext, txHashesAnnounced, metrics.getMetricsSystem());
task.assignPeer(peer);
ethContext
.getScheduler()
.scheduleSyncWorkerTask(task)
.thenAccept(
result -> {
List<Transaction> retrievedTransactions = result.getResult();
transactionTracker.markTransactionsAsSeen(peer, retrievedTransactions);

LOG.atTrace()
.setMessage("Got {} transactions of {} hashes requested from peer {}")
.addArgument(retrievedTransactions::size)
.addArgument(task.getTransactionHashes()::size)
.addArgument(peer::getLoggableId)
.log();

transactionPool.addRemoteTransactions(retrievedTransactions);
});
CompletableFuture<List<Transaction>> futureTransactions;
if (isPeerTaskSystemEnabled) {
final org.hyperledger.besu.ethereum.eth.manager.peertask.task
.GetPooledTransactionsFromPeerTask
task =
new org.hyperledger.besu.ethereum.eth.manager.peertask.task
.GetPooledTransactionsFromPeerTask(txHashesAnnounced);
futureTransactions =
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> {
PeerTaskExecutorResult<List<Transaction>> taskResult =
ethContext.getPeerTaskExecutor().executeAgainstPeer(task, peer);
if (taskResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
|| taskResult.result().isEmpty()) {
return CompletableFuture.failedFuture(
new RuntimeException("Failed to retrieve transactions for hashes"));
}
return CompletableFuture.completedFuture(taskResult.result().get());
});
} else {
final GetPooledTransactionsFromPeerTask task =
GetPooledTransactionsFromPeerTask.forHashes(
ethContext, txHashesAnnounced, metrics.getMetricsSystem());
task.assignPeer(peer);
futureTransactions =
ethContext
.getScheduler()
.scheduleSyncWorkerTask(task)
.thenCompose(
(peerTaskResult) ->
CompletableFuture.completedFuture(peerTaskResult.getResult()));
}

futureTransactions.thenAccept(
retrievedTransactions -> {
transactionTracker.markTransactionsAsSeen(peer, retrievedTransactions);

LOG.atTrace()
.setMessage("Got {} transactions requested from peer {}")
.addArgument(retrievedTransactions::size)
.addArgument(peer::getLoggableId)
.log();

transactionPool.addRemoteTransactions(retrievedTransactions);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ public class NewPooledTransactionHashesMessageProcessor {
private final TransactionPoolConfiguration transactionPoolConfiguration;
private final EthContext ethContext;
private final TransactionPoolMetrics metrics;
private final boolean isPeerTaskSystemEnabled;

public NewPooledTransactionHashesMessageProcessor(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionPoolConfiguration transactionPoolConfiguration,
final EthContext ethContext,
final TransactionPoolMetrics metrics) {
final TransactionPoolMetrics metrics,
final boolean isPeerTaskSystemEnabled) {
this.transactionTracker = transactionTracker;
this.transactionPool = transactionPool;
this.transactionPoolConfiguration = transactionPoolConfiguration;
this.ethContext = ethContext;
this.metrics = metrics;
metrics.initExpiredMessagesCounter(METRIC_LABEL);
this.scheduledTasks = new ConcurrentHashMap<>();
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
}

void processNewPooledTransactionHashesMessage(
Expand Down Expand Up @@ -114,7 +117,8 @@ private void processNewPooledTransactionHashesMessage(
transactionPool,
transactionTracker,
metrics,
METRIC_LABEL);
METRIC_LABEL,
isPeerTaskSystemEnabled);
});

bufferedTask.addHashes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public static TransactionPool createTransactionPool(
final SyncState syncState,
final TransactionPoolConfiguration transactionPoolConfiguration,
final BlobCache blobCache,
final MiningConfiguration miningConfiguration) {
final MiningConfiguration miningConfiguration,
final boolean isPeerTaskSystemEnabled) {

final TransactionPoolMetrics metrics = new TransactionPoolMetrics(metricsSystem);

Expand All @@ -80,7 +81,8 @@ public static TransactionPool createTransactionPool(
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
blobCache,
miningConfiguration);
miningConfiguration,
isPeerTaskSystemEnabled);
}

static TransactionPool createTransactionPool(
Expand All @@ -95,7 +97,8 @@ static TransactionPool createTransactionPool(
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final BlobCache blobCache,
final MiningConfiguration miningConfiguration) {
final MiningConfiguration miningConfiguration,
final boolean isPeerTaskSystemEnabled) {

final TransactionPool transactionPool =
new TransactionPool(
Expand Down Expand Up @@ -135,7 +138,8 @@ static TransactionPool createTransactionPool(
transactionPool,
transactionPoolConfiguration,
ethContext,
metrics),
metrics,
isPeerTaskSystemEnabled),
transactionPoolConfiguration.getUnstable().getTxMessageKeepAliveSeconds());

subscribeTransactionHandlers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,8 @@ public void transactionMessagesGoToTheCorrectExecutor() {
new SyncState(blockchain, ethManager.ethContext().getEthPeers()),
TransactionPoolConfiguration.DEFAULT,
new BlobCache(),
MiningConfiguration.newDefault())
MiningConfiguration.newDefault(),
false)
.setEnabled();

// Send just a transaction message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public void setupTest() {
syncState,
TransactionPoolConfiguration.DEFAULT,
new BlobCache(),
MiningConfiguration.newDefault());
MiningConfiguration.newDefault(),
false);
transactionPool.setEnabled();

ethProtocolManager =
Expand Down
Loading

0 comments on commit 3d5e5f1

Please sign in to comment.