diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java index d41fa3687bee..4276bceee30b 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/SwirldsPlatform.java @@ -50,6 +50,7 @@ import com.swirlds.common.threading.framework.config.QueueThreadConfiguration; import com.swirlds.common.threading.framework.config.QueueThreadMetricsConfiguration; import com.swirlds.common.threading.interrupt.InterruptableConsumer; +import com.swirlds.common.threading.manager.AdHocThreadManager; import com.swirlds.common.threading.manager.ThreadManager; import com.swirlds.common.utility.AutoCloseableWrapper; import com.swirlds.common.utility.Clearable; @@ -509,6 +510,16 @@ public class SwirldsPlatform implements Platform { platformWiring = components.add(new PlatformWiring(platformContext, time)); + final QueueThread oldStyleIntakeQueue = new QueueThreadConfiguration( + AdHocThreadManager.getStaticThreadManager()) + .setCapacity(10_000) + .setThreadName("old_style_intake_queue") + .setComponent("platform") + .setMetricsConfiguration(new QueueThreadMetricsConfiguration(metrics).enableMaxSizeMetric()) + .setHandler(event -> platformWiring.getGossipEventInput().put(event)) + .build(); + components.add(oldStyleIntakeQueue); + savedStateController = new SavedStateController(stateConfig); final SignedStateMetrics signedStateMetrics = new SignedStateMetrics(platformContext.getMetrics()); @@ -664,7 +675,7 @@ public class SwirldsPlatform implements Platform { selfId, appVersion, transactionPool, - platformWiring.getIntakeQueueSizeSupplier(), + oldStyleIntakeQueue::size, platformStatusManager::getCurrentStatus, latestReconnectRound::get); @@ -724,8 +735,16 @@ public class SwirldsPlatform implements Platform { shadowGraph, emergencyRecoveryManager, consensusRef, - platformWiring.getGossipEventInput()::put, - platformWiring.getIntakeQueueSizeSupplier(), + event -> { + try { + oldStyleIntakeQueue.put(event); + } catch (final InterruptedException e) { + logger.error( + EXCEPTION.getMarker(), "Interrupted while adding event to old style intake queue", e); + Thread.currentThread().interrupt(); + } + }, + oldStyleIntakeQueue::size, swirldStateManager, latestCompleteState, syncMetrics, diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java index 6bde04aabede..ceaf8a95aa07 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/PlatformSchedulersConfig.java @@ -86,7 +86,7 @@ public record PlatformSchedulersConfig( @ConfigProperty(defaultValue = "1.0") double defaultPoolMultiplier, @ConfigProperty(defaultValue = "0") int defaultPoolConstant, - @ConfigProperty(defaultValue = "10000") int eventHasherUnhandledCapacity, + @ConfigProperty(defaultValue = "500") int eventHasherUnhandledCapacity, @ConfigProperty(defaultValue = "SEQUENTIAL") TaskSchedulerType internalEventValidatorSchedulerType, @ConfigProperty(defaultValue = "500") int internalEventValidatorUnhandledCapacity, @ConfigProperty(defaultValue = "SEQUENTIAL") TaskSchedulerType eventDeduplicatorSchedulerType,