-
Notifications
You must be signed in to change notification settings - Fork 143
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
- Loading branch information
1 parent
d13b121
commit a54f6f9
Showing
2 changed files
with
280 additions
and
272 deletions.
There are no files selected for viewing
277 changes: 277 additions & 0 deletions
277
...ain/java/com/swirlds/platform/event/preconsensus/BestEffortPreconsensusEventFileCopy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,277 @@ | ||
/* | ||
* Copyright (C) 2023 Hedera Hashgraph, LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.swirlds.platform.event.preconsensus; | ||
|
||
import static com.swirlds.common.io.utility.FileUtils.executeAndRename; | ||
import static com.swirlds.logging.LogMarker.EXCEPTION; | ||
import static com.swirlds.logging.LogMarker.STATE_TO_DISK; | ||
|
||
import com.swirlds.common.context.PlatformContext; | ||
import com.swirlds.common.system.NodeId; | ||
import edu.umd.cs.findbugs.annotations.NonNull; | ||
import java.io.IOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.stream.Stream; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
/** | ||
* Operations for copying preconsensus event files. Is not fully thread safe, best effort only. Race conditions can | ||
* cause the copy to fail, but if it does fail it will fail atomically and in a way that is recoverable. | ||
*/ | ||
public final class BestEffortPreconsensusEventFileCopy { | ||
|
||
private static final Logger logger = LogManager.getLogger(BestEffortPreconsensusEventFileCopy.class); | ||
|
||
/** | ||
* The number of times to attempt to copy the last PCES file. Access to this file is not really coordinated between | ||
* this logic and the code responsible for managing PCES file lifecycle, and so there is a small chance that the | ||
* file moves when we attempt to make a copy. However, this probability is fairly small, and it is very unlikely | ||
* that we will be unable to snatch a copy in time with a few retries. | ||
*/ | ||
private static final int COPY_PCES_MAX_RETRIES = 10; | ||
|
||
private BestEffortPreconsensusEventFileCopy() {} | ||
|
||
/** | ||
* Copy preconsensus event files into the signed state directory. Copying these files is not thread safe and may | ||
* fail as a result. This method retries several times if a failure is encountered. Success is not guaranteed, but | ||
* success or failure is atomic and will not throw an exception. | ||
* | ||
* @param platformContext the platform context | ||
* @param selfId the id of this node | ||
* @param destinationDirectory the directory where the state is being written | ||
* @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the | ||
* state that is being written | ||
* @param round the round of the state that is being written | ||
*/ | ||
public static void copyPreconsensusEventStreamFilesRetryOnFailure( | ||
@NonNull final PlatformContext platformContext, | ||
@NonNull final NodeId selfId, | ||
@NonNull final Path destinationDirectory, | ||
final long minimumGenerationNonAncient, | ||
final long round) { | ||
|
||
final boolean copyPreconsensusStream = platformContext | ||
.getConfiguration() | ||
.getConfigData(PreconsensusEventStreamConfig.class) | ||
.copyRecentStreamToStateSnapshots(); | ||
if (!copyPreconsensusStream) { | ||
// PCES copying is disabled | ||
return; | ||
} | ||
|
||
final Path pcesDestination = | ||
destinationDirectory.resolve("preconsensus-events").resolve(Long.toString(selfId.id())); | ||
|
||
int triesRemaining = COPY_PCES_MAX_RETRIES; | ||
while (triesRemaining > 0) { | ||
triesRemaining--; | ||
try { | ||
executeAndRename( | ||
pcesDestination, | ||
temporaryDirectory -> copyPreconsensusEventStreamFiles( | ||
platformContext, selfId, temporaryDirectory, minimumGenerationNonAncient)); | ||
|
||
return; | ||
} catch (final IOException e) { | ||
if (triesRemaining > 0) { | ||
logger.warn(STATE_TO_DISK.getMarker(), "Unable to copy PCES files. Retrying."); | ||
} else { | ||
logger.error( | ||
EXCEPTION.getMarker(), | ||
"Unable to copy the last PCES file after {} retries. " | ||
+ "PCES files will not be written into the state snapshot for round {}.", | ||
COPY_PCES_MAX_RETRIES, | ||
round, | ||
e); | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Copy preconsensus event files into the signed state directory. These files are necessary for the platform to use | ||
* the state file as a starting point. Note: starting a node using the PCES files in the state directory does not | ||
* guarantee that there is no data loss (i.e. there may be transactions that reach consensus after the state | ||
* snapshot), but it does allow a node to start up and participate in gossip. | ||
* | ||
* <p> | ||
* This general strategy is not very elegant is very much a hack. But it will allow us to do migration testing using | ||
* real production states and streams, in the short term. In the longer term we should consider alternate and | ||
* cleaner strategies. | ||
* | ||
* @param platformContext the platform context | ||
* @param selfId the id of this node | ||
* @param destinationDirectory the directory where the PCES files should be written | ||
* @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the | ||
* state that is being written | ||
*/ | ||
private static void copyPreconsensusEventStreamFiles( | ||
@NonNull final PlatformContext platformContext, | ||
@NonNull final NodeId selfId, | ||
@NonNull final Path destinationDirectory, | ||
final long minimumGenerationNonAncient) | ||
throws IOException { | ||
|
||
final List<PreconsensusEventFile> allFiles = gatherPreconsensusFilesOnDisk(selfId, platformContext); | ||
if (allFiles.isEmpty()) { | ||
return; | ||
} | ||
|
||
// Sort by sequence number | ||
Collections.sort(allFiles); | ||
|
||
// Discard all files that either have an incorrect origin or that do not contain non-ancient events. | ||
final List<PreconsensusEventFile> filesToCopy = | ||
getRequiredPreconsensusFiles(allFiles, minimumGenerationNonAncient); | ||
if (filesToCopy.isEmpty()) { | ||
return; | ||
} | ||
|
||
copyPreconsensusFileList(filesToCopy, destinationDirectory); | ||
} | ||
|
||
/** | ||
* Get the preconsensus files that we need to copy to a state. We need any file that has a matching origin and that | ||
* contains non-ancient events (w.r.t. the state). | ||
* | ||
* @param allFiles all PCES files on disk | ||
* @param minimumGenerationNonAncient the minimum generation of events that are not ancient, with respect to the | ||
* state that is being written | ||
* @return the list of files to copy | ||
*/ | ||
@NonNull | ||
private static List<PreconsensusEventFile> getRequiredPreconsensusFiles( | ||
@NonNull final List<PreconsensusEventFile> allFiles, final long minimumGenerationNonAncient) { | ||
|
||
final List<PreconsensusEventFile> filesToCopy = new ArrayList<>(); | ||
final PreconsensusEventFile lastFile = allFiles.get(allFiles.size() - 1); | ||
for (final PreconsensusEventFile file : allFiles) { | ||
if (file.getOrigin() == lastFile.getOrigin() | ||
&& file.getMaximumGeneration() >= minimumGenerationNonAncient) { | ||
filesToCopy.add(file); | ||
} | ||
} | ||
|
||
if (filesToCopy.isEmpty()) { | ||
logger.warn( | ||
STATE_TO_DISK.getMarker(), | ||
"No preconsensus event files meeting specified criteria found to copy. " | ||
+ "Minimum generation non-ancient: {}", | ||
minimumGenerationNonAncient); | ||
} else if (filesToCopy.size() == 1) { | ||
logger.info( | ||
STATE_TO_DISK.getMarker(), | ||
""" | ||
Found 1 preconsensus event file meeting specified criteria to copy. | ||
Minimum generation non-ancient: {} | ||
File: {} | ||
""", | ||
minimumGenerationNonAncient, | ||
filesToCopy.get(0).getPath()); | ||
} else { | ||
logger.info( | ||
STATE_TO_DISK.getMarker(), | ||
""" | ||
Found {} preconsensus event files meeting specified criteria to copy. | ||
Minimum generation non-ancient: {} | ||
First file to copy: {} | ||
Last file to copy: {} | ||
""", | ||
filesToCopy.size(), | ||
minimumGenerationNonAncient, | ||
filesToCopy.get(0).getPath(), | ||
filesToCopy.get(filesToCopy.size() - 1).getPath()); | ||
} | ||
|
||
return filesToCopy; | ||
} | ||
|
||
/** | ||
* Gather all PCES files on disk. | ||
* | ||
* @param selfId the id of this node | ||
* @param platformContext the platform context | ||
* @return a list of all PCES files on disk | ||
*/ | ||
@NonNull | ||
private static List<PreconsensusEventFile> gatherPreconsensusFilesOnDisk( | ||
@NonNull final NodeId selfId, @NonNull final PlatformContext platformContext) throws IOException { | ||
final List<PreconsensusEventFile> allFiles = new ArrayList<>(); | ||
final Path preconsensusEventStreamDirectory = | ||
PreconsensusEventFileManager.getDatabaseDirectory(platformContext, selfId); | ||
try (final Stream<Path> stream = Files.walk(preconsensusEventStreamDirectory)) { | ||
stream.filter(Files::isRegularFile).forEach(path -> { | ||
try { | ||
allFiles.add(PreconsensusEventFile.of(path)); | ||
} catch (final IOException e) { | ||
// Ignore, this will get thrown for each file that is not a PCES file | ||
} | ||
}); | ||
} | ||
|
||
if (allFiles.isEmpty()) { | ||
logger.warn(STATE_TO_DISK.getMarker(), "No preconsensus event files found to copy"); | ||
} else if (allFiles.size() == 1) { | ||
logger.info( | ||
STATE_TO_DISK.getMarker(), | ||
""" | ||
Found 1 preconsensus file on disk. | ||
File: {}""", | ||
allFiles.get(0).getPath()); | ||
} else { | ||
logger.info( | ||
STATE_TO_DISK.getMarker(), | ||
""" | ||
Found {} preconsensus files on disk. | ||
First file: {} | ||
Last file: {}""", | ||
allFiles.size(), | ||
allFiles.get(0).getPath(), | ||
allFiles.get(allFiles.size() - 1).getPath()); | ||
} | ||
|
||
return allFiles; | ||
} | ||
|
||
/** | ||
* Copy a list of preconsensus event files into a directory. | ||
* | ||
* @param filesToCopy the files to copy | ||
* @param pcesDestination the directory where the files should be copied | ||
*/ | ||
private static void copyPreconsensusFileList( | ||
@NonNull final List<PreconsensusEventFile> filesToCopy, @NonNull final Path pcesDestination) | ||
throws IOException { | ||
logger.info(STATE_TO_DISK.getMarker(), "Copying {} preconsensus event file(s)", filesToCopy.size()); | ||
|
||
// Copy files from newest to oldest. Newer files are more likely to be modified concurrently by the | ||
// PCES file writer and are more likely to fail. If we fail to copy files, it's better to fail early | ||
// so that we can retry again more quickly. | ||
for (int index = filesToCopy.size() - 1; index >= 0; index--) { | ||
final PreconsensusEventFile file = filesToCopy.get(index); | ||
Files.copy(file.getPath(), pcesDestination.resolve(file.getFileName())); | ||
} | ||
|
||
logger.info(STATE_TO_DISK.getMarker(), "Finished copying {} preconsensus event file(s)", filesToCopy.size()); | ||
} | ||
} |
Oops, something went wrong.