Skip to content

Commit

Permalink
chore: add an old-style queue thread for intake (#11671)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Feb 26, 2024
1 parent 191fdc4 commit 08372c4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.swirlds.common.threading.framework.config.QueueThreadConfiguration;
import com.swirlds.common.threading.framework.config.QueueThreadMetricsConfiguration;
import com.swirlds.common.threading.interrupt.InterruptableConsumer;
import com.swirlds.common.threading.manager.AdHocThreadManager;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.common.utility.AutoCloseableWrapper;
import com.swirlds.common.utility.Clearable;
Expand Down Expand Up @@ -509,6 +510,16 @@ public class SwirldsPlatform implements Platform {

platformWiring = components.add(new PlatformWiring(platformContext, time));

final QueueThread<GossipEvent> oldStyleIntakeQueue = new QueueThreadConfiguration<GossipEvent>(
AdHocThreadManager.getStaticThreadManager())
.setCapacity(10_000)
.setThreadName("old_style_intake_queue")
.setComponent("platform")
.setMetricsConfiguration(new QueueThreadMetricsConfiguration(metrics).enableMaxSizeMetric())
.setHandler(event -> platformWiring.getGossipEventInput().put(event))
.build();
components.add(oldStyleIntakeQueue);

savedStateController = new SavedStateController(stateConfig);

final SignedStateMetrics signedStateMetrics = new SignedStateMetrics(platformContext.getMetrics());
Expand Down Expand Up @@ -664,7 +675,7 @@ public class SwirldsPlatform implements Platform {
selfId,
appVersion,
transactionPool,
platformWiring.getIntakeQueueSizeSupplier(),
oldStyleIntakeQueue::size,
platformStatusManager::getCurrentStatus,
latestReconnectRound::get);

Expand Down Expand Up @@ -724,8 +735,16 @@ public class SwirldsPlatform implements Platform {
shadowGraph,
emergencyRecoveryManager,
consensusRef,
platformWiring.getGossipEventInput()::put,
platformWiring.getIntakeQueueSizeSupplier(),
event -> {
try {
oldStyleIntakeQueue.put(event);
} catch (final InterruptedException e) {
logger.error(
EXCEPTION.getMarker(), "Interrupted while adding event to old style intake queue", e);
Thread.currentThread().interrupt();
}
},
oldStyleIntakeQueue::size,
swirldStateManager,
latestCompleteState,
syncMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
public record PlatformSchedulersConfig(
@ConfigProperty(defaultValue = "1.0") double defaultPoolMultiplier,
@ConfigProperty(defaultValue = "0") int defaultPoolConstant,
@ConfigProperty(defaultValue = "10000") int eventHasherUnhandledCapacity,
@ConfigProperty(defaultValue = "500") int eventHasherUnhandledCapacity,
@ConfigProperty(defaultValue = "SEQUENTIAL") TaskSchedulerType internalEventValidatorSchedulerType,
@ConfigProperty(defaultValue = "500") int internalEventValidatorUnhandledCapacity,
@ConfigProperty(defaultValue = "SEQUENTIAL") TaskSchedulerType eventDeduplicatorSchedulerType,
Expand Down

0 comments on commit 08372c4

Please sign in to comment.