diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/BestEffortPreconsensusEventFileCopy.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/BestEffortPreconsensusEventFileCopy.java
new file mode 100644
index 000000000000..295b920e2a72
--- /dev/null
+++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/BestEffortPreconsensusEventFileCopy.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright (C) 2023 Hedera Hashgraph, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.swirlds.platform.event.preconsensus;
+
+import static com.swirlds.common.io.utility.FileUtils.executeAndRename;
+import static com.swirlds.logging.LogMarker.EXCEPTION;
+import static com.swirlds.logging.LogMarker.STATE_TO_DISK;
+
+import com.swirlds.common.context.PlatformContext;
+import com.swirlds.common.system.NodeId;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Operations for copying preconsensus event files. Is not fully thread safe, best effort only. Race conditions can
+ * cause the copy to fail, but if it does fail it will fail atomically and in a way that is recoverable.
+ */
+public final class BestEffortPreconsensusEventFileCopy {
+
+ private static final Logger logger = LogManager.getLogger(BestEffortPreconsensusEventFileCopy.class);
+
+ /**
+ * The number of times to attempt to copy the last PCES file. Access to this file is not really coordinated between
+ * this logic and the code responsible for managing PCES file lifecycle, and so there is a small chance that the
+ * file moves when we attempt to make a copy. However, this probability is fairly small, and it is very unlikely
+ * that we will be unable to snatch a copy in time with a few retries.
+ */
+ private static final int COPY_PCES_MAX_RETRIES = 10;
+
+ private BestEffortPreconsensusEventFileCopy() {}
+
+ /**
+ * Copy preconsensus event files into the signed state directory. Copying these files is not thread safe and may
+ * fail as a result. This method retries several times if a failure is encountered. Success is not guaranteed, but
+ * success or failure is atomic and will not throw an exception.
+ *
+ * @param platformContext the platform context
+ * @param selfId the id of this node
+ * @param destinationDirectory the directory where the state is being written
+ * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
+ * state that is being written
+ * @param round the round of the state that is being written
+ */
+ public static void copyPreconsensusEventStreamFilesRetryOnFailure(
+ @NonNull final PlatformContext platformContext,
+ @NonNull final NodeId selfId,
+ @NonNull final Path destinationDirectory,
+ final long minimumGenerationNonAncient,
+ final long round) {
+
+ final boolean copyPreconsensusStream = platformContext
+ .getConfiguration()
+ .getConfigData(PreconsensusEventStreamConfig.class)
+ .copyRecentStreamToStateSnapshots();
+ if (!copyPreconsensusStream) {
+ // PCES copying is disabled
+ return;
+ }
+
+ final Path pcesDestination =
+ destinationDirectory.resolve("preconsensus-events").resolve(Long.toString(selfId.id()));
+
+ int triesRemaining = COPY_PCES_MAX_RETRIES;
+ while (triesRemaining > 0) {
+ triesRemaining--;
+ try {
+ executeAndRename(
+ pcesDestination,
+ temporaryDirectory -> copyPreconsensusEventStreamFiles(
+ platformContext, selfId, temporaryDirectory, minimumGenerationNonAncient));
+
+ return;
+ } catch (final IOException e) {
+ if (triesRemaining > 0) {
+ logger.warn(STATE_TO_DISK.getMarker(), "Unable to copy PCES files. Retrying.");
+ } else {
+ logger.error(
+ EXCEPTION.getMarker(),
+ "Unable to copy the last PCES file after {} retries. "
+ + "PCES files will not be written into the state snapshot for round {}.",
+ COPY_PCES_MAX_RETRIES,
+ round,
+ e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Copy preconsensus event files into the signed state directory. These files are necessary for the platform to use
+ * the state file as a starting point. Note: starting a node using the PCES files in the state directory does not
+ * guarantee that there is no data loss (i.e. there may be transactions that reach consensus after the state
+ * snapshot), but it does allow a node to start up and participate in gossip.
+ *
+ *
+ * This general strategy is not very elegant is very much a hack. But it will allow us to do migration testing using
+ * real production states and streams, in the short term. In the longer term we should consider alternate and
+ * cleaner strategies.
+ *
+ * @param platformContext the platform context
+ * @param selfId the id of this node
+ * @param destinationDirectory the directory where the PCES files should be written
+ * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
+ * state that is being written
+ */
+ private static void copyPreconsensusEventStreamFiles(
+ @NonNull final PlatformContext platformContext,
+ @NonNull final NodeId selfId,
+ @NonNull final Path destinationDirectory,
+ final long minimumGenerationNonAncient)
+ throws IOException {
+
+ final List allFiles = gatherPreconsensusFilesOnDisk(selfId, platformContext);
+ if (allFiles.isEmpty()) {
+ return;
+ }
+
+ // Sort by sequence number
+ Collections.sort(allFiles);
+
+ // Discard all files that either have an incorrect origin or that do not contain non-ancient events.
+ final List filesToCopy =
+ getRequiredPreconsensusFiles(allFiles, minimumGenerationNonAncient);
+ if (filesToCopy.isEmpty()) {
+ return;
+ }
+
+ copyPreconsensusFileList(filesToCopy, destinationDirectory);
+ }
+
+ /**
+ * Get the preconsensus files that we need to copy to a state. We need any file that has a matching origin and that
+ * contains non-ancient events (w.r.t. the state).
+ *
+ * @param allFiles all PCES files on disk
+ * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
+ * state that is being written
+ * @return the list of files to copy
+ */
+ @NonNull
+ private static List getRequiredPreconsensusFiles(
+ @NonNull final List allFiles, final long minimumGenerationNonAncient) {
+
+ final List filesToCopy = new ArrayList<>();
+ final PreconsensusEventFile lastFile = allFiles.get(allFiles.size() - 1);
+ for (final PreconsensusEventFile file : allFiles) {
+ if (file.getOrigin() == lastFile.getOrigin()
+ && file.getMaximumGeneration() >= minimumGenerationNonAncient) {
+ filesToCopy.add(file);
+ }
+ }
+
+ if (filesToCopy.isEmpty()) {
+ logger.warn(
+ STATE_TO_DISK.getMarker(),
+ "No preconsensus event files meeting specified criteria found to copy. "
+ + "Minimum generation non-ancient: {}",
+ minimumGenerationNonAncient);
+ } else if (filesToCopy.size() == 1) {
+ logger.info(
+ STATE_TO_DISK.getMarker(),
+ """
+ Found 1 preconsensus event file meeting specified criteria to copy.
+ Minimum generation non-ancient: {}
+ File: {}
+ """,
+ minimumGenerationNonAncient,
+ filesToCopy.get(0).getPath());
+ } else {
+ logger.info(
+ STATE_TO_DISK.getMarker(),
+ """
+ Found {} preconsensus event files meeting specified criteria to copy.
+ Minimum generation non-ancient: {}
+ First file to copy: {}
+ Last file to copy: {}
+ """,
+ filesToCopy.size(),
+ minimumGenerationNonAncient,
+ filesToCopy.get(0).getPath(),
+ filesToCopy.get(filesToCopy.size() - 1).getPath());
+ }
+
+ return filesToCopy;
+ }
+
+ /**
+ * Gather all PCES files on disk.
+ *
+ * @param selfId the id of this node
+ * @param platformContext the platform context
+ * @return a list of all PCES files on disk
+ */
+ @NonNull
+ private static List gatherPreconsensusFilesOnDisk(
+ @NonNull final NodeId selfId, @NonNull final PlatformContext platformContext) throws IOException {
+ final List allFiles = new ArrayList<>();
+ final Path preconsensusEventStreamDirectory =
+ PreconsensusEventFileManager.getDatabaseDirectory(platformContext, selfId);
+ try (final Stream stream = Files.walk(preconsensusEventStreamDirectory)) {
+ stream.filter(Files::isRegularFile).forEach(path -> {
+ try {
+ allFiles.add(PreconsensusEventFile.of(path));
+ } catch (final IOException e) {
+ // Ignore, this will get thrown for each file that is not a PCES file
+ }
+ });
+ }
+
+ if (allFiles.isEmpty()) {
+ logger.warn(STATE_TO_DISK.getMarker(), "No preconsensus event files found to copy");
+ } else if (allFiles.size() == 1) {
+ logger.info(
+ STATE_TO_DISK.getMarker(),
+ """
+ Found 1 preconsensus file on disk.
+ File: {}""",
+ allFiles.get(0).getPath());
+ } else {
+ logger.info(
+ STATE_TO_DISK.getMarker(),
+ """
+ Found {} preconsensus files on disk.
+ First file: {}
+ Last file: {}""",
+ allFiles.size(),
+ allFiles.get(0).getPath(),
+ allFiles.get(allFiles.size() - 1).getPath());
+ }
+
+ return allFiles;
+ }
+
+ /**
+ * Copy a list of preconsensus event files into a directory.
+ *
+ * @param filesToCopy the files to copy
+ * @param pcesDestination the directory where the files should be copied
+ */
+ private static void copyPreconsensusFileList(
+ @NonNull final List filesToCopy, @NonNull final Path pcesDestination)
+ throws IOException {
+ logger.info(STATE_TO_DISK.getMarker(), "Copying {} preconsensus event file(s)", filesToCopy.size());
+
+ // Copy files from newest to oldest. Newer files are more likely to be modified concurrently by the
+ // PCES file writer and are more likely to fail. If we fail to copy files, it's better to fail early
+ // so that we can retry again more quickly.
+ for (int index = filesToCopy.size() - 1; index >= 0; index--) {
+ final PreconsensusEventFile file = filesToCopy.get(index);
+ Files.copy(file.getPath(), pcesDestination.resolve(file.getFileName()));
+ }
+
+ logger.info(STATE_TO_DISK.getMarker(), "Finished copying {} preconsensus event file(s)", filesToCopy.size());
+ }
+}
diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java
index 0b9b78cc59f5..7212b663b6d9 100644
--- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java
+++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/signed/SignedStateFileWriter.java
@@ -19,9 +19,9 @@
import static com.swirlds.common.io.utility.FileUtils.executeAndRename;
import static com.swirlds.common.io.utility.FileUtils.writeAndFlush;
import static com.swirlds.logging.LogMarker.EXCEPTION;
-import static com.swirlds.logging.LogMarker.STARTUP;
import static com.swirlds.logging.LogMarker.STATE_TO_DISK;
import static com.swirlds.platform.config.internal.PlatformConfigUtils.writeSettingsUsed;
+import static com.swirlds.platform.event.preconsensus.BestEffortPreconsensusEventFileCopy.copyPreconsensusEventStreamFilesRetryOnFailure;
import static com.swirlds.platform.state.signed.SignedStateFileUtils.CURRENT_ADDRESS_BOOK_FILE_NAME;
import static com.swirlds.platform.state.signed.SignedStateFileUtils.FILE_VERSION;
import static com.swirlds.platform.state.signed.SignedStateFileUtils.HASH_INFO_FILE_NAME;
@@ -36,9 +36,6 @@
import com.swirlds.common.system.NodeId;
import com.swirlds.common.system.address.AddressBook;
import com.swirlds.logging.payloads.StateSavedToDiskPayload;
-import com.swirlds.platform.event.preconsensus.PreconsensusEventFile;
-import com.swirlds.platform.event.preconsensus.PreconsensusEventFileManager;
-import com.swirlds.platform.event.preconsensus.PreconsensusEventStreamConfig;
import com.swirlds.platform.recovery.emergencyfile.EmergencyRecoveryFile;
import com.swirlds.platform.state.State;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -46,14 +43,9 @@
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Objects;
-import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -64,14 +56,6 @@ public final class SignedStateFileWriter {
private static final Logger logger = LogManager.getLogger(SignedStateFileWriter.class);
- /**
- * The number of times to attempt to copy the last PCES file. Access to this file is not really coordinated between
- * this logic and the code responsible for managing PCES file lifecycle, and so there is a small chance that the
- * file moves when we attempt to make a copy. However, this probability is fairly small, and it is very unlikely
- * that we will be unable to snatch a copy in time with a few retries.
- */
- private static final int COPY_PCES_MAX_RETRIES = 10;
-
private SignedStateFileWriter() {}
/**
@@ -179,261 +163,8 @@ public static void writeSignedStateFilesToDirectory(
platformContext,
selfId,
directory,
- signedState.getState().getPlatformState().getPlatformData().getMinimumGenerationNonAncient());
- }
- }
-
- private static void copyPreconsensusEventStreamFilesRetryOnFailure(
- @NonNull final PlatformContext platformContext,
- @NonNull final NodeId selfId,
- @NonNull final Path destinationDirectory,
- final long minimumGenerationNonAncient)
- throws IOException {
-
- int triesRemaining = COPY_PCES_MAX_RETRIES;
- while (triesRemaining > 0) {
- triesRemaining--;
- try {
- copyPreconsensusEventStreamFiles(
- platformContext, selfId, destinationDirectory, minimumGenerationNonAncient);
- return;
- } catch (final PreconsensusEventFileRenamed e) {
- if (triesRemaining == 0) {
- logger.error(
- EXCEPTION.getMarker(),
- "Unable to copy the last PCES file after {} retries. "
- + "PCES files will not be written into the state.",
- COPY_PCES_MAX_RETRIES);
- }
- }
- }
- }
-
- /**
- * Copy preconsensus event files into the signed state directory. These files are necessary for the platform to use
- * the state file as a starting point. Note: starting a node using the PCES files in the state directory does not
- * guarantee that there is no data loss (i.e. there may be transactions that reach consensus after the state
- * snapshot), but it does allow a node to start up and participate in gossip.
- *
- *
- * This general strategy is not very elegant is very much a hack. But it will allow us to do migration testing using
- * real production states and streams, in the short term. In the longer term we should consider alternate and
- * cleaner strategies.
- *
- * @param platformContext the platform context
- * @param destinationDirectory the directory where the state is being written
- * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
- * state that is being written
- */
- private static void copyPreconsensusEventStreamFiles(
- @NonNull final PlatformContext platformContext,
- @NonNull final NodeId selfId,
- @NonNull final Path destinationDirectory,
- final long minimumGenerationNonAncient)
- throws IOException {
-
- final boolean copyPreconsensusStream = platformContext
- .getConfiguration()
- .getConfigData(PreconsensusEventStreamConfig.class)
- .copyRecentStreamToStateSnapshots();
- if (!copyPreconsensusStream) {
- // PCES copying is disabled
- return;
- }
-
- // The PCES files will be copied into this directory
- final Path pcesDestination =
- destinationDirectory.resolve("preconsensus-events").resolve(Long.toString(selfId.id()));
- Files.createDirectories(pcesDestination);
-
- final List allFiles = gatherPreconsensusFilesOnDisk(selfId, platformContext);
- if (allFiles.isEmpty()) {
- return;
- }
-
- // Sort by sequence number
- Collections.sort(allFiles);
-
- // Discard all files that either have an incorrect origin or that do not contain non-ancient events.
- final List filesToCopy =
- getRequiredPreconsensusFiles(allFiles, minimumGenerationNonAncient);
- if (filesToCopy.isEmpty()) {
- return;
- }
-
- copyPreconsensusFileList(filesToCopy, pcesDestination);
- }
-
- /**
- * Get the preconsensus files that we need to copy to a state. We need any file that has a matching origin and that
- * contains non-ancient events (w.r.t. the state).
- *
- * @param allFiles all PCES files on disk
- * @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the
- * state that is being written
- * @return the list of files to copy
- */
- @NonNull
- private static List getRequiredPreconsensusFiles(
- @NonNull final List allFiles, final long minimumGenerationNonAncient) {
-
- final List filesToCopy = new ArrayList<>();
- final PreconsensusEventFile lastFile = allFiles.get(allFiles.size() - 1);
- for (final PreconsensusEventFile file : allFiles) {
- if (file.getOrigin() == lastFile.getOrigin()
- && file.getMaximumGeneration() >= minimumGenerationNonAncient) {
- filesToCopy.add(file);
- }
- }
-
- if (filesToCopy.isEmpty()) {
- logger.warn(
- STATE_TO_DISK.getMarker(),
- "No preconsensus event files meeting specified criteria found to copy. "
- + "Minimum generation non-ancient: {}",
- minimumGenerationNonAncient);
- } else if (filesToCopy.size() == 1) {
- logger.info(
- STATE_TO_DISK.getMarker(),
- """
- Found 1 preconsensus event file meeting specified criteria to copy.
- Minimum generation non-ancient: {}
- File: {}
- """,
- minimumGenerationNonAncient,
- filesToCopy.get(0).getPath());
- } else {
- logger.info(
- STATE_TO_DISK.getMarker(),
- """
- Found {} preconsensus event files meeting specified criteria to copy.
- Minimum generation non-ancient: {}
- First file to copy: {}
- Last file to copy: {}
- """,
- filesToCopy.size(),
- minimumGenerationNonAncient,
- filesToCopy.get(0).getPath(),
- filesToCopy.get(filesToCopy.size() - 1).getPath());
- }
-
- return filesToCopy;
- }
-
- /**
- * Gather all PCES files on disk.
- *
- * @param selfId the id of this node
- * @param platformContext the platform context
- * @return a list of all PCES files on disk
- */
- @NonNull
- private static List gatherPreconsensusFilesOnDisk(
- @NonNull final NodeId selfId, @NonNull final PlatformContext platformContext) throws IOException {
- final List allFiles = new ArrayList<>();
- final Path preconsensusEventStreamDirectory =
- PreconsensusEventFileManager.getDatabaseDirectory(platformContext, selfId);
- try (final Stream stream = Files.walk(preconsensusEventStreamDirectory)) {
- stream.filter(Files::isRegularFile).forEach(path -> {
- try {
- allFiles.add(PreconsensusEventFile.of(path));
- } catch (final IOException e) {
- // Ignore, this will get thrown for each file that is not a PCES file
- }
- });
- }
-
- if (allFiles.isEmpty()) {
- logger.warn(STATE_TO_DISK.getMarker(), "No preconsensus event files found to copy");
- } else if (allFiles.size() == 1) {
- logger.info(
- STATE_TO_DISK.getMarker(),
- """
- Found 1 preconsensus file on disk.
- File: {}""",
- allFiles.get(0).getPath());
- } else {
- logger.info(
- STATE_TO_DISK.getMarker(),
- """
- Found {} preconsensus files on disk.
- First file: {}
- Last file: {}""",
- allFiles.size(),
- allFiles.get(0).getPath(),
- allFiles.get(allFiles.size() - 1).getPath());
- }
-
- return allFiles;
- }
-
- /**
- * Copy a list of preconsensus event files into a directory.
- *
- * @param filesToCopy the files to copy
- * @param pcesDestination the directory where the files should be copied
- */
- private static void copyPreconsensusFileList(
- @NonNull final List filesToCopy, @NonNull final Path pcesDestination) {
- logger.info(
- STATE_TO_DISK.getMarker(),
- "Copying {} preconsensus event files to state snapshot directory",
- filesToCopy.size());
-
- // The last file might be in the process of being written, so we need to do a deep copy of it.
- // Unlike the other files we are going to copy which have a long lifespan and are expected to
- // be stable, the last file is actively in flux. It's possible that the last file will be
- // renamed by the time we get to it, which may cause this copy operation to fail. Attempt
- // to copy this file first, so that if we fail we can abort and retry without other side
- // effects.
- deepCopyPreconsensusFile(filesToCopy.get(filesToCopy.size() - 1), pcesDestination);
-
- // Although the last file may be currently in the process of being written, all previous files will
- // be closed and immutable and so it's safe to hard link them.
- for (int index = 0; index < filesToCopy.size() - 1; index++) {
- hardLinkPreconsensusFile(filesToCopy.get(index), pcesDestination);
- }
-
- logger.info(
- STATE_TO_DISK.getMarker(),
- "Finished copying {} preconsensus event files to state snapshot directory",
- filesToCopy.size());
- }
-
- /**
- * Hard link a PCES file.
- *
- * @param file the file to link
- * @param pcesDestination the directory where the file should be linked into
- */
- private static void hardLinkPreconsensusFile(
- @NonNull final PreconsensusEventFile file, @NonNull final Path pcesDestination) {
- final Path destination = pcesDestination.resolve(file.getFileName());
- try {
- Files.createLink(destination, file.getPath());
- } catch (final IOException e) {
- logger.error(
- EXCEPTION.getMarker(),
- "Exception when hard linking preconsensus event file {} to {}",
- file.getPath(),
- destination,
- e);
- }
- }
-
- /**
- * Attempt to deep copy a PCES file.
- *
- * @param file the file to copy
- * @param pcesDestination the directory where the file should be copied into
- */
- private static void deepCopyPreconsensusFile(
- @NonNull final PreconsensusEventFile file, @NonNull final Path pcesDestination) {
- try {
- Files.copy(file.getPath(), pcesDestination.resolve(file.getFileName()));
- } catch (final IOException e) {
- logger.info(STARTUP.getMarker(), "unable to copy last PCES file (file was likely renamed), will retry");
- throw new PreconsensusEventFileRenamed(e);
+ signedState.getState().getPlatformState().getPlatformData().getMinimumGenerationNonAncient(),
+ signedState.getRound());
}
}