Skip to content

Commit

Permalink
extend bucket properties: add max-page-size (new)
Browse files Browse the repository at this point in the history
* motivation: vendors providing s3-compliant storages
  may have a different maximum (and not necessarily Amazon's 1000)
* secondly, make AWS region (`extra.aws.cloud_region`) writeable
  allow to modify (NOTE: advanced usage only!)
* refactor `MaxPageSize`: move it from the backend API to bucket
* TODO: skip HEAD(bucket) when updating ExtraProps

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Feb 22, 2024
1 parent edd1ea0 commit 258564f
Show file tree
Hide file tree
Showing 25 changed files with 105 additions and 76 deletions.
5 changes: 2 additions & 3 deletions ais/backend/ais.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,7 @@ func (m *AISBackendProvider) resolve(uuid string) (*remAis, string, error) {
// BackendProvider //
/////////////////////

func (*AISBackendProvider) Provider() string { return apc.AIS }
func (*AISBackendProvider) MaxPageSize() uint { return apc.DefaultPageSizeAIS }
func (*AISBackendProvider) Provider() string { return apc.AIS }

func (*AISBackendProvider) CreateBucket(_ *meta.Bck) (errCode int, err error) {
debug.Assert(false) // Bucket creation happens only with reverse proxy to AIS cluster.
Expand Down Expand Up @@ -429,7 +428,7 @@ func (m *AISBackendProvider) ListObjects(remoteBck *meta.Bck, msg *apc.LsoMsg, l
return
}
remoteMsg := msg.Clone()
remoteMsg.PageSize = calcPageSize(remoteMsg.PageSize, m.MaxPageSize())
remoteMsg.PageSize = calcPageSize(remoteMsg.PageSize, remoteBck.MaxPageSize())

// TODO:
// Currently, not encoding xaction (aka request) `UUID` from the remote cluster
Expand Down
7 changes: 2 additions & 5 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ func NewAWS(t core.TargetPut) (core.BackendProvider, error) {

func (*awsProvider) Provider() string { return apc.AWS }

// https://docs.aws.amazon.com/cli/latest/userguide/cli-usage-pagination.html#cli-usage-pagination-serverside
func (*awsProvider) MaxPageSize() uint { return apc.DefaultPageSizeCloud }

//
// CREATE BUCKET
//
Expand Down Expand Up @@ -145,7 +142,7 @@ func (*awsProvider) HeadBucket(_ ctx, bck *meta.Bck) (bckProps cos.StrKVs, errCo
// NOTE: obtaining versioning info is extremely slow - to avoid timeouts, imposing a hard limit on the page size
const versionedPageSize = 20

func (awsp *awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (errCode int, err error) {
func (*awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (errCode int, err error) {
var (
svc *s3.Client
h = cmn.BackendHelpers.Amazon
Expand All @@ -171,7 +168,7 @@ func (awsp *awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Ls
}

versioning = bck.Props != nil && bck.Props.Versioning.Enabled && msg.WantProp(apc.GetPropsVersion)
msg.PageSize = calcPageSize(msg.PageSize, awsp.MaxPageSize())
msg.PageSize = calcPageSize(msg.PageSize, bck.MaxPageSize())
if versioning {
msg.PageSize = min(versionedPageSize, msg.PageSize)
}
Expand Down
5 changes: 1 addition & 4 deletions ais/backend/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,6 @@ func azureErrorToAISError(azureError error, bck *cmn.Bck, objName string) (int,

func (*azureProvider) Provider() string { return apc.Azure }

// ref: https://docs.microsoft.com/en-us/connectors/azureblob/#general-limits
func (*azureProvider) MaxPageSize() uint { return 5000 }

//
// CREATE BUCKET
//
Expand Down Expand Up @@ -247,7 +244,7 @@ func (ap *azureProvider) HeadBucket(ctx context.Context, bck *meta.Bck) (cos.Str
//

func (ap *azureProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (int, error) {
msg.PageSize = calcPageSize(msg.PageSize, ap.MaxPageSize())
msg.PageSize = calcPageSize(msg.PageSize, bck.MaxPageSize())
var (
cloudBck = bck.RemoteBck()
cntURL = ap.u + "/" + cloudBck.Name
Expand Down
2 changes: 1 addition & 1 deletion ais/backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func calcPageSize(pageSize, maxPageSize uint) uint {
if pageSize == 0 {
return maxPageSize
}
return pageSize
return min(pageSize, maxPageSize)
}

//nolint:deadcode,unused // It is used but in `*_mock.go` files.
Expand Down
7 changes: 2 additions & 5 deletions ais/backend/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ func (gcpp *gcpProvider) createClient(ctx context.Context) (*storage.Client, err

func (*gcpProvider) Provider() string { return apc.GCP }

// https://cloud.google.com/storage/docs/json_api/v1/objects/list#parameters
func (*gcpProvider) MaxPageSize() uint { return apc.DefaultPageSizeCloud }

//
// CREATE BUCKET
//
Expand Down Expand Up @@ -156,13 +153,13 @@ func (*gcpProvider) HeadBucket(ctx context.Context, bck *meta.Bck) (bckProps cos
// LIST OBJECTS
//

func (gcpp *gcpProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (errCode int, err error) {
func (*gcpProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoResult) (errCode int, err error) {
var (
query *storage.Query
h = cmn.BackendHelpers.Google
cloudBck = bck.RemoteBck()
)
msg.PageSize = calcPageSize(msg.PageSize, gcpp.MaxPageSize())
msg.PageSize = calcPageSize(msg.PageSize, bck.MaxPageSize())
if msg.Prefix != "" {
query = &storage.Query{Prefix: msg.Prefix}
}
Expand Down
5 changes: 2 additions & 3 deletions ais/backend/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func hdfsErrorToAISError(err error) (int, error) {
return http.StatusBadRequest, err
}

func (*hdfsProvider) Provider() string { return apc.HDFS }
func (*hdfsProvider) MaxPageSize() uint { return 10000 }
func (*hdfsProvider) Provider() string { return apc.HDFS }

//
// CREATE BUCKET
Expand Down Expand Up @@ -131,7 +130,7 @@ func (hp *hdfsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Lso
h = cmn.BackendHelpers.HDFS
idx int
)
msg.PageSize = calcPageSize(msg.PageSize, hp.MaxPageSize())
msg.PageSize = calcPageSize(msg.PageSize, bck.MaxPageSize())

err := hp.c.Walk(bck.Props.Extra.HDFS.RefDirectory, func(path string, fi os.FileInfo, err error) error {
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions ais/backend/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (hp *httpProvider) client(u string) *http.Client {
return hp.cliH
}

func (*httpProvider) Provider() string { return apc.HTTP }
func (*httpProvider) MaxPageSize() uint { return 10000 }
func (*httpProvider) Provider() string { return apc.HTTP }

// TODO: can be done
func (hp *httpProvider) CreateBucket(*meta.Bck) (int, error) {
Expand Down
4 changes: 2 additions & 2 deletions ais/backend/mock_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ var _ core.BackendProvider = (*mockBP)(nil)

func NewDummyBackend(t core.TargetPut) (core.BackendProvider, error) { return &mockBP{t: t}, nil }

func (*mockBP) Provider() string { return mock }
func (*mockBP) MaxPageSize() uint { return math.MaxUint32 }
func (*mockBP) Provider() string { return mock }
func (*mockBP) MaxPageSize(*meta.Bck) uint { return math.MaxUint32 }

func (*mockBP) CreateBucket(*meta.Bck) (int, error) {
return http.StatusBadRequest, cmn.NewErrUnsupp("create", mock+" bucket")
Expand Down
2 changes: 1 addition & 1 deletion ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2103,7 +2103,7 @@ func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRe
flags uint32
)
if lsmsg.PageSize == 0 {
lsmsg.PageSize = apc.DefaultPageSizeAIS
lsmsg.PageSize = apc.MaxPageSizeAIS
}
pageSize := lsmsg.PageSize

Expand Down
8 changes: 4 additions & 4 deletions ais/test/scripts/s3-mpt-large-files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
# Prerequisites: ########################################################################################
# - aistore cluster
# - s3cmd
# - locally accessible directory that MAY contain large files (see examples below)
# - locally accessible (source) directory that MAY contain large files (see examples below)
# - any aistore bucket
#
## Example usage:
## 1. use existing large files and run 8 iterations
## ./ais/test/scripts/ais/test/scripts/s3-mpt-large-files.sh /tmp/largefiles s3://abc 8
## ./ais/test/scripts/s3-mpt-large-files.sh /tmp/largefiles s3://abc 8
## 2. generate (the default number of) large files, run 16 iterations
## ./ais/test/scripts/ais/test/scripts/s3-mpt-large-files.sh /tmp/largefiles s3://abc 16 true
## ./ais/test/scripts/s3-mpt-large-files.sh /tmp/largefiles s3://abc 16 true
## 3. same as above w/ 10 generated large files
## ./ais/test/scripts/ais/test/scripts/s3-mpt-large-files.sh /tmp/largefiles s3://abc 16 true 10
## ./ais/test/scripts/s3-mpt-large-files.sh /tmp/largefiles s3://abc 16 true 10
# #######################################################################################################

# command line
Expand Down
10 changes: 3 additions & 7 deletions ais/tgtbck.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,12 @@ func (t *target) blist(qbck *cmn.QueryBcks, config *cmn.Config, bmd *bucketMD) (
// control/scope - via `apc.LsoMsg`
func (t *target) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bck, lsmsg *apc.LsoMsg) (ok bool) {
if !bck.IsAIS() && !lsmsg.IsFlagSet(apc.LsObjCached) {
maxRemotePageSize := t.Backend(bck).MaxPageSize()
maxRemotePageSize := bck.MaxPageSize()
if lsmsg.PageSize > maxRemotePageSize {
t.writeErrf(w, r, "page size %d exceeds the supported maximum (%d)", lsmsg.PageSize, maxRemotePageSize)
t.writeErrf(w, r, "page size %d exceeds the maximum (%d)", lsmsg.PageSize, maxRemotePageSize)
return false
}
if lsmsg.PageSize == 0 {
lsmsg.PageSize = maxRemotePageSize
}
}
debug.Assert(lsmsg.PageSize > 0 && lsmsg.PageSize < 100000)

// (advanced) user-selected target to execute remote ls
if lsmsg.SID != "" {
smap := t.owner.smap.get()
Expand Down Expand Up @@ -519,6 +514,7 @@ func (t *target) httpbckhead(w http.ResponseWriter, r *http.Request, apireq *api
}
}
// + cloud
// TODO: skip when updating cmn.ExtraProps, and in part ExtraPropsAWS.CloudRegion, ExtraPropsAWS.Endpoint, etc.
bucketProps, code, err = t.Backend(apireq.bck).HeadBucket(ctx, apireq.bck)
if err != nil {
if !inBMD {
Expand Down
9 changes: 6 additions & 3 deletions api/apc/lsmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ const (
LsVerChanged
)

// List objects default page size
// max page sizes
// see also: bprops Extra.AWS.MaxPageSize
const (
DefaultPageSizeAIS = 10000
DefaultPageSizeCloud = 1000
MaxPageSizeAIS = 10000
MaxPageSizeAWS = 1000
MaxPageSizeGCP = 1000
MaxPageSizeAzure = 5000
)

const (
Expand Down
9 changes: 3 additions & 6 deletions bench/tools/aisloader/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,7 @@ func listObjCallback(ctx *api.LsoCounter) {

// listObjectNames returns a slice of object names of all objects that match the prefix in a bucket.
func listObjectNames(baseParams api.BaseParams, bck cmn.Bck, prefix string, cached bool) ([]string, error) {
msg := &apc.LsoMsg{Prefix: prefix, PageSize: apc.DefaultPageSizeCloud}
if bck.IsAIS() || bck.IsRemoteAIS() {
msg.PageSize = apc.DefaultPageSizeAIS
}
msg := &apc.LsoMsg{Prefix: prefix}
// if bck is remote then check for cached flag
if cached {
msg.Flags |= apc.LsObjCached
Expand Down Expand Up @@ -493,7 +490,7 @@ func initS3Svc() error {
func s3ListObjects() ([]string, error) {
// first page
params := &s3.ListObjectsV2Input{Bucket: aws.String(runParams.bck.Name)}
params.MaxKeys = aws.Int32(apc.DefaultPageSizeCloud)
params.MaxKeys = aws.Int32(apc.MaxPageSizeAWS)

prev := mono.NanoTime()
resp, err := s3svc.ListObjectsV2(context.Background(), params)
Expand All @@ -509,7 +506,7 @@ func s3ListObjects() ([]string, error) {
token = *resp.NextContinuationToken
}
if token != "" {
l = 16 * apc.DefaultPageSizeCloud
l = 16 * apc.MaxPageSizeAWS
}
names := make([]string, 0, l)
for _, object := range resp.Contents {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/auth_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func loginUserHandler(c *cli.Context) (err error) {
cluID = parseStrFlag(c, clusterTokenFlag)
)
if flagIsSet(c, expireFlag) {
expireIn = apc.Duration(parseDurationFlag(c, expireFlag))
expireIn = apc.Ptr(parseDurationFlag(c, expireFlag))
}
if cluID != "" {
if _, err := authn.GetRegisteredClusters(authParams, authn.CluACL{ID: cluID}); err != nil {
Expand Down
13 changes: 7 additions & 6 deletions cmd/cli/cli/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func createBucket(c *cli.Context, bck cmn.Bck, props *cmn.BpropsToSet, dontHeadR
}

// Destroy ais buckets
func destroyBuckets(c *cli.Context, buckets []cmn.Bck) error {
for _, bck := range buckets {
func destroyBuckets(c *cli.Context, buckets []cmn.Bck) (cmn.Bck, error) {
for i := range buckets {
bck := buckets[i]
empty, errEmp := isBucketEmpty(bck, true /*cached*/)
if errEmp == nil && !empty {
if !flagIsSet(c, yesFlag) {
Expand All @@ -65,14 +66,14 @@ func destroyBuckets(c *cli.Context, buckets []cmn.Bck) error {
if cmn.IsStatusNotFound(err) {
err := &errDoesNotExist{what: "bucket", name: bck.Cname("")}
if !flagIsSet(c, ignoreErrorFlag) {
return err
return bck, err
}
fmt.Fprintln(c.App.ErrWriter, err.Error())
fmt.Fprintln(c.App.ErrWriter, err)
continue
}
return err
return bck, err
}
return nil
return cmn.Bck{}, nil
}

// Rename ais bucket
Expand Down
17 changes: 14 additions & 3 deletions cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var (
),
cmdSetBprops: {
forceFlag,
dontHeadRemoteFlag,
},
cmdResetBprops: {},

Expand Down Expand Up @@ -402,7 +403,14 @@ func removeBucketHandler(c *cli.Context) error {
if err != nil {
return err
}
return destroyBuckets(c, buckets)
bck, err := destroyBuckets(c, buckets)
if err == nil {
return nil
}
if herr, ok := err.(*cmn.ErrHTTP); ok && herr.TypeCode == "ErrUnsupp" {
return fmt.Errorf("%v\n(Tip: did you want to evict '%s' from aistore?)", err, bck.Cname(""))
}
return err
}

func resetPropsHandler(c *cli.Context) error {
Expand Down Expand Up @@ -462,8 +470,11 @@ func setPropsHandler(c *cli.Context) (err error) {
if err != nil {
return err
}
if currProps, err = headBucket(bck, false /* don't add */); err != nil {
return err
dontHeadRemote := flagIsSet(c, dontHeadRemoteFlag)
if !dontHeadRemote {
if currProps, err = headBucket(bck, false /* don't add */); err != nil {
return err
}
}
newProps, err := parseBpropsFromContext(c)

Expand Down
25 changes: 15 additions & 10 deletions cmd/cli/cli/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,25 +442,30 @@ func lsoErr(msg *apc.LsoMsg, err error) error {
}

func _setPage(c *cli.Context, bck cmn.Bck) (pageSize, limit int, err error) {
defaultPageSize := apc.DefaultPageSizeCloud
if bck.IsAIS() || bck.IsRemoteAIS() {
defaultPageSize = apc.DefaultPageSizeAIS
}
pageSize = parseIntFlag(c, pageSizeFlag)
if pageSize < 0 {
err = fmt.Errorf("page size (%d) cannot be negative", pageSize)
return
b := meta.CloneBck(&bck)
if flagIsSet(c, pageSizeFlag) {
pageSize = parseIntFlag(c, pageSizeFlag)
if pageSize < 0 {
err = fmt.Errorf("invalid %s: page size (%d) cannot be negative", qflprn(pageSizeFlag), pageSize)
return
}
if uint(pageSize) > b.MaxPageSize() {
err = fmt.Errorf("invalid %s: page size (%d) cannot exceed the maximum (%d)", qflprn(pageSizeFlag), pageSize, b.MaxPageSize())
return
}
}

limit = parseIntFlag(c, objLimitFlag)
if limit < 0 {
err = fmt.Errorf("max object count (%d) cannot be negative", limit)
err = fmt.Errorf("invalid %s: max number of listed objects (%d) cannot be negative", qflprn(objLimitFlag), limit)
return
}
if limit == 0 {
return
}

// when limit "wins"
if limit < pageSize || (limit < defaultPageSize && pageSize == 0) {
if limit < pageSize || (uint(limit) < b.MaxPageSize() && pageSize == 0) {
pageSize = limit
}
return
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.21

require (
github.com/NVIDIA/aistore v1.3.22-0.20240220173151-d0162a406e65
github.com/NVIDIA/aistore v1.3.22-0.20240222204139-c1151aa139d2
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20240220173151-d0162a406e65 h1:1xqubi67UKrZdyR8vBHb6fGwNH0i6nxhXZ5xpSWRH9M=
github.com/NVIDIA/aistore v1.3.22-0.20240220173151-d0162a406e65/go.mod h1:aLKufHFV2zZPVThQTMovtlYJIaU5LnVGgrJCZ2AxbWg=
github.com/NVIDIA/aistore v1.3.22-0.20240222204139-c1151aa139d2 h1:86IB9UWaefIE/1eDvzrTMbezMZ4vP9WqwPUquywzTyI=
github.com/NVIDIA/aistore v1.3.22-0.20240222204139-c1151aa139d2/go.mod h1:aLKufHFV2zZPVThQTMovtlYJIaU5LnVGgrJCZ2AxbWg=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
Loading

0 comments on commit 258564f

Please sign in to comment.