HDDS-11784. Add missing parent directories for abort MPU and abort expired MPU requests #7566

Open: wants to merge 4 commits into base: master
@@ -21,6 +21,7 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -47,6 +48,7 @@
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.PrefixManager;
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -67,6 +69,7 @@
import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -1373,4 +1376,116 @@ protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs keyArg
keyArgs.getKeyName() + " in encrypted bucket " + keyArgs.getBucketName(), INVALID_REQUEST);
}
}

protected void addMissingParentsToCache(OmBucketInfo omBucketInfo,
List<OmDirectoryInfo> missingParentInfos,
OMMetadataManager omMetadataManager,
long volumeId,
long bucketId,
long transactionLogIndex) throws IOException {

// validate and update namespace for missing parent directory.
checkBucketQuotaInNamespace(omBucketInfo, missingParentInfos.size());
omBucketInfo.incrUsedNamespace(missingParentInfos.size());
Comment on lines +1388 to +1389

Contributor:

In this case, if we are only adding directories to the cache that will be cleaned up afterwards (for abort and expired abort), I don't think we should increase the usedNamespace of the parent bucket: the corresponding decrUsedNamespace will never be called, so usedNamespace would never return to 0.

Contributor (author):

This logic is copied from https://github.com/apache/ozone/pull/6496/files#diff-28870b5ea5ae6d1f8207b99595dc84f585becd8f2c53ee1d852d6ae34b7229f4R86-R87. Please let me know if this logic is needed in the MPU complete request. My essential question: should we increase the used namespace at initiate time, or at complete time?

Contributor:

Please see #7566 (comment) for another approach where we don't need to create missing parent directories, and therefore don't need to update the bucket namespace.
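The leak the reviewer describes — an incrUsedNamespace with no matching decrUsedNamespace — can be illustrated with a minimal sketch. The NamespaceQuota class below is hypothetical, standing in for the relevant accounting on Ozone's OmBucketInfo, not the actual class:

```java
// Minimal sketch of bucket namespace accounting. If directories are added to
// the cache only to be cleaned up later (abort paths), incrementing without a
// matching decrement leaks quota: usedNamespace never returns to zero.
public class NamespaceQuota {
    private long usedNamespace;
    private final long quota;

    public NamespaceQuota(long quota) {
        this.quota = quota;
    }

    public void incrUsedNamespace(long n) {
        if (usedNamespace + n > quota) {
            throw new IllegalStateException("namespace quota exceeded");
        }
        usedNamespace += n;
    }

    public void decrUsedNamespace(long n) {
        usedNamespace = Math.max(0, usedNamespace - n);
    }

    public long getUsedNamespace() {
        return usedNamespace;
    }
}
```

An abort path that calls incrUsedNamespace(missingParentInfos.size()) but never the matching decrUsedNamespace leaves usedNamespace permanently above zero, which is exactly the concern raised above.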


// Add cache entries for the missing parent directories.
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
volumeId, bucketId, transactionLogIndex,
missingParentInfos, null);
}

protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey,
String keyName,
OMMetadataManager omMetadataManager) throws IOException {
return omMetadataManager.getOpenKeyTable(getBucketLayout())
.get(dbMultipartKey);
}

protected void addMultiPartToCache(
OMMetadataManager omMetadataManager, String multipartOpenKey,
OMFileRequest.OMPathInfoWithFSO pathInfoFSO, OmKeyInfo omKeyInfo,
String keyName, long transactionLogIndex
) {

// Add multi part to cache
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(),
keyName, transactionLogIndex);

}

protected boolean addMissingDirectoriesToCacheEnabled() {
return false;
}

protected List<OmDirectoryInfo> addOrGetMissingDirectories(OzoneManager ozoneManager,
OzoneManagerProtocolProtos.KeyArgs keyArgs,
long trxnLogIndex) throws
IOException {
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
final String volumeName = keyArgs.getVolumeName();
final String bucketName = keyArgs.getBucketName();
final String keyName = keyArgs.getKeyName();
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);
OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
List<OmDirectoryInfo> missingParentInfos =
getAllMissingParentDirInfo(ozoneManager, keyArgs, omBucketInfo,
pathInfoFSO, trxnLogIndex);
if (!addMissingDirectoriesToCacheEnabled()) {
return missingParentInfos;
}
Comment on lines +1437 to +1439

Contributor:

May I know the purpose of this? Are there any OM requests that call addOrGetMissingDirectories but do not actually add the missing directories to the cache?

If there are, please add a corresponding log. If there aren't, I think we can just remove this.

Contributor (author):

This is to differentiate the behavior of FSO and non-FSO requests. As @adoroszlai stated in #7566 (comment), the PR should only affect FSO requests.

In this PR, addMissingDirectoriesToCacheEnabled() returns false by default, but the FSO requests override it to return true.

Contributor:

Ok, understood. Unlike in S3InitiateMultipartUploadRequestWithFSO, validateAndUpdateCache for S3MultipartUploadCompleteRequestWithFSO and S3MultipartUploadAbortRequestWithFSO is implemented by the parent class, which requires the distinction via the flag.
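The mechanism discussed above is a template-method hook: the shared parent request consults an overridable boolean, and only the FSO subclasses flip it. A minimal sketch with illustrative class names (not Ozone's actual request hierarchy):

```java
// Sketch of the template-method pattern: the base request runs the shared
// flow and consults an overridable hook; only the FSO variant returns true.
abstract class BaseAbortRequest {
    // Default: non-FSO layouts have no directory entries to materialize.
    protected boolean addMissingDirectoriesToCacheEnabled() {
        return false;
    }

    // Shared flow implemented once in the parent, as in the PR.
    public final String process(String key) {
        if (!addMissingDirectoriesToCacheEnabled()) {
            return "skipped:" + key;
        }
        return "cached:" + key;
    }
}

class FsoAbortRequest extends BaseAbortRequest {
    @Override
    protected boolean addMissingDirectoriesToCacheEnabled() {
        return true; // FSO buckets track parent directories explicitly
    }
}

class LegacyAbortRequest extends BaseAbortRequest {
    // inherits the default: false
}
```

The flag is needed precisely because the shared logic lives in the parent's validateAndUpdateCache, so the subclass cannot simply skip the call site.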


if (missingParentInfos != null && !missingParentInfos.isEmpty()) {
final long volumeId = omMetadataManager.getVolumeId(volumeName);
final long bucketId = omMetadataManager.getBucketId(volumeName,
bucketName);

// add all missing parents to directory table
addMissingParentsToCache(omBucketInfo, missingParentInfos,
omMetadataManager, volumeId, bucketId, trxnLogIndex);

String multipartOpenKey = omMetadataManager
.getMultipartKey(volumeId, bucketId,
pathInfoFSO.getLastKnownParentId(),
pathInfoFSO.getLeafNodeName(),
keyArgs.getMultipartUploadID());

if (getOmKeyInfoFromOpenKeyTable(multipartOpenKey,
keyName, omMetadataManager) == null) {

final ReplicationConfig replicationConfig = OzoneConfigUtil
.resolveReplicationConfigPreference(keyArgs.getType(),
keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
omBucketInfo != null ?
omBucketInfo.getDefaultReplicationConfig() :
null, ozoneManager);

OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setReplicationConfig(replicationConfig)
.setOmKeyLocationInfos(Collections.singletonList(
new OmKeyLocationInfoGroup(0, new ArrayList<>(), true)))
.setAcls(getAclsForKey(keyArgs, omBucketInfo, pathInfoFSO,
ozoneManager.getPrefixManager(), ozoneManager.getConfiguration()))
.setObjectID(pathInfoFSO.getLeafNodeObjectId())
.setUpdateID(trxnLogIndex)
.setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ?
OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null)
.setParentObjectID(pathInfoFSO.getLastKnownParentId())
.build();

// Add missing multi part info to open key table
addMultiPartToCache(omMetadataManager, multipartOpenKey,
pathInfoFSO, keyInfoFromArgs, keyName, trxnLogIndex);
}
}
return missingParentInfos;
}
}
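The core computation in addOrGetMissingDirectories — collecting every ancestor directory of the key that does not exist yet — can be sketched in simplified form. Real Ozone resolves object IDs through the directory table via OMFileRequest.verifyDirectoryKeysInPath; here a plain Set of existing paths stands in for that lookup, and the class name is illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified sketch of what getAllMissingParentDirInfo computes: walk the key
// path from the root and collect every ancestor directory not yet present.
public class MissingParents {
    public static List<String> missingParentsOf(String keyName, Set<String> existingDirs) {
        List<String> missing = new ArrayList<>();
        String[] parts = keyName.split("/");
        StringBuilder path = new StringBuilder();
        // Iterate over parent components only; the leaf itself is the key,
        // not a directory.
        for (int i = 0; i < parts.length - 1; i++) {
            if (path.length() > 0) {
                path.append('/');
            }
            path.append(parts[i]);
            String dir = path.toString();
            if (!existingDirs.contains(dir)) {
                missing.add(dir);
            }
        }
        return missing;
    }
}
```

For "a/b/c/file" with only "a" existing, this yields ["a/b", "a/b/c"] — the entries the FSO requests must add to the directory table cache before the open-key entry can be parented correctly.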
@@ -20,6 +20,10 @@
package org.apache.hadoop.ozone.om.request.s3.multipart;

import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -49,6 +53,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -190,6 +195,30 @@ private void processResults(OMMetrics omMetrics,

}

private KeyArgs buildKeyArgs(final OmMultipartUpload multipartUpload) throws OMException {
final long now = Instant.now().toEpochMilli();
final ReplicationConfig replicationConfig =
multipartUpload.getReplicationConfig();
KeyArgs.Builder builder =
KeyArgs.newBuilder()
.setVolumeName(multipartUpload.getVolumeName())
.setBucketName(multipartUpload.getBucketName())
.setKeyName(multipartUpload.getKeyName())
.setType(replicationConfig.getReplicationType())
.setModificationTime(now);

if (replicationConfig instanceof ECReplicationConfig) {
builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig).toProto());
} else if (replicationConfig instanceof RatisReplicationConfig) {
builder.setFactor(((RatisReplicationConfig) replicationConfig).getReplicationFactor());
} else if (replicationConfig instanceof StandaloneReplicationConfig) {
builder.setFactor(((StandaloneReplicationConfig) replicationConfig).getReplicationFactor());
} else {
throw new OMException(OMException.ResultCodes.INVALID_REQUEST);
}
return builder.build();
}
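buildKeyArgs dispatches on the concrete replication config type because EC configs carry a full struct while Ratis and Standalone carry only a factor, and anything else is rejected. A simplified, self-contained sketch of that dispatch shape — the types below are stand-ins for the HDDS replication classes, not the real ones:

```java
// Stand-in type hierarchy mirroring ReplicationConfig's shape.
interface ReplConfig { }

final class EcConfig implements ReplConfig {
    final int data, parity;
    EcConfig(int data, int parity) { this.data = data; this.parity = parity; }
}

final class RatisConfig implements ReplConfig {
    final int factor;
    RatisConfig(int factor) { this.factor = factor; }
}

public class ReplDispatch {
    // EC carries a structured config; Ratis carries a bare factor; anything
    // else mirrors the OMException(INVALID_REQUEST) branch in the diff.
    public static String describe(ReplConfig rc) {
        if (rc instanceof EcConfig) {
            EcConfig ec = (EcConfig) rc;
            return "ec:" + ec.data + "+" + ec.parity;
        } else if (rc instanceof RatisConfig) {
            return "factor:" + ((RatisConfig) rc).factor;
        }
        throw new IllegalArgumentException("unsupported replication config");
    }
}
```

The explicit else-throw matters: a new ReplicationConfig subtype added later fails loudly instead of silently producing a KeyArgs with neither factor nor EC config set.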

private void updateTableCache(OzoneManager ozoneManager,
long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket,
Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads)
@@ -250,6 +279,7 @@ private void updateTableCache(OzoneManager ozoneManager,
try {
multipartUpload =
OmMultipartUpload.from(expiredMPUKeyName);
multipartUpload.setReplicationConfig(omMultipartKeyInfo.getReplicationConfig());
} catch (IllegalArgumentException e) {
LOG.warn("Aborting expired MPU failed: MPU key: " +
expiredMPUKeyName + " has invalid structure, " +
@@ -259,6 +289,8 @@ private void updateTableCache(OzoneManager ozoneManager,

String multipartOpenKey;
try {
KeyArgs keyArgs = buildKeyArgs(multipartUpload);
addOrGetMissingDirectories(ozoneManager, keyArgs, trxnLogIndex);
Contributor:

This should have a corresponding OM response that adds the missing directories to the DB batch, similar to #6496?

Contributor (author):

Let me know your thoughts on my reply related to #6496.

Contributor (author):

I have now added DIRECTORY_TABLE to @CleanupTableInfo for S3ExpiredMultipartUploadsAbortResponse, S3MultipartUploadAbortResponseWithFSO, and S3MultipartUploadCompleteResponseWithFSO.

multipartOpenKey =
OMMultipartUploadUtils
.getMultipartOpenKey(multipartUpload.getVolumeName(),
@@ -271,7 +303,7 @@
multipartUpload.getVolumeName() + ", bucket: " +
multipartUpload.getBucketName() + ", key: " +
multipartUpload.getKeyName() + ". Cannot parse the open key" +
"for this MPU, skipping this MPU.");
"for this MPU, skipping this MPU.", ome);
continue;
}

@@ -336,6 +368,10 @@ private void updateTableCache(OzoneManager ozoneManager,
.releaseWriteLock(BUCKET_LOCK, volumeName, bucketName));
}
}
}

@Override
protected boolean addMissingDirectoriesToCacheEnabled() {
return true;
}
}
@@ -109,6 +109,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut

validateBucketAndVolume(omMetadataManager, volumeName, bucketName);

missingParentInfos = addOrGetMissingDirectories(ozoneManager, keyArgs, transactionLogIndex);

OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName,
keyName, Paths.get(keyName));
@@ -119,10 +121,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
final OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager,
volumeName, bucketName);

// add all missing parents to dir table
missingParentInfos = getAllMissingParentDirInfo(ozoneManager, keyArgs, bucketInfo,
pathInfoFSO, transactionLogIndex);

// We are adding uploadId to key, because if multiple users try to
// perform multipart upload on the same key, each will try to upload, who
// ever finally commit the key, we see that key in ozone. Suppose if we
@@ -192,18 +190,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
.addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList()))
.build();

// validate and update namespace for missing parent directory
if (null != missingParentInfos) {
checkBucketQuotaInNamespace(bucketInfo, missingParentInfos.size());
bucketInfo.incrUsedNamespace(missingParentInfos.size());
}

// Add cache entries for the prefix directories.
// Skip adding for the file key itself, until Key Commit.
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager,
volumeId, bucketId, transactionLogIndex,
missingParentInfos, null);

OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(), keyName,
Expand All @@ -213,6 +199,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
omMetadataManager.getMultipartInfoTable().addCacheEntry(
multipartKey, multipartKeyInfo, transactionLogIndex);

if (bucketInfo == null) {
throw new IOException("bucketInfo is null");
}
Comment on lines +202 to +204

Contributor:

Do we need this? validateBucketAndVolume will check the bucket's existence.

Contributor (author):

I do hit this condition: the String bucketKey is not null, but the CacheValue bucketInfo is null. Please let me know if my validation here is reasonable.

Contributor (@ivandika3, Jan 8, 2025):

Hm, validateBucketAndVolume already checks the cache as well. Moreover, the OM request is processed under a BUCKET_LOCK, so the bucket should still exist during request processing. A bucket-not-found check should come earlier, before any OM state is updated, and it should throw an OMException with the BUCKET_NOT_FOUND result code.

Contributor (author):

Hmm, I think I saw this condition happen, which is why I added this block later on. But let me delete it and re-test.

omClientResponse =
new S3InitiateMultipartUploadResponseWithFSO(
omResponse.setInitiateMultiPartUploadResponse(
@@ -246,6 +235,11 @@ missingParentInfos, getBucketLayout(), volumeId, bucketId,
return omClientResponse;
}

@Override
protected boolean addMissingDirectoriesToCacheEnabled() {
return true;
}

/**
* Verify om directory result.
*
@@ -23,6 +23,8 @@
import java.util.Map;

import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -47,8 +49,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart
.S3MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortRequest;
@@ -57,8 +58,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;

import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

@@ -140,6 +139,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut

String multipartOpenKey;
try {
addOrGetMissingDirectories(ozoneManager, keyArgs, trxnLogIndex);
multipartOpenKey =
getMultipartOpenKey(keyArgs.getMultipartUploadID(), volumeName,
bucketName, keyName, omMetadataManager);
@@ -157,13 +157,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
// If there is no entry in openKeyTable, then there is no multipart
// upload initiated for this key.
if (omKeyInfo == null) {
throw new OMException("Abort Multipart Upload Failed: volume: " +
throw new OMException("Abort Multipart Upload Failed (omKeyInfo not found): volume: " +
requestedVolume + "bucket: " + requestedBucket + "key: " + keyName,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}

multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
.get(multipartKey);
if (multipartKeyInfo == null) {
throw new OMException("Abort Multipart Upload Failed (multipartKeyInfo not found): volume: " +
requestedVolume + "bucket: " + requestedBucket + "key: " + keyName,
OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
}
multipartKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());

// When abort uploaded key, we need to subtract the PartKey length from
@@ -60,4 +60,9 @@ protected OMClientResponse getOmClientResponse(OzoneManager ozoneManager,
omBucketInfo.copyObject(), getBucketLayout());
return omClientResp;
}

@Override
protected boolean addMissingDirectoriesToCacheEnabled() {
return true;
}
}