Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metadata Management] add s3 interface for s3 client mocking #1406

Open
wants to merge 2 commits into
base: metadata-management
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions metadata/pkg/drivers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
backendpb "github.com/opensds/multi-cloud/backend/proto"
"github.com/opensds/multi-cloud/metadata/pkg/db"
"github.com/opensds/multi-cloud/metadata/pkg/model"
Expand Down Expand Up @@ -50,8 +51,7 @@ type AwsAdapter struct {
Session *session.Session
}

func GetHeadObject(sess *session.Session, bucketName *string, obj *model.MetaObject) {
svc := s3.New(sess)
func GetHeadObject(svc s3iface.S3API, bucketName *string, obj *model.MetaObject) {
meta, err := svc.HeadObject(&s3.HeadObjectInput{Bucket: bucketName, Key: &obj.ObjectName})
if err != nil {
log.Errorf("cannot perform head object on object %v in bucket %v. failed with error: %v", obj.ObjectName, *bucketName, err)
Expand All @@ -60,7 +60,9 @@ func GetHeadObject(sess *session.Session, bucketName *string, obj *model.MetaObj
if meta.ServerSideEncryption != nil {
obj.ServerSideEncryption = *meta.ServerSideEncryption
}
obj.ObjectType = *meta.ContentType
if meta.ContentType != nil {
obj.ObjectType = *meta.ContentType
}
if meta.Expires != nil {
expiresTime, err := time.Parse(time.RFC3339, *meta.Expires)
if err != nil {
Expand All @@ -82,12 +84,11 @@ func GetHeadObject(sess *session.Session, bucketName *string, obj *model.MetaObj
obj.Metadata = metadata
}

func ObjectList(sess *session.Session, bucket *model.MetaBucket) error {
svc := s3.New(sess)
output, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{Bucket: &bucket.Name})
var ObjectList = func(svc s3iface.S3API, bucketName string) ([]*model.MetaObject, int64, error) {
output, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{Bucket: &bucketName})
if err != nil {
log.Errorf("unable to list objects in bucket %v. failed with error: %v", bucket.Name, err)
return err
log.Errorf("unable to list objects in bucket %v. failed with error: %v", bucketName, err)
return nil, 0, err
}

numObjects := len(output.Contents)
Expand All @@ -102,7 +103,7 @@ func ObjectList(sess *session.Session, bucket *model.MetaBucket) error {
totSize += obj.Size
obj.StorageClass = *object.StorageClass

tags, err := svc.GetObjectTagging(&s3.GetObjectTaggingInput{Bucket: &bucket.Name, Key: &obj.ObjectName})
tags, err := svc.GetObjectTagging(&s3.GetObjectTaggingInput{Bucket: &bucketName, Key: &obj.ObjectName})

if err == nil {
tagset := map[string]string{}
Expand All @@ -117,7 +118,7 @@ func ObjectList(sess *session.Session, bucket *model.MetaBucket) error {
log.Errorf("unable to get object tags. failed with error: %v", err)
}

acl, err := svc.GetObjectAcl(&s3.GetObjectAclInput{Bucket: &bucket.Name, Key: &obj.ObjectName})
acl, err := svc.GetObjectAcl(&s3.GetObjectAclInput{Bucket: &bucketName, Key: &obj.ObjectName})
if err != nil {
log.Errorf("unable to get object Acl. failed with error: %v", err)
} else {
Expand All @@ -128,25 +129,21 @@ func ObjectList(sess *session.Session, bucket *model.MetaBucket) error {
obj.ObjectAcl = access
}

GetHeadObject(sess, &bucket.Name, obj)
GetHeadObject(svc, &bucketName, obj)
}
bucket.NumberOfObjects = numObjects
bucket.TotalSize = totSize
bucket.Objects = objectArray
return nil
return objectArray, totSize, nil
}

func GetBucketMeta(buckIdx int, bucket *s3.Bucket, sess *session.Session, bucketArray []*model.MetaBucket, wg *sync.WaitGroup) {
func GetBucketMeta(buckIdx int, bucket *s3.Bucket, svc s3iface.S3API, backendRegion *string, bucketArray []*model.MetaBucket, wg *sync.WaitGroup) {
defer wg.Done()

svc := s3.New(sess)
loc, err := svc.GetBucketLocation(&s3.GetBucketLocationInput{Bucket: bucket.Name})
if err != nil {
log.Errorf("unable to get bucket location. failed with error: %v", err)
return
}

if *loc.LocationConstraint != *sess.Config.Region {
if *loc.LocationConstraint != *backendRegion {
return
}

Expand All @@ -155,22 +152,27 @@ func GetBucketMeta(buckIdx int, bucket *s3.Bucket, sess *session.Session, bucket
buck.CreationDate = bucket.CreationDate
buck.Region = *loc.LocationConstraint

err = ObjectList(sess, buck)
objectArray, totalSize, err := ObjectList(svc, *bucket.Name)

if err != nil {
return
}

buck.Objects = objectArray
buck.NumberOfObjects = len(objectArray)
buck.TotalSize = totalSize

bucketArray[buckIdx] = buck

tags, err := svc.GetBucketTagging(&s3.GetBucketTaggingInput{Bucket: bucket.Name})

if err == nil {
if err == nil || strings.Contains(err.Error(), "NoSuchTagSet") {
tagset := map[string]string{}
for _, tag := range tags.TagSet {
tagset[*tag.Key] = *tag.Value
}
buck.BucketTags = tagset
} else if !strings.Contains(err.Error(), "NoSuchTagSet") {
} else {
log.Errorf("unable to get bucket tags. failed with error: %v", err)
}

Expand All @@ -186,8 +188,7 @@ func GetBucketMeta(buckIdx int, bucket *s3.Bucket, sess *session.Session, bucket
}
}

func BucketList(sess *session.Session) ([]*model.MetaBucket, error) {
svc := s3.New(sess)
func BucketList(svc s3iface.S3API, backendRegion *string) ([]*model.MetaBucket, error) {

output, err := svc.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
Expand All @@ -198,7 +199,7 @@ func BucketList(sess *session.Session) ([]*model.MetaBucket, error) {
wg := sync.WaitGroup{}
for idx, bucket := range output.Buckets {
wg.Add(1)
go GetBucketMeta(idx, bucket, sess, bucketArray, &wg)
go GetBucketMeta(idx, bucket, svc, backendRegion, bucketArray, &wg)
}
wg.Wait()

Expand All @@ -213,8 +214,9 @@ func BucketList(sess *session.Session) ([]*model.MetaBucket, error) {
}

func (ad *AwsAdapter) SyncMetadata(ctx context.Context, in *pb.SyncMetadataRequest) error {

buckArr, err := BucketList(ad.Session)
svc := s3.New(ad.Session)
backendRegion := ad.Session.Config.Region
buckArr, err := BucketList(svc, backendRegion)
if err != nil {
log.Errorf("metadata collection for backend id: %v failed with error: %v", ad.Backend.Id, err)
return err
Expand Down