Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11714. resetDeletedBlockRetryCount with --all may fail and can cause long db lock in large cluster #7665

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ DatanodeDeletedBlockTransactions getTransactions(
throws IOException;

/**
 * Return the failed transactions in batches from the log. A transaction is
 * considered to be failed if it has been sent more than MAX_RETRY limit
 * and its count is reset to -1.
 *
 * @param batchSize Maximum number of failed transactions returned per call.
 * @param startTxId The least transaction id to start with.
 * @return a list of failed deleted block transactions.
 * @throws IOException if the log cannot be read.
 */
List<DeletedBlocksTransaction> getFailedTransactionsBatch(int batchSize,
long startTxId) throws IOException;

/**
 * Return the failed transactions in the log. A transaction is
 * considered to be failed if it has been sent more than MAX_RETRY limit
 * and its count is reset to -1.
 *
 * @param count Maximum number of returned transactions; if negative, all
 *              failed transactions are returned.
 * @param startTxId The least transaction id to start with.
 * @return a list of failed deleted block transactions.
 * @throws IOException if the log cannot be read.
 */
List<DeletedBlocksTransaction> getFailedTransactions(int count,
long startTxId) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.Set;
Expand All @@ -31,6 +30,7 @@
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
Expand All @@ -53,7 +53,6 @@
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;

import com.google.common.collect.Lists;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
Expand Down Expand Up @@ -121,6 +120,33 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
containerManager, this.scmContext, metrics, scmCommandTimeoutMs);
}

@Override
public List<DeletedBlocksTransaction> getFailedTransactionsBatch(
    int batchSize, long startTxId) throws IOException {
  // Collect at most batchSize transactions whose retry count has been
  // reset to -1 (i.e. failed), scanning the log from startTxId onward.
  // The log lock is held only for the duration of this single batch scan.
  final List<DeletedBlocksTransaction> result = new ArrayList<>();
  lock.lock();
  try (TableIterator<Long, ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iterator =
           deletedBlockLogStateManager.getReadOnlyIterator()) {
    iterator.seek(startTxId);
    while (result.size() < batchSize && iterator.hasNext()) {
      final DeletedBlocksTransaction tx = iterator.next().getValue();
      if (tx.getCount() == -1) {
        result.add(tx);
      }
    }
  } finally {
    // finally of a try-with-resources also runs if iterator creation throws.
    lock.unlock();
  }
  return result;
}

@Override
public List<DeletedBlocksTransaction> getFailedTransactions(int count,
long startTxId) throws IOException {
Expand All @@ -129,7 +155,7 @@ public List<DeletedBlocksTransaction> getFailedTransactions(int count,
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
deletedBlockLogStateManager.getReadOnlyIterator()) {
if (count == LIST_ALL_FAILED_TRANSACTIONS) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
Expand Down Expand Up @@ -176,19 +202,72 @@ public void incrementCount(List<Long> txIDs)
*/
@Override
public int resetCount(List<Long> txIDs) throws IOException {
lock.lock();
final int batchSize = 100;
int totalProcessed = 0;
long startTxId = 0;

try {
// If txIDs are not provided, fetch all failed transactions in batches
if (txIDs == null || txIDs.isEmpty()) {
txIDs = getFailedTransactions(LIST_ALL_FAILED_TRANSACTIONS, 0).stream()
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());
List<DeletedBlocksTransaction> batch;
do {
// Fetch the batch of failed transactions
batch = getFailedTransactionsBatch(batchSize, startTxId);

// If the batch is empty, skip further processing
if (!batch.isEmpty()) {
List<Long> batchTxIDs = batch.stream()
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());

lock.lock();
try {
transactionStatusManager.resetRetryCount(batchTxIDs);
int batchProcessed = deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(batchTxIDs));

totalProcessed += batchProcessed;
} finally {
lock.unlock();
}

// Update startTxId to continue from the last processed transaction in the next iteration
startTxId = batch.get(batch.size() - 1).getTxID() + 1;
}

} while (!batch.isEmpty());
} else {
// Process txIDs provided by the user in batches
for (List<Long> batch : partitionList(txIDs, batchSize)) {
lock.lock();
try {
transactionStatusManager.resetRetryCount(batch);
int batchProcessed = deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(batch));

totalProcessed += batchProcessed;
} finally {
lock.unlock();
}
}
}
transactionStatusManager.resetRetryCount(txIDs);
return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(new HashSet<>(txIDs)));
} finally {
lock.unlock();

return totalProcessed;
} catch (Exception e) {
throw new IOException("Error during transaction reset", e);
}
}


/**
 * Partition a list into consecutive batches of at most {@code batchSize}
 * elements. Each batch is an independent copy of its slice, so later
 * structural changes to the input list cannot invalidate the returned
 * partitions (a bare {@link List#subList} view would throw
 * ConcurrentModificationException in that case).
 *
 * @param list the list to split; it is not modified.
 * @param batchSize maximum number of elements per batch; must be positive.
 * @return the batches in original order; empty when {@code list} is empty.
 */
private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
  List<List<T>> partitions = new ArrayList<>();
  for (int start = 0; start < list.size(); start += batchSize) {
    int end = Math.min(start + batchSize, list.size());
    // Copy the slice instead of keeping a live subList view of the input.
    partitions.add(new ArrayList<>(list.subList(start, end)));
  }
  return partitions;
}

private DeletedBlocksTransaction constructNewTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testBlockDeletionTransactions() throws Exception {
cluster.getStorageContainerManager().getScmHAManager()
.asSCMHADBTransactionBuffer().flush();
}
return delLog.getFailedTransactions(-1, 0).size() == 0;
return delLog.getFailedTransactionsBatch(2, 0).size() == 0;
} catch (IOException e) {
return false;
}
Expand Down
Loading