diff --git a/ais/plstcx.go b/ais/plstcx.go
index 85c47af24e9..2c5d33095b5 100644
--- a/ais/plstcx.go
+++ b/ais/plstcx.go
@@ -42,7 +42,7 @@ type (
 		cnt     int
 		lsmsg   apc.LsoMsg
 		altmsg  apc.ActMsg
-		tcomsg  cmn.TCObjsMsg
+		tcomsg  cmn.TCOMsg
 		stopped atomic.Bool
 	}
 )
diff --git a/ais/proxy.go b/ais/proxy.go
index dcd55a48474..380de59b8ff 100644
--- a/ais/proxy.go
+++ b/ais/proxy.go
@@ -1373,7 +1373,7 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
 		}
 	case apc.ActCopyObjects, apc.ActETLObjects:
 		var (
-			tcomsg = &cmn.TCObjsMsg{}
+			tcomsg = &cmn.TCOMsg{}
 			bckTo  *meta.Bck
 			ecode  int
 			eq     bool
diff --git a/ais/prxtxn.go b/ais/prxtxn.go
index 6146f3c6fa6..c038bdef9c7 100644
--- a/ais/prxtxn.go
+++ b/ais/prxtxn.go
@@ -646,7 +646,7 @@ func (p *proxy) tcb(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg, dryRun bool) (xid
 }
 
 // transform or copy a list or a range of objects
-func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, config *cmn.Config, msg *apc.ActMsg, tcomsg *cmn.TCObjsMsg) (string, error) {
+func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, config *cmn.Config, msg *apc.ActMsg, tcomsg *cmn.TCOMsg) (string, error) {
 	// 1. prep
 	var (
 		_, existsTo = p.owner.bmd.get().Get(bckTo) // cleanup on fail: destroy if created
diff --git a/ais/test/cp_multiobj_test.go b/ais/test/cp_multiobj_test.go
index 1a60bf1e14b..d7958d7379e 100644
--- a/ais/test/cp_multiobj_test.go
+++ b/ais/test/cp_multiobj_test.go
@@ -78,7 +78,7 @@ func TestCopyMultiObjSimple(t *testing.T) {
 		template := "test/a-" + fmt.Sprintf("{%04d..%04d}", rangeStart, rangeStart+copyCnt-1)
 		tlog.Logf("[%s] %s => %s\n", template, bckFrom.Cname(""), bckTo.Cname(""))
-		msg := cmn.TCObjsMsg{ToBck: bckTo}
+		msg := cmn.TCOMsg{ToBck: bckTo}
 		msg.Template = template
 		xid, err = api.CopyMultiObj(baseParams, bckFrom, &msg)
 		tassert.CheckFatal(t, err)
@@ -190,7 +190,7 @@ func testCopyMobj(t *testing.T, bck *meta.Bck) {
 			var (
 				err error
 				xid string
-				msg = cmn.TCObjsMsg{ToBck: bckTo}
+				msg = cmn.TCOMsg{ToBck: bckTo}
 			)
 			msg.ObjNames = list
 			if m.bck.IsRemote() && test.evictRemoteSrc {
@@ -224,7 +224,7 @@ func testCopyMobj(t *testing.T, bck *meta.Bck) {
 				err      error
 				xid      string
 				template = fmt.Sprintf(fmtRange, m.prefix, start, start+numToCopy-1)
-				msg      = cmn.TCObjsMsg{ToBck: bckTo}
+				msg      = cmn.TCOMsg{ToBck: bckTo}
 			)
 			msg.Template = template
 			if m.bck.IsRemote() && test.evictRemoteSrc {
diff --git a/ais/test/etl_cp_multiobj_test.go b/ais/test/etl_cp_multiobj_test.go
index 9c6f8e6cf3d..f8baddaa409 100644
--- a/ais/test/etl_cp_multiobj_test.go
+++ b/ais/test/etl_cp_multiobj_test.go
@@ -146,7 +146,7 @@ func testETLMultiObj(t *testing.T, etlName string, bckFrom, bckTo cmn.Bck, fileR
 		objList        = pt.ToSlice()
 		objCnt         = len(objList)
 		requestTimeout = 30 * time.Second
-		tcomsg         = cmn.TCObjsMsg{ToBck: bckTo}
+		tcomsg         = cmn.TCOMsg{ToBck: bckTo}
 	)
 	tcomsg.Transform.Name = etlName
 	tcomsg.Transform.Timeout = cos.Duration(requestTimeout)
diff --git a/ais/tgttxn.go b/ais/tgttxn.go
index c15d7b4aef1..b9cd765f583 100644
--- a/ais/tgttxn.go
+++ b/ais/tgttxn.go
@@ -123,7 +123,7 @@ func (t *target) txnHandler(w http.ResponseWriter, r *http.Request) {
 	case apc.ActCopyObjects, apc.ActETLObjects:
 		var (
 			dp     core.DP
-			tcomsg = &cmn.TCObjsMsg{}
+			tcomsg = &cmn.TCOMsg{}
 		)
 		if err := cos.MorphMarshal(c.msg.Value, tcomsg); err != nil {
 			t.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, t.si, msg.Action, c.msg.Value, err)
@@ -632,7 +632,7 @@ func (t *target) _tcbBegin(c *txnSrv, msg *apc.TCBMsg, dp core.DP) (err error) {
 // Two IDs:
 // - TxnUUID: transaction (txn) ID
 // - xid: xaction ID (will have "tco-" prefix)
-func (t *target) tcobjs(c *txnSrv, msg *cmn.TCObjsMsg, dp core.DP) (xid string, _ error) {
+func (t *target) tcobjs(c *txnSrv, msg *cmn.TCOMsg, dp core.DP) (xid string, _ error) {
 	switch c.phase {
 	case apc.ActBegin:
 		var (
diff --git a/ais/tgtxact.go b/ais/tgtxact.go
index c274eabe970..12425550e03 100644
--- a/ais/tgtxact.go
+++ b/ais/tgtxact.go
@@ -268,7 +268,7 @@ func (t *target) httpxpost(w http.ResponseWriter, r *http.Request) {
 		err    error
 		xctn   core.Xact
 		amsg   *apc.ActMsg
-		tcomsg cmn.TCObjsMsg
+		tcomsg cmn.TCOMsg
 	)
 	if amsg, err = t.readActionMsg(w, r); err != nil {
 		return
diff --git a/ais/txn.go b/ais/txn.go
index 967644738cd..20befea68ac 100644
--- a/ais/txn.go
+++ b/ais/txn.go
@@ -113,7 +113,7 @@ type (
 	}
 	txnTCObjs struct {
 		xtco *xs.XactTCObjs
-		msg  *cmn.TCObjsMsg
+		msg  *cmn.TCOMsg
 		txnBckBase
 	}
 	txnECEncode struct {
@@ -581,7 +581,7 @@ func (txn *txnTCB) String() string {
 ///////////////
 // txnTCObjs //
 ///////////////
-func newTxnTCObjs(c *txnSrv, bckFrom *meta.Bck, xtco *xs.XactTCObjs, msg *cmn.TCObjsMsg) (txn *txnTCObjs) {
+func newTxnTCObjs(c *txnSrv, bckFrom *meta.Bck, xtco *xs.XactTCObjs, msg *cmn.TCOMsg) (txn *txnTCObjs) {
 	txn = &txnTCObjs{xtco: xtco, msg: msg}
 	txn.init(bckFrom)
 	txn.fillFromCtx(c)
diff --git a/api/apc/blob.go b/api/apc/blob.go
index 51826988e84..d01eb697c47 100644
--- a/api/apc/blob.go
+++ b/api/apc/blob.go
@@ -16,12 +16,14 @@ import (
 const _bldl = "blob-downloader"
 
 type BlobMsg struct {
-	ChunkSize  int64 `json:"chunk-size"`
-	FullSize   int64 `json:"full-size"`
-	NumWorkers int   `json:"num-workers"`
-	LatestVer  bool  `json:"latest-ver"`
+	ChunkSize  int64 `json:"chunk-size"`  // as in: chunk size
+	FullSize   int64 `json:"full-size"`   // user-specified (full) size of the object to download
+	NumWorkers int   `json:"num-workers"` // number of concurrent downloading workers (readers); `dfltNumWorkers` when zero
+	LatestVer  bool  `json:"latest-ver"`  // when true and in-cluster: check with remote whether (deleted | version-changed)
 }
 
+// in re LatestVer, see also: `QparamLatestVer`, 'versioning.validate_warm_get'
+
 // using textproto.CanonicalMIMEHeaderKey() to check presence -
 // if a given key is present and is an empty string, it's an error
 func (msg *BlobMsg) FromHeader(hdr http.Header) error {
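A minimal sketch of how BlobMsg travels as request headers (illustrative, not part of the change): the header names apc.HdrBlobChunk and apc.HdrBlobWorkers are the ones the CLI sets in cmd/cli/cli/get.go further below; the numeric chunk-size value and everything else here are assumptions.

    package main

    import (
        "fmt"
        "net/http"
        "strconv"

        "github.com/NVIDIA/aistore/api/apc"
    )

    func main() {
        // client side: shape the same headers the CLI sets for blob-download GET
        hdr := http.Header{}
        hdr.Set(apc.HdrBlobChunk, strconv.FormatInt(8*1024*1024, 10)) // chunk size, bytes
        hdr.Set(apc.HdrBlobWorkers, strconv.Itoa(4))                  // concurrent readers

        // server side: recover the message; keys are canonicalized via
        // textproto.CanonicalMIMEHeaderKey (per the comment above)
        var msg apc.BlobMsg
        if err := msg.FromHeader(hdr); err != nil {
            fmt.Println("error:", err)
            return
        }
        fmt.Printf("chunk-size=%d num-workers=%d\n", msg.ChunkSize, msg.NumWorkers)
    }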
diff --git a/api/apc/multiobj.go b/api/apc/multiobj.go
index 8b40d277732..afcd340d0fd 100644
--- a/api/apc/multiobj.go
+++ b/api/apc/multiobj.go
@@ -4,50 +4,56 @@
  */
 package apc
 
+// (common for all multi-object operations)
 type (
 	// List of object names _or_ a template specifying { optional Prefix, zero or more Ranges }
 	ListRange struct {
 		Template string   `json:"template"`
 		ObjNames []string `json:"objnames"`
 	}
-	PrefetchMsg struct {
-		ListRange
-		BlobThreshold   int64 `json:"blob-threshold"`
-		ContinueOnError bool  `json:"coer"`
-		LatestVer       bool  `json:"latest-ver"` // see also: QparamLatestVer, 'versioning.validate_warm_get'
-	}
-
-	// ArchiveMsg contains the parameters (all except the destination bucket)
-	// for archiving mutiple objects as one of the supported archive.FileExtensions types
-	// at the specified (bucket) destination.
-	// See also: api.PutApndArchArgs
-	// -------------------- terminology ---------------------
-	// here and elsewhere "archive" is any (.tar, .tgz/.tar.gz, .zip, .tar.lz4) formatted object.
-	ArchiveMsg struct {
-		TxnUUID     string `json:"-"`        // internal use
-		FromBckName string `json:"-"`        // ditto
-		ArchName    string `json:"archname"` // one of the archive.FileExtensions
-		Mime        string `json:"mime"`     // user-specified mime type (NOTE: takes precedence if defined)
-		ListRange
-		BaseNameOnly    bool `json:"bnonly"` // only extract the base name of objects as names of archived objects
-		InclSrcBname    bool `json:"isbn"`   // include source bucket name into the names of archived objects
-		AppendIfExists  bool `json:"aate"`   // adding a list or a range of objects to an existing archive
-		ContinueOnError bool `json:"coer"`   // on err, keep running arc xaction in a any given multi-object transaction
-	}
-	// Multi-object copy & transform (see also: TCBMsg)
-	TCObjsMsg struct {
-		ListRange
-		TxnUUID string // (plstcx client, internal use)
-		TCBMsg
-		ContinueOnError bool `json:"coer"`
-	}
 )
 
-///////////////
-// ListRange //
-///////////////
-
-// empty `ListRange{}` implies operating on an entire bucket ("all objects in the source bucket")
+// [NOTE]
+// - empty `ListRange{}` implies operating on an entire bucket ("all objects in the source bucket")
+// - in re `LatestVer`, see related: `QparamLatestVer`, 'versioning.validate_warm_get'
 func (lrm *ListRange) IsList() bool      { return len(lrm.ObjNames) > 0 }
 func (lrm *ListRange) HasTemplate() bool { return lrm.Template != "" }
+
+// prefetch
+type PrefetchMsg struct {
+	ListRange
+	BlobThreshold   int64 `json:"blob-threshold"` // when greater than threshold, prefetch using blob-downloader; otherwise cold GET
+	NumWorkers      int   `json:"num-workers"`    // number of concurrent workers; 0 - number of mountpaths (default); (-1) none
+	ContinueOnError bool  `json:"coer"`           // ignore non-critical errors, keep going
+	LatestVer       bool  `json:"latest-ver"`     // when true & in-cluster: check with remote whether (deleted | version-changed)
+}
+
+// ArchiveMsg contains the parameters (all except the destination bucket)
+// for archiving multiple objects as one of the supported archive.FileExtensions types
+// at the specified (bucket) destination.
+// See also: api.PutApndArchArgs
+// -------------------- terminology ---------------------
+// here and elsewhere "archive" is any (.tar, .tgz/.tar.gz, .zip, .tar.lz4) formatted object.
+// [NOTE] see cmn/api for cmn.ArchiveMsg (that also contains ToBck)
+type ArchiveMsg struct {
+	TxnUUID     string `json:"-"`        // internal use
+	FromBckName string `json:"-"`        // ditto
+	ArchName    string `json:"archname"` // one of the archive.FileExtensions
+	Mime        string `json:"mime"`     // user-specified mime type (NOTE: takes precedence if defined)
+	ListRange
+	BaseNameOnly    bool `json:"bnonly"` // only extract the base name of objects as names of archived objects
+	InclSrcBname    bool `json:"isbn"`   // include source bucket name into the names of archived objects
+	AppendIfExists  bool `json:"aate"`   // adding a list or a range of objects to an existing archive
+	ContinueOnError bool `json:"coer"`   // on error, keep running the arc xaction in any given multi-object transaction
+}
+
+// multi-object copy & transform
+// [NOTE] see cmn/api for cmn.TCOMsg (that also contains ToBck); see also TCBMsg
+type TCOMsg struct {
+	ListRange
+	TxnUUID    string // (plstcx client, internal use)
+	NumWorkers int    `json:"num-workers"` // number of concurrent workers; 0 - number of mountpaths (default); (-1) none
+	TCBMsg
+	ContinueOnError bool `json:"coer"`
+}
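For orientation, a minimal sketch of the reworked PrefetchMsg (illustrative; bucket template and values are made up, field semantics as per the comments above):

    package main

    import (
        "fmt"

        "github.com/NVIDIA/aistore/api/apc"
    )

    func main() {
        var msg apc.PrefetchMsg
        msg.Template = "shard-{0000..9999}.tar.lz4" // embedded ListRange; empty ListRange = entire bucket
        msg.BlobThreshold = 1 << 30                 // objects larger than 1GiB go through the blob-downloader
        msg.NumWorkers = 16                         // 0 - one worker per mountpath (default); -1 - single-threaded
        msg.LatestVer = true                        // revalidate in-cluster objects against the remote
        fmt.Printf("%+v\n", msg)
    }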
diff --git a/api/multiobj.go b/api/multiobj.go
index 65418f37c40..f4ee37d75a8 100644
--- a/api/multiobj.go
+++ b/api/multiobj.go
@@ -34,7 +34,7 @@ func ArchiveMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.ArchiveBckMsg) (st
 
 // `fltPresence` applies exclusively to remote `bckFrom` (is ignored if the source is ais://)
 // and is one of: { apc.FltExists, apc.FltPresent, ... } - for complete enum, see api/apc/query.go
-func CopyMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresence ...int) (xid string, err error) {
+func CopyMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCOMsg, fltPresence ...int) (xid string, err error) {
 	bp.Method = http.MethodPost
 	q := bckFrom.NewQuery()
 	if len(fltPresence) > 0 {
@@ -43,7 +43,7 @@ func CopyMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresenc
 	return dolr(bp, bckFrom, apc.ActCopyObjects, msg, q)
 }
 
-func ETLMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresence ...int) (xid string, err error) {
+func ETLMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCOMsg, fltPresence ...int) (xid string, err error) {
 	bp.Method = http.MethodPost
 	q := bckFrom.NewQuery()
 	if len(fltPresence) > 0 {
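End to end, the renamed message is used exactly as before; a sketch mirroring the updated tests above, with an assumed endpoint and made-up bucket names:

    package main

    import (
        "fmt"
        "net/http"

        "github.com/NVIDIA/aistore/api"
        "github.com/NVIDIA/aistore/api/apc"
        "github.com/NVIDIA/aistore/cmn"
    )

    func main() {
        bp := api.BaseParams{Client: &http.Client{}, URL: "http://localhost:8080"} // assumed endpoint
        bckFrom := cmn.Bck{Name: "src", Provider: apc.AIS}
        bckTo := cmn.Bck{Name: "dst", Provider: apc.AIS}

        // ToBck is the cmn.TCOMsg part; Template comes from the embedded apc.TCOMsg.ListRange
        msg := cmn.TCOMsg{ToBck: bckTo}
        msg.Template = "test/a-{0000..0099}"

        xid, err := api.CopyMultiObj(bp, bckFrom, &msg)
        if err != nil {
            fmt.Println("error:", err)
            return
        }
        fmt.Println("started x-tco:", xid)
    }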
diff --git a/cmd/cli/cli/blob_hdlr.go b/cmd/cli/cli/blob_hdlr.go
index 2edf1de42ae..bfa108f7071 100644
--- a/cmd/cli/cli/blob_hdlr.go
+++ b/cmd/cli/cli/blob_hdlr.go
@@ -55,8 +55,8 @@ func blobDownloadHandler(c *cli.Context) error {
 			return err
 		}
 	}
-	if flagIsSet(c, numWorkersFlag) {
-		msg.NumWorkers = parseIntFlag(c, numWorkersFlag)
+	if flagIsSet(c, numBlobWorkersFlag) {
+		msg.NumWorkers = parseIntFlag(c, numBlobWorkersFlag)
 	}
 	msg.LatestVer = flagIsSet(c, latestVerFlag)
diff --git a/cmd/cli/cli/const.go b/cmd/cli/cli/const.go
index b6f857db319..3f8a9dab8b1 100644
--- a/cmd/cli/cli/const.go
+++ b/cmd/cli/cli/const.go
@@ -835,13 +835,22 @@ var (
 		Usage: "utilize built-in blob-downloader (and the corresponding alternative datapath) to read very large remote objects",
 	}
 
-	numWorkersFlag = cli.IntFlag{
+	// num-workers
+	numBlobWorkersFlag = cli.IntFlag{
 		Name:  "num-workers",
 		Usage: "number of concurrent blob-downloading workers (readers); system default when omitted or zero",
 	}
+	numListRangeWorkersFlag = cli.IntFlag{
+		Name: "num-workers",
+		Usage: "number of concurrent workers (readers); number of target mountpaths if omitted or zero;\n" +
+			indent4 + "\t(-1) is a special value indicating no workers (i.e., single-threaded execution);\n" +
+			indent4 + "\tany positive value will be adjusted not to exceed the number of CPUs",
+	}
+
 	// validate
 	cksumFlag = cli.BoolFlag{Name: "checksum", Usage: "validate checksum"}
 
+	// ais put
 	putObjCksumText     = indent4 + "\tand provide it as part of the PUT request for subsequent validation on the server side"
 	putObjCksumFlags    = initPutObjCksumFlags()
 	putObjDfltCksumFlag = cli.BoolFlag{
diff --git a/cmd/cli/cli/get.go b/cmd/cli/cli/get.go
index 26118c9c5a4..be03e078a37 100644
--- a/cmd/cli/cli/get.go
+++ b/cmd/cli/cli/get.go
@@ -466,10 +466,10 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, outFile string, a qparamArc
 			}
 			hdr.Set(apc.HdrBlobChunk, parseStrFlag(c, chunkSizeFlag))
 		}
-		if flagIsSet(c, numWorkersFlag) {
-			nw := parseIntFlag(c, numWorkersFlag)
+		if flagIsSet(c, numBlobWorkersFlag) {
+			nw := parseIntFlag(c, numBlobWorkersFlag)
 			if nw <= 0 || nw > 128 {
-				return fmt.Errorf("invalid %s=%d: expecting (1..128) range", flprn(numWorkersFlag), nw)
+				return fmt.Errorf("invalid %s=%d: expecting (1..128) range", flprn(numBlobWorkersFlag), nw)
 			}
 			hdr.Set(apc.HdrBlobWorkers, strconv.Itoa(nw))
 		}
@@ -477,9 +477,9 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, outFile string, a qparamArc
 		if !quiet {
 			now = mono.NanoTime()
 		}
-	} else if flagIsSet(c, chunkSizeFlag) || flagIsSet(c, numWorkersFlag) {
+	} else if flagIsSet(c, chunkSizeFlag) || flagIsSet(c, numBlobWorkersFlag) {
 		return fmt.Errorf("command line options (%s, %s) can be used only together with %s",
-			qflprn(chunkSizeFlag), qflprn(numWorkersFlag), qflprn(blobDownloadFlag))
+			qflprn(chunkSizeFlag), qflprn(numBlobWorkersFlag), qflprn(blobDownloadFlag))
 	}
 
 	var getArgs api.GetArgs
diff --git a/cmd/cli/cli/job_hdlr.go b/cmd/cli/cli/job_hdlr.go
index 785b261c655..bf8b5b90fe1 100644
--- a/cmd/cli/cli/job_hdlr.go
+++ b/cmd/cli/cli/job_hdlr.go
@@ -26,16 +26,16 @@ import (
 	"github.com/urfave/cli"
 )
 
-const (
-	prefetchUsage = "prefetch one remote bucket, multiple remote buckets, or\n" +
-		indent1 + "selected objects in a given remote bucket or buckets, e.g.:\n" +
-		indent1 + "\t- 'prefetch gs://abc'\t- prefetch entire bucket (all gs://abc objects that are _not_ present in the cluster);\n" +
-		indent1 + "\t- 'prefetch gs:'\t- prefetch all visible/accessible GCP buckets;\n" +
-		indent1 + "\t- 'prefetch gs://abc --template images/'\t- prefetch all objects from the virtual subdirectory \"images\";\n" +
-		indent1 + "\t- 'prefetch gs://abc/images/'\t- same as above;\n" +
-		indent1 + "\t- 'prefetch gs://abc --template \"shard-{0000..9999}.tar.lz4\"'\t- prefetch the matching range (prefix + brace expansion);\n" +
-		indent1 + "\t- 'prefetch \"gs://abc/shard-{0000..9999}.tar.lz4\"'\t- same as above (notice double quotes)"
-)
+const prefetchUsage = "prefetch one remote bucket, multiple remote buckets, or\n" +
+	indent1 + "selected objects in a given remote bucket or buckets, e.g.:\n" +
+	indent1 + "\t- 'prefetch gs://abc'\t- prefetch entire bucket (all gs://abc objects that are _not_ in-cluster);\n" +
+	indent1 + "\t- 'prefetch gs://abc --num-workers 32'\t- same as above with 32 concurrent (prefetching) workers;\n" +
+	indent1 + "\t- 'prefetch gs:'\t- prefetch all visible/accessible GCP buckets;\n" +
+	indent1 + "\t- 'prefetch gs: --num-workers=48'\t- same as above employing 48 workers;\n" +
+	indent1 + "\t- 'prefetch gs://abc --template images/'\t- prefetch all objects from the virtual subdirectory \"images\";\n" +
+	indent1 + "\t- 'prefetch gs://abc/images/'\t- same as above;\n" +
+	indent1 + "\t- 'prefetch gs://abc --template \"shard-{0000..9999}.tar.lz4\"'\t- prefetch the matching range (prefix + brace expansion);\n" +
+	indent1 + "\t- 'prefetch \"gs://abc/shard-{0000..9999}.tar.lz4\"'\t- same as above (notice double quotes)"
 
 // top-level job command
 var (
@@ -85,13 +85,14 @@ var (
 		verbObjPrefixFlag, // to disambiguate bucket/prefix vs bucket/objName
 		latestVerFlag,
 		blobThresholdFlag,
+		numListRangeWorkersFlag,
 	),
 	cmdBlobDownload: {
 		refreshFlag,
 		progressFlag,
 		listFlag,
 		chunkSizeFlag,
-		numWorkersFlag,
+		numBlobWorkersFlag,
 		waitFlag,
 		waitJobXactFinishedFlag,
 		latestVerFlag,
diff --git a/cmd/cli/cli/multiobj.go b/cmd/cli/cli/multiobj.go
index 9458a99543f..ae42a67e2a3 100644
--- a/cmd/cli/cli/multiobj.go
+++ b/cmd/cli/cli/multiobj.go
@@ -62,7 +62,7 @@ func runTCO(c *cli.Context, bckFrom, bckTo cmn.Bck, listObjs, tmplObjs, etlName
 	}
 
 	// 2. TCO message
-	msg := cmn.TCObjsMsg{ToBck: bckTo}
+	msg := cmn.TCOMsg{ToBck: bckTo}
 	{
 		msg.ListRange = lrMsg
 		msg.DryRun = flagIsSet(c, copyDryRunFlag)
@@ -444,11 +444,14 @@ func (lr *lrCtx) _do(c *cli.Context, fileList []string) (xid, kind, action strin
 		msg.ObjNames = fileList
 		msg.Template = lr.tmplObjs
 		msg.LatestVer = flagIsSet(c, latestVerFlag)
-	}
-	if flagIsSet(c, blobThresholdFlag) {
-		msg.BlobThreshold, err = parseSizeFlag(c, blobThresholdFlag)
-		if err != nil {
-			return
+		if flagIsSet(c, blobThresholdFlag) {
+			msg.BlobThreshold, err = parseSizeFlag(c, blobThresholdFlag)
+			if err != nil {
+				return
+			}
+		}
+		if flagIsSet(c, numListRangeWorkersFlag) {
+			msg.NumWorkers = parseIntFlag(c, numListRangeWorkersFlag)
 		}
 	}
 	xid, err = api.Prefetch(apiBP, lr.bck, msg)
@@ -462,7 +465,7 @@ func (lr *lrCtx) _do(c *cli.Context, fileList []string) (xid, kind, action strin
 		kind = apc.ActEvictObjects
 		action = "evict"
 	default:
-		debug.Assert(false, verb)
+		debug.Assert(false, "invalid subcommand: ", verb)
 	}
 	return xid, kind, action, err
 }
diff --git a/cmd/cli/cli/object_hdlr.go b/cmd/cli/cli/object_hdlr.go
index d21334d1481..3f83d43d36b 100644
--- a/cmd/cli/cli/object_hdlr.go
+++ b/cmd/cli/cli/object_hdlr.go
@@ -44,7 +44,7 @@ var (
 		// blob-downloader
 		blobDownloadFlag,
 		chunkSizeFlag,
-		numWorkersFlag,
+		numBlobWorkersFlag,
 		// archive
 		archpathGetFlag,
 		archmimeFlag,
diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod
index 3e366160ad4..71d3bb28d2a 100644
--- a/cmd/cli/go.mod
+++ b/cmd/cli/go.mod
@@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
 go 1.22.3
 
 require (
-	github.com/NVIDIA/aistore v1.3.24-0.20240911183340-8fd68450c4bc
+	github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07
 	github.com/fatih/color v1.17.0
 	github.com/json-iterator/go v1.1.12
 	github.com/onsi/ginkgo/v2 v2.20.0
diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum
index e70853da059..5d80a5815b5 100644
--- a/cmd/cli/go.sum
+++ b/cmd/cli/go.sum
@@ -1,7 +1,7 @@
 code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
 github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/NVIDIA/aistore v1.3.24-0.20240911183340-8fd68450c4bc h1:EZnKo05NVLt6rDsAzCrjv/nft44uOvDb0LZ41jiqO1E=
-github.com/NVIDIA/aistore v1.3.24-0.20240911183340-8fd68450c4bc/go.mod h1:si83S9r29vwIC0f0CE2Mk+25bFiaN6mmVlmuBpP4hHM=
+github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07 h1:cQnI9RQ+4qdN4co4Vr60WmNIPuAAede//MVtGooZVkU=
+github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07/go.mod h1:si83S9r29vwIC0f0CE2Mk+25bFiaN6mmVlmuBpP4hHM=
 github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
 github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
 github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
diff --git a/cmn/api.go b/cmn/api.go
index 6ea4ddbbe9b..2fa993e3b23 100644
--- a/cmn/api.go
+++ b/cmn/api.go
@@ -366,9 +366,9 @@ type (
 	}
 
 	// Multi-object copy & transform (see also: TCBMsg)
-	TCObjsMsg struct {
+	TCOMsg struct {
 		ToBck Bck `json:"tobck"`
-		apc.TCObjsMsg
+		apc.TCOMsg
 	}
 )
diff --git a/xact/xs/archive.go b/xact/xs/archive.go
index d5883889a08..a9065eb8c78 100644
--- a/xact/xs/archive.go
+++ b/xact/xs/archive.go
@@ -244,7 +244,10 @@ func (r *XactArch) Do(msg *cmn.ArchiveBckMsg) {
 		r.cleanup()
 	}
 	var lrit = &lriterator{}
-	err := lrit.init(r, &msg.ListRange, r.Bck(), true)
+
+	// lrpWorkersNone since we need a single writer to serialize adding files
+	// into an eventual `archlom`
+	err := lrit.init(r, &msg.ListRange, r.Bck(), lrpWorkersNone)
 	if err != nil {
 		r.Abort(err)
 		r.DecPending()
diff --git a/xact/xs/blob_download.go b/xact/xs/blob_download.go
index 48f498def84..183a6ecd3bd 100644
--- a/xact/xs/blob_download.go
+++ b/xact/xs/blob_download.go
@@ -137,6 +137,8 @@ func RenewBlobDl(xid string, params *core.BlobParams, oa *cmn.ObjAttrs) xreg.Ren
 			cos.ToSizeIEC(maxChunkSize, 0))
 		pre.chunkSize = maxChunkSize
 	}
+
+	// [NOTE] factor in num CPUs and num chunks to tune up num workers (heuristic)
 	if pre.numWorkers == 0 {
 		pre.numWorkers = dfltNumWorkers
 	}
@@ -149,6 +151,7 @@ func RenewBlobDl(xid string, params *core.BlobParams, oa *cmn.ObjAttrs) xreg.Ren
 	if a := cmn.MaxParallelism(); a < pre.numWorkers {
 		pre.numWorkers = a
 	}
+
 	return xreg.RenewBucketXact(apc.ActBlobDl, lom.Bck(), xreg.Args{UUID: xid, Custom: pre})
 }
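The tuning itself is partially elided by the hunk context; per the NOTE above, it amounts to: default the worker count, cap it by the number of chunks, and clamp it to available parallelism. A paraphrase as a standalone function (a sketch, not the actual implementation; dfltNumWorkers and the chunk cap are assumptions for illustration):

    package main

    import "fmt"

    func tuneNumWorkers(numWorkers, numChunks, maxParallelism int) int {
        const dfltNumWorkers = 4 // assumed stand-in; the real default lives in xact/xs
        if numWorkers == 0 {
            numWorkers = dfltNumWorkers
        }
        if numChunks < numWorkers {
            numWorkers = numChunks // no point having more readers than chunks
        }
        if maxParallelism < numWorkers {
            numWorkers = maxParallelism // cmn.MaxParallelism() in the real code
        }
        return numWorkers
    }

    func main() {
        fmt.Println(tuneNumWorkers(0, 8, 32)) // => 4
    }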
diff --git a/xact/xs/evict.go b/xact/xs/evict.go
index e540900d9e4..c100ac86e55 100644
--- a/xact/xs/evict.go
+++ b/xact/xs/evict.go
@@ -57,7 +57,7 @@ func (*evdFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
 
 func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListRange) (ed *evictDelete, err error) {
 	ed = &evictDelete{config: cmn.GCO.Get()}
-	if err = ed.lriterator.init(ed, msg, bck); err != nil {
+	if err = ed.lriterator.init(ed, msg, bck, lrpWorkersDflt); err != nil {
 		return nil, err
 	}
 	ed.InitBase(xargs.UUID, kind, bck)
diff --git a/xact/xs/lrit.go b/xact/xs/lrit.go
index 06e1febfdf0..124d96c9c91 100644
--- a/xact/xs/lrit.go
+++ b/xact/xs/lrit.go
@@ -35,6 +35,14 @@ const (
 	lrpPrefix
 )
 
+// when the number of workers is not defined in the respective control message
+// (see e.g. PrefetchMsg.NumWorkers),
+// we have two special values:
+const (
+	lrpWorkersNone = -1 // no workers at all - iterated LOMs get handled by the (iterating) goroutine
+	lrpWorkersDflt = 0  // num workers = number of mountpaths
+)
+
 // common for all list-range
 type (
 	// one multi-object operation work item
@@ -94,9 +102,11 @@ var (
 ////////////////
 // lriterator //
 ////////////////
 
-func (r *lriterator) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, blocking ...bool) error {
-	avail := fs.GetAvail()
-	l := len(avail)
+func (r *lriterator) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, numWorkers int) error {
+	var (
+		avail = fs.GetAvail()
+		l     = len(avail)
+	)
 	if l == 0 {
 		return cmn.ErrNoMountpaths
 	}
@@ -104,28 +114,49 @@ func (r *lriterator) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck, blocki
 	r.msg = msg
 	r.bck = bck
-	// list is the simplest and always single-threaded
 	if msg.IsList() {
 		r.lrp = lrpList
-		return nil
-	}
-	if err := r._inipr(msg); err != nil {
-		return err
+	} else {
+		if err := r._inipr(msg); err != nil {
+			return err
+		}
 	}
-	if l == 1 {
+
+	// run single-threaded
+	if numWorkers == lrpWorkersNone {
 		return nil
 	}
-	if len(blocking) > 0 && blocking[0] {
-		return nil
+
+	// tune up the number of concurrent workers (heuristic)
+	if numWorkers == lrpWorkersDflt {
+		numWorkers = l
+	}
+	if a := cmn.MaxParallelism(); a > numWorkers+8 {
+		var bump bool
+		a <<= 1
+		switch r.lrp {
+		case lrpList:
+			bump = len(msg.ObjNames) > a
+		case lrpRange:
+			bump = int(r.pt.Count()) > a
+		case lrpPrefix:
+			bump = true // count unknown - err on the side of more workers
+		}
+		if bump {
+			numWorkers += 2
+		}
+	} else {
+		numWorkers = min(numWorkers, a)
 	}
-	// num-workers == num-mountpaths but note:
-	// these are not _joggers_
-	r.workers = make([]*lrworker, 0, l)
-	for range avail {
+
+	// but note: these are _not_ joggers
+	r.workers = make([]*lrworker, 0, numWorkers)
+	for range numWorkers {
 		r.workers = append(r.workers, &lrworker{r})
 	}
-	r.workCh = make(chan lrpair, l)
+
+	// work channel capacity: up to 4 pending work items per worker
+	r.workCh = make(chan lrpair, min(numWorkers<<2, 512))
 	return nil
 }
@@ -158,6 +189,8 @@ pref:
 	return nil
 }
 
+func (r *lriterator) numWorkers() int { return len(r.workers) }
+
 func (r *lriterator) run(wi lrwi, smap *meta.Smap) (err error) {
 	for _, worker := range r.workers {
 		r.wg.Add(1)
@@ -251,7 +284,7 @@ func (r *lriterator) _prefix(wi lrwi, smap *meta.Smap) error {
 			lst = &npg.page
 		}
 		if err != nil {
-			nlog.Errorln(core.T.String()+":", err, ecode)
+			nlog.Errorln(core.T.String(), "[", err, "ecode", ecode, "]")
 			freeLsoEntries(lst.Entries)
 			return err
 		}
diff --git a/xact/xs/prefetch.go b/xact/xs/prefetch.go
index 1c6bdedbc08..175120b041c 100644
--- a/xact/xs/prefetch.go
+++ b/xact/xs/prefetch.go
@@ -89,7 +89,7 @@ func (*prfFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
 
 func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.PrefetchMsg) (r *prefetch, err error) {
 	r = &prefetch{config: cmn.GCO.Get(), msg: msg}
-	err = r.lriterator.init(r, &msg.ListRange, bck)
+	err = r.lriterator.init(r, &msg.ListRange, bck, msg.NumWorkers)
 	if err != nil {
 		return nil, err
 	}
@@ -103,7 +103,14 @@ func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.Prefetch
 }
 
 func (r *prefetch) Run(wg *sync.WaitGroup) {
+	if nw := r.lriterator.numWorkers(); nw > 1 {
+		nlog.Infoln(r.Name(), "[", nw, "workers ]")
+	} else {
+		nlog.Infoln(r.Name())
+	}
+
 	wg.Done()
+
 	err := r.lriterator.run(r, core.T.Sowner().Get())
 	if err != nil {
 		r.AddErr(err, 5, cos.SmoduleXs) // duplicated?
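A worked restatement of the init() heuristic above (a sketch, assuming Go 1.21+ for the min builtin, and collapsing the per-mode bump test into a single object-count check):

    package main

    import "fmt"

    func pickNumWorkers(requested, mountpaths, maxParallelism, numObjects int) int {
        const (
            lrpWorkersNone = -1
            lrpWorkersDflt = 0
        )
        switch requested {
        case lrpWorkersNone:
            return 0 // single-threaded: the iterating goroutine does the work itself
        case lrpWorkersDflt:
            requested = mountpaths
        }
        if a := maxParallelism; a > requested+8 {
            if numObjects > a<<1 { // enough pending work to justify two extra workers
                requested += 2
            }
        } else {
            requested = min(requested, a) // never exceed cmn.MaxParallelism()
        }
        return requested
    }

    func main() {
        // e.g., 10 mountpaths, 32 CPUs, a 10,000-object range:
        fmt.Println(pickNumWorkers(0, 10, 32, 10000)) // 32 > 10+8 and 10000 > 64 => 12
    }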
diff --git a/xact/xs/tcobjs.go b/xact/xs/tcobjs.go
index db36aa37dfd..be29093d7ea 100644
--- a/xact/xs/tcobjs.go
+++ b/xact/xs/tcobjs.go
@@ -40,14 +40,14 @@ type (
 			mtx sync.RWMutex
 		}
 		args     *xreg.TCObjsArgs
-		workCh   chan *cmn.TCObjsMsg
+		workCh   chan *cmn.TCOMsg
 		chanFull atomic.Int64
 		streamingX
 		owt cmn.OWT
 	}
 	tcowi struct {
 		r   *XactTCObjs
-		msg *cmn.TCObjsMsg
+		msg *cmn.TCOMsg
 		// finishing
 		refc atomic.Int32
 	}
@@ -81,7 +81,7 @@ func (p *tcoFactory) Start() error {
 	p.Args.UUID = PrefixTcoID + uuid
 
 	// new x-tco
-	workCh := make(chan *cmn.TCObjsMsg, maxNumInParallel)
+	workCh := make(chan *cmn.TCOMsg, maxNumInParallel)
 	r := &XactTCObjs{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, args: p.args, workCh: workCh}
 	r.pending.m = make(map[string]*tcowi, maxNumInParallel)
 	r.owt = cmn.OwtCopy
@@ -134,7 +134,7 @@ func (r *XactTCObjs) Snap() (snap *core.Snap) {
 	return
 }
 
-func (r *XactTCObjs) Begin(msg *cmn.TCObjsMsg) {
+func (r *XactTCObjs) Begin(msg *cmn.TCOMsg) {
 	wi := &tcowi{r: r, msg: msg}
 	r.pending.mtx.Lock()
 	r.pending.m[msg.TxnUUID] = wi
@@ -175,7 +175,7 @@ func (r *XactTCObjs) Run(wg *sync.WaitGroup) {
 		// run
 		var wg *sync.WaitGroup
-		if err = lrit.init(r, &msg.ListRange, r.Bck()); err == nil {
+		if err = lrit.init(r, &msg.ListRange, r.Bck(), lrpWorkersDflt); err == nil {
 			if msg.Sync && lrit.lrp != lrpList {
 				wg = &sync.WaitGroup{}
 				wg.Add(1)
@@ -213,7 +213,7 @@ fin:
 }
 
 // more work
-func (r *XactTCObjs) Do(msg *cmn.TCObjsMsg) {
+func (r *XactTCObjs) Do(msg *cmn.TCOMsg) {
 	r.IncPending()
 	r.workCh <- msg
 
@@ -372,7 +372,7 @@ func (r *XactTCObjs) prune(lrit *lriterator, smap *meta.Smap, pt *cos.ParsedTemp
 	var syncit lriterator
 
 	debug.Assert(lrit.lrp == lrpRange)
-	err := syncit.init(lrit.parent, lrit.msg, rp.bckTo)
+	err := syncit.init(lrit.parent, lrit.msg, rp.bckTo, lrpWorkersDflt)
 	debug.AssertNoErr(err)
 	syncit.pt = pt
 	syncwi := &syncwi{&rp} // reusing only prune.do (and not init/run/wait)
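Taken together, the new numWorkers argument of lriterator.init is exercised in three modes by this change; a side-by-side recap of the call sites above (comments only - receivers and messages come from the respective xactions):

    // archive:  lrit.init(r, &msg.ListRange, r.Bck(), lrpWorkersNone)       // single writer serializes the archive
    // evict:    ed.lriterator.init(ed, msg, bck, lrpWorkersDflt)            // one worker per mountpath
    // tco:      lrit.init(r, &msg.ListRange, r.Bck(), lrpWorkersDflt)       // ditto
    // prefetch: r.lriterator.init(r, &msg.ListRange, bck, msg.NumWorkers)   // user-controlled via --num-workers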