-
Notifications
You must be signed in to change notification settings - Fork 517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDDS-11784. Add missing parent directories for abort MPU and abort expired MPU requests #7566
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
import jakarta.annotation.Nonnull; | ||
import jakarta.annotation.Nullable; | ||
import java.io.IOException; | ||
import java.nio.file.Paths; | ||
import java.security.GeneralSecurityException; | ||
import java.security.PrivilegedExceptionAction; | ||
import java.util.ArrayList; | ||
|
@@ -47,6 +48,7 @@ | |
import org.apache.hadoop.ozone.OzoneAcl; | ||
import org.apache.hadoop.ozone.OzoneConsts; | ||
import org.apache.hadoop.ozone.om.OMMetrics; | ||
import org.apache.hadoop.ozone.om.OzoneConfigUtil; | ||
import org.apache.hadoop.ozone.om.PrefixManager; | ||
import org.apache.hadoop.ozone.om.ResolvedBucket; | ||
import org.apache.hadoop.ozone.om.OMConfigKeys; | ||
|
@@ -67,6 +69,7 @@ | |
import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy; | ||
import org.apache.hadoop.ozone.om.request.OMClientRequestUtils; | ||
import org.apache.hadoop.ozone.om.request.file.OMFileRequest; | ||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; | ||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo; | ||
import org.apache.hadoop.ozone.protocolPB.OMPBHelper; | ||
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; | ||
|
@@ -1373,4 +1376,116 @@ protected void validateEncryptionKeyInfo(OmBucketInfo bucketInfo, KeyArgs keyArg | |
keyArgs.getKeyName() + " in encrypted bucket " + keyArgs.getBucketName(), INVALID_REQUEST); | ||
} | ||
} | ||
|
||
protected void addMissingParentsToCache(OmBucketInfo omBucketInfo, | ||
List<OmDirectoryInfo> missingParentInfos, | ||
OMMetadataManager omMetadataManager, | ||
long volumeId, | ||
long bucketId, | ||
long transactionLogIndex) throws IOException { | ||
|
||
// validate and update namespace for missing parent directory. | ||
checkBucketQuotaInNamespace(omBucketInfo, missingParentInfos.size()); | ||
omBucketInfo.incrUsedNamespace(missingParentInfos.size()); | ||
|
||
// Add cache entries for the missing parent directories. | ||
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager, | ||
volumeId, bucketId, transactionLogIndex, | ||
missingParentInfos, null); | ||
} | ||
|
||
protected OmKeyInfo getOmKeyInfoFromOpenKeyTable(String dbMultipartKey, | ||
String keyName, | ||
OMMetadataManager omMetadataManager) throws IOException { | ||
return omMetadataManager.getOpenKeyTable(getBucketLayout()) | ||
.get(dbMultipartKey); | ||
} | ||
|
||
protected void addMultiPartToCache( | ||
OMMetadataManager omMetadataManager, String multipartOpenKey, | ||
OMFileRequest.OMPathInfoWithFSO pathInfoFSO, OmKeyInfo omKeyInfo, | ||
String keyName, long transactionLogIndex | ||
) { | ||
|
||
// Add multi part to cache | ||
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, | ||
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(), | ||
keyName, transactionLogIndex); | ||
|
||
} | ||
|
||
protected boolean addMissingDirectoriesToCacheEnabled() { | ||
return false; | ||
} | ||
|
||
protected List<OmDirectoryInfo> addOrGetMissingDirectories(OzoneManager ozoneManager, | ||
OzoneManagerProtocolProtos.KeyArgs keyArgs, | ||
long trxnLogIndex) throws | ||
IOException { | ||
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); | ||
final String volumeName = keyArgs.getVolumeName(); | ||
final String bucketName = keyArgs.getBucketName(); | ||
final String keyName = keyArgs.getKeyName(); | ||
OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager, | ||
volumeName, bucketName); | ||
OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest | ||
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName, | ||
keyName, Paths.get(keyName)); | ||
List<OmDirectoryInfo> missingParentInfos = | ||
getAllMissingParentDirInfo(ozoneManager, keyArgs, omBucketInfo, | ||
pathInfoFSO, trxnLogIndex); | ||
if (!addMissingDirectoriesToCacheEnabled()) { | ||
return missingParentInfos; | ||
} | ||
Comment on lines
+1437
to
+1439
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May I know what is the purpose of this? Are there any OM requests that called If there is, please add a corresponding log. If there isn't, I think we can just remove this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to differentiate the behaviors between FSO request and non-FSO request. As @adoroszlai stated here: #7566 (comment), the PR should be only related to FSO requests. You see in this PR: by default, this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok understood, unlike in |
||
|
||
if (missingParentInfos != null && !missingParentInfos.isEmpty()) { | ||
final long volumeId = omMetadataManager.getVolumeId(volumeName); | ||
final long bucketId = omMetadataManager.getBucketId(volumeName, | ||
bucketName); | ||
|
||
// add all missing parents to directory table | ||
addMissingParentsToCache(omBucketInfo, missingParentInfos, | ||
omMetadataManager, volumeId, bucketId, trxnLogIndex); | ||
|
||
String multipartOpenKey = omMetadataManager | ||
.getMultipartKey(volumeId, bucketId, | ||
pathInfoFSO.getLastKnownParentId(), | ||
pathInfoFSO.getLeafNodeName(), | ||
keyArgs.getMultipartUploadID()); | ||
|
||
if (getOmKeyInfoFromOpenKeyTable(multipartOpenKey, | ||
keyName, omMetadataManager) == null) { | ||
|
||
final ReplicationConfig replicationConfig = OzoneConfigUtil | ||
.resolveReplicationConfigPreference(keyArgs.getType(), | ||
keyArgs.getFactor(), keyArgs.getEcReplicationConfig(), | ||
omBucketInfo != null ? | ||
omBucketInfo.getDefaultReplicationConfig() : | ||
null, ozoneManager); | ||
|
||
OmKeyInfo keyInfoFromArgs = new OmKeyInfo.Builder() | ||
.setVolumeName(volumeName) | ||
.setBucketName(bucketName) | ||
.setKeyName(keyName) | ||
.setCreationTime(keyArgs.getModificationTime()) | ||
.setModificationTime(keyArgs.getModificationTime()) | ||
.setReplicationConfig(replicationConfig) | ||
.setOmKeyLocationInfos(Collections.singletonList( | ||
new OmKeyLocationInfoGroup(0, new ArrayList<>(), true))) | ||
.setAcls(getAclsForKey(keyArgs, omBucketInfo, pathInfoFSO, | ||
ozoneManager.getPrefixManager(), ozoneManager.getConfiguration())) | ||
.setObjectID(pathInfoFSO.getLeafNodeObjectId()) | ||
.setUpdateID(trxnLogIndex) | ||
.setFileEncryptionInfo(keyArgs.hasFileEncryptionInfo() ? | ||
OMPBHelper.convert(keyArgs.getFileEncryptionInfo()) : null) | ||
.setParentObjectID(pathInfoFSO.getLastKnownParentId()) | ||
.build(); | ||
|
||
// Add missing multi part info to open key table | ||
addMultiPartToCache(omMetadataManager, multipartOpenKey, | ||
pathInfoFSO, keyInfoFromArgs, keyName, trxnLogIndex); | ||
} | ||
} | ||
return missingParentInfos; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,10 @@ | |
|
||
import org.apache.hadoop.ozone.OzoneConsts; | ||
import org.apache.ratis.server.protocol.TermIndex; | ||
import org.apache.hadoop.hdds.client.ECReplicationConfig; | ||
import org.apache.hadoop.hdds.client.RatisReplicationConfig; | ||
import org.apache.hadoop.hdds.client.ReplicationConfig; | ||
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; | ||
import org.apache.hadoop.hdds.utils.db.cache.CacheKey; | ||
import org.apache.hadoop.hdds.utils.db.cache.CacheValue; | ||
import org.apache.hadoop.ozone.audit.OMAction; | ||
|
@@ -49,6 +53,7 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
|
@@ -190,6 +195,30 @@ private void processResults(OMMetrics omMetrics, | |
|
||
} | ||
|
||
private KeyArgs buildKeyArgs(final OmMultipartUpload multipartUpload) throws OMException { | ||
final long now = Instant.now().toEpochMilli(); | ||
final ReplicationConfig replicationConfig = | ||
multipartUpload.getReplicationConfig(); | ||
KeyArgs.Builder builder = | ||
KeyArgs.newBuilder() | ||
.setVolumeName(multipartUpload.getVolumeName()) | ||
.setBucketName(multipartUpload.getBucketName()) | ||
.setKeyName(multipartUpload.getKeyName()) | ||
.setType(replicationConfig.getReplicationType()) | ||
.setModificationTime(now); | ||
|
||
if (replicationConfig instanceof ECReplicationConfig) { | ||
builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig).toProto()); | ||
} else if (replicationConfig instanceof RatisReplicationConfig) { | ||
builder.setFactor(((RatisReplicationConfig) replicationConfig).getReplicationFactor()); | ||
} else if (replicationConfig instanceof StandaloneReplicationConfig) { | ||
builder.setFactor(((StandaloneReplicationConfig) replicationConfig).getReplicationFactor()); | ||
} else { | ||
throw new OMException(OMException.ResultCodes.INVALID_REQUEST); | ||
} | ||
return builder.build(); | ||
} | ||
|
||
private void updateTableCache(OzoneManager ozoneManager, | ||
long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket, | ||
Map<OmBucketInfo, List<OmMultipartAbortInfo>> abortedMultipartUploads) | ||
|
@@ -250,6 +279,7 @@ private void updateTableCache(OzoneManager ozoneManager, | |
try { | ||
multipartUpload = | ||
OmMultipartUpload.from(expiredMPUKeyName); | ||
multipartUpload.setReplicationConfig(omMultipartKeyInfo.getReplicationConfig()); | ||
} catch (IllegalArgumentException e) { | ||
LOG.warn("Aborting expired MPU failed: MPU key: " + | ||
expiredMPUKeyName + " has invalid structure, " + | ||
|
@@ -259,6 +289,8 @@ private void updateTableCache(OzoneManager ozoneManager, | |
|
||
String multipartOpenKey; | ||
try { | ||
KeyArgs keyArgs = buildKeyArgs(multipartUpload); | ||
addOrGetMissingDirectories(ozoneManager, keyArgs, trxnLogIndex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should have a corresponding OM response to add the missing directories to the DB batch similar to #6496? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know your thoughts on my reply related to #6496 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the |
||
multipartOpenKey = | ||
OMMultipartUploadUtils | ||
.getMultipartOpenKey(multipartUpload.getVolumeName(), | ||
|
@@ -271,7 +303,7 @@ private void updateTableCache(OzoneManager ozoneManager, | |
multipartUpload.getVolumeName() + ", bucket: " + | ||
multipartUpload.getBucketName() + ", key: " + | ||
multipartUpload.getKeyName() + ". Cannot parse the open key" + | ||
"for this MPU, skipping this MPU."); | ||
"for this MPU, skipping this MPU.", ome); | ||
continue; | ||
} | ||
|
||
|
@@ -336,6 +368,10 @@ private void updateTableCache(OzoneManager ozoneManager, | |
.releaseWriteLock(BUCKET_LOCK, volumeName, bucketName)); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
protected boolean addMissingDirectoriesToCacheEnabled() { | ||
return true; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn | |
|
||
validateBucketAndVolume(omMetadataManager, volumeName, bucketName); | ||
|
||
missingParentInfos = addOrGetMissingDirectories(ozoneManager, keyArgs, transactionLogIndex); | ||
|
||
OMFileRequest.OMPathInfoWithFSO pathInfoFSO = OMFileRequest | ||
.verifyDirectoryKeysInPath(omMetadataManager, volumeName, bucketName, | ||
keyName, Paths.get(keyName)); | ||
|
@@ -119,10 +121,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn | |
final OmBucketInfo bucketInfo = getBucketInfo(omMetadataManager, | ||
volumeName, bucketName); | ||
|
||
// add all missing parents to dir table | ||
missingParentInfos = getAllMissingParentDirInfo(ozoneManager, keyArgs, bucketInfo, | ||
pathInfoFSO, transactionLogIndex); | ||
|
||
// We are adding uploadId to key, because if multiple users try to | ||
// perform multipart upload on the same key, each will try to upload, who | ||
// ever finally commit the key, we see that key in ozone. Suppose if we | ||
|
@@ -192,18 +190,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn | |
.addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList())) | ||
.addAllTags(KeyValueUtil.getFromProtobuf(keyArgs.getTagsList())) | ||
.build(); | ||
|
||
// validate and update namespace for missing parent directory | ||
if (null != missingParentInfos) { | ||
checkBucketQuotaInNamespace(bucketInfo, missingParentInfos.size()); | ||
bucketInfo.incrUsedNamespace(missingParentInfos.size()); | ||
} | ||
|
||
// Add cache entries for the prefix directories. | ||
// Skip adding for the file key itself, until Key Commit. | ||
OMFileRequest.addDirectoryTableCacheEntries(omMetadataManager, | ||
volumeId, bucketId, transactionLogIndex, | ||
missingParentInfos, null); | ||
|
||
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, | ||
multipartOpenKey, omKeyInfo, pathInfoFSO.getLeafNodeName(), keyName, | ||
|
@@ -213,6 +199,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn | |
omMetadataManager.getMultipartInfoTable().addCacheEntry( | ||
multipartKey, multipartKeyInfo, transactionLogIndex); | ||
|
||
if (bucketInfo == null) { | ||
throw new IOException("bucketInfo is null"); | ||
} | ||
Comment on lines
+202
to
+204
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do hit this condition: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. I think I saw this condition happened and that is why I added this block later on. But let me delete it and re-test |
||
omClientResponse = | ||
new S3InitiateMultipartUploadResponseWithFSO( | ||
omResponse.setInitiateMultiPartUploadResponse( | ||
|
@@ -246,6 +235,11 @@ missingParentInfos, getBucketLayout(), volumeId, bucketId, | |
return omClientResponse; | ||
} | ||
|
||
@Override | ||
protected boolean addMissingDirectoriesToCacheEnabled() { | ||
return true; | ||
} | ||
|
||
/** | ||
* Verify om directory result. | ||
* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, if we are only adding a directories in cache which will be cleaned up afterwards (for abort and expired abort), I don't think we should increased the usedNamespace of the parent bucket since this will cause
usedNamespace
to never be 0 since the correspodningdecrUsedNamespace
will not be called.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is copied from https://github.com/apache/ozone/pull/6496/files#diff-28870b5ea5ae6d1f8207b99595dc84f585becd8f2c53ee1d852d6ae34b7229f4R86-R87. Pls let me know if this logic is needed in MPU complete request. So my essential question is should we increase the used namespace at initiate time, or the complete time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see the #7566 (comment) for another approach where we don't need to create missing parent directories, and therefore no need to update the bucket namespace.