Skip to content

Commit

Permalink
prefetch/copy/transform: number of concurrent workers
Browse files Browse the repository at this point in the history
* prefetch (job):
  - extend `apc.PrefetchMsg` control: add num-workers
* CLI `ais prefetch`: add '--num-workers' option
* copy-objects/transform-objects (jobs):
  - extend `apc.TCOMsg`: add num-workers
* amend & revise common list-range iterator (lrit)
* with refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 13, 2024
1 parent 8aa8326 commit a5a3024
Show file tree
Hide file tree
Showing 26 changed files with 180 additions and 113 deletions.
2 changes: 1 addition & 1 deletion ais/plstcx.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type (
cnt int
lsmsg apc.LsoMsg
altmsg apc.ActMsg
tcomsg cmn.TCObjsMsg
tcomsg cmn.TCOMsg
stopped atomic.Bool
}
)
Expand Down
2 changes: 1 addition & 1 deletion ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,7 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
}
case apc.ActCopyObjects, apc.ActETLObjects:
var (
tcomsg = &cmn.TCObjsMsg{}
tcomsg = &cmn.TCOMsg{}
bckTo *meta.Bck
ecode int
eq bool
Expand Down
2 changes: 1 addition & 1 deletion ais/prxtxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (p *proxy) tcb(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg, dryRun bool) (xid
}

// transform or copy a list or a range of objects
func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, config *cmn.Config, msg *apc.ActMsg, tcomsg *cmn.TCObjsMsg) (string, error) {
func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, config *cmn.Config, msg *apc.ActMsg, tcomsg *cmn.TCOMsg) (string, error) {
// 1. prep
var (
_, existsTo = p.owner.bmd.get().Get(bckTo) // cleanup on fail: destroy if created
Expand Down
6 changes: 3 additions & 3 deletions ais/test/cp_multiobj_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestCopyMultiObjSimple(t *testing.T) {
template := "test/a-" + fmt.Sprintf("{%04d..%04d}", rangeStart, rangeStart+copyCnt-1)
tlog.Logf("[%s] %s => %s\n", template, bckFrom.Cname(""), bckTo.Cname(""))

msg := cmn.TCObjsMsg{ToBck: bckTo}
msg := cmn.TCOMsg{ToBck: bckTo}
msg.Template = template
xid, err = api.CopyMultiObj(baseParams, bckFrom, &msg)
tassert.CheckFatal(t, err)
Expand Down Expand Up @@ -190,7 +190,7 @@ func testCopyMobj(t *testing.T, bck *meta.Bck) {
var (
err error
xid string
msg = cmn.TCObjsMsg{ToBck: bckTo}
msg = cmn.TCOMsg{ToBck: bckTo}
)
msg.ObjNames = list
if m.bck.IsRemote() && test.evictRemoteSrc {
Expand Down Expand Up @@ -224,7 +224,7 @@ func testCopyMobj(t *testing.T, bck *meta.Bck) {
err error
xid string
template = fmt.Sprintf(fmtRange, m.prefix, start, start+numToCopy-1)
msg = cmn.TCObjsMsg{ToBck: bckTo}
msg = cmn.TCOMsg{ToBck: bckTo}
)
msg.Template = template
if m.bck.IsRemote() && test.evictRemoteSrc {
Expand Down
2 changes: 1 addition & 1 deletion ais/test/etl_cp_multiobj_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func testETLMultiObj(t *testing.T, etlName string, bckFrom, bckTo cmn.Bck, fileR
objList = pt.ToSlice()
objCnt = len(objList)
requestTimeout = 30 * time.Second
tcomsg = cmn.TCObjsMsg{ToBck: bckTo}
tcomsg = cmn.TCOMsg{ToBck: bckTo}
)
tcomsg.Transform.Name = etlName
tcomsg.Transform.Timeout = cos.Duration(requestTimeout)
Expand Down
4 changes: 2 additions & 2 deletions ais/tgttxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (t *target) txnHandler(w http.ResponseWriter, r *http.Request) {
case apc.ActCopyObjects, apc.ActETLObjects:
var (
dp core.DP
tcomsg = &cmn.TCObjsMsg{}
tcomsg = &cmn.TCOMsg{}
)
if err := cos.MorphMarshal(c.msg.Value, tcomsg); err != nil {
t.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, t.si, msg.Action, c.msg.Value, err)
Expand Down Expand Up @@ -632,7 +632,7 @@ func (t *target) _tcbBegin(c *txnSrv, msg *apc.TCBMsg, dp core.DP) (err error) {
// Two IDs:
// - TxnUUID: transaction (txn) ID
// - xid: xaction ID (will have "tco-" prefix)
func (t *target) tcobjs(c *txnSrv, msg *cmn.TCObjsMsg, dp core.DP) (xid string, _ error) {
func (t *target) tcobjs(c *txnSrv, msg *cmn.TCOMsg, dp core.DP) (xid string, _ error) {
switch c.phase {
case apc.ActBegin:
var (
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtxact.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (t *target) httpxpost(w http.ResponseWriter, r *http.Request) {
err error
xctn core.Xact
amsg *apc.ActMsg
tcomsg cmn.TCObjsMsg
tcomsg cmn.TCOMsg
)
if amsg, err = t.readActionMsg(w, r); err != nil {
return
Expand Down
4 changes: 2 additions & 2 deletions ais/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type (
}
txnTCObjs struct {
xtco *xs.XactTCObjs
msg *cmn.TCObjsMsg
msg *cmn.TCOMsg
txnBckBase
}
txnECEncode struct {
Expand Down Expand Up @@ -581,7 +581,7 @@ func (txn *txnTCB) String() string {
// txnTCObjs //
///////////////

func newTxnTCObjs(c *txnSrv, bckFrom *meta.Bck, xtco *xs.XactTCObjs, msg *cmn.TCObjsMsg) (txn *txnTCObjs) {
func newTxnTCObjs(c *txnSrv, bckFrom *meta.Bck, xtco *xs.XactTCObjs, msg *cmn.TCOMsg) (txn *txnTCObjs) {
txn = &txnTCObjs{xtco: xtco, msg: msg}
txn.init(bckFrom)
txn.fillFromCtx(c)
Expand Down
10 changes: 6 additions & 4 deletions api/apc/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
const _bldl = "blob-downloader"

type BlobMsg struct {
ChunkSize int64 `json:"chunk-size"`
FullSize int64 `json:"full-size"`
NumWorkers int `json:"num-workers"`
LatestVer bool `json:"latest-ver"`
ChunkSize int64 `json:"chunk-size"` // as in: chunk size
FullSize int64 `json:"full-size"` // user-specified (full) size of the object to download
NumWorkers int `json:"num-workers"` // number of concurrent downloading workers (readers); `dfltNumWorkers` when zero
LatestVer bool `json:"latest-ver"` // when true and in-cluster: check with remote whether (deleted | version-changed)
}

// in re LatestVer, see also: `QparamLatestVer`, 'versioning.validate_warm_get'

// using textproto.CanonicalMIMEHeaderKey() to check presence -
// if a given key is present and is an empty string, it's an error
func (msg *BlobMsg) FromHeader(hdr http.Header) error {
Expand Down
78 changes: 42 additions & 36 deletions api/apc/multiobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,56 @@
*/
package apc

// (common for all multi-object operations)
type (
// List of object names _or_ a template specifying { optional Prefix, zero or more Ranges }
ListRange struct {
Template string `json:"template"`
ObjNames []string `json:"objnames"`
}
PrefetchMsg struct {
ListRange
BlobThreshold int64 `json:"blob-threshold"`
ContinueOnError bool `json:"coer"`
LatestVer bool `json:"latest-ver"` // see also: QparamLatestVer, 'versioning.validate_warm_get'
}

// ArchiveMsg contains the parameters (all except the destination bucket)
// for archiving multiple objects as one of the supported archive.FileExtensions types
// at the specified (bucket) destination.
// See also: api.PutApndArchArgs
// -------------------- terminology ---------------------
// here and elsewhere "archive" is any (.tar, .tgz/.tar.gz, .zip, .tar.lz4) formatted object.
ArchiveMsg struct {
TxnUUID string `json:"-"` // internal use
FromBckName string `json:"-"` // ditto
ArchName string `json:"archname"` // one of the archive.FileExtensions
Mime string `json:"mime"` // user-specified mime type (NOTE: takes precedence if defined)
ListRange
BaseNameOnly bool `json:"bnonly"` // only extract the base name of objects as names of archived objects
InclSrcBname bool `json:"isbn"` // include source bucket name into the names of archived objects
AppendIfExists bool `json:"aate"` // adding a list or a range of objects to an existing archive
ContinueOnError bool `json:"coer"` // on err, keep running arc xaction in any given multi-object transaction
}
// Multi-object copy & transform (see also: TCBMsg)
TCObjsMsg struct {
ListRange
TxnUUID string // (plstcx client, internal use)
TCBMsg
ContinueOnError bool `json:"coer"`
}
)

///////////////
// ListRange //
///////////////

// empty `ListRange{}` implies operating on an entire bucket ("all objects in the source bucket")
// [NOTE]
// - empty `ListRange{}` implies operating on an entire bucket ("all objects in the source bucket")
// - in re `LatestVer`, see related: `QparamLatestVer`, 'versioning.validate_warm_get'

func (lrm *ListRange) IsList() bool { return len(lrm.ObjNames) > 0 }
func (lrm *ListRange) HasTemplate() bool { return lrm.Template != "" }

// prefetch
type PrefetchMsg struct {
ListRange
BlobThreshold int64 `json:"blob-threshold"` // when greater than threshold prefetch using blob-downloader; otherwise cold GET
NumWorkers int `json:"num-workers"` // number of concurrent workers; 0 - number of mountpaths (default); (-1) none
ContinueOnError bool `json:"coer"` // ignore non-critical errors, keep going
LatestVer bool `json:"latest-ver"` // when true & in-cluster: check with remote whether (deleted | version-changed)
}

// ArchiveMsg contains the parameters (all except the destination bucket)
// for archiving multiple objects as one of the supported archive.FileExtensions types
// at the specified (bucket) destination.
// See also: api.PutApndArchArgs
// -------------------- terminology ---------------------
// here and elsewhere "archive" is any (.tar, .tgz/.tar.gz, .zip, .tar.lz4) formatted object.
// [NOTE] see cmn/api for cmn.ArchiveMsg (that also contains ToBck)
type ArchiveMsg struct {
TxnUUID string `json:"-"` // internal use
FromBckName string `json:"-"` // ditto
ArchName string `json:"archname"` // one of the archive.FileExtensions
Mime string `json:"mime"` // user-specified mime type (NOTE: takes precedence if defined)
ListRange
BaseNameOnly bool `json:"bnonly"` // only extract the base name of objects as names of archived objects
InclSrcBname bool `json:"isbn"` // include source bucket name into the names of archived objects
AppendIfExists bool `json:"aate"` // adding a list or a range of objects to an existing archive
ContinueOnError bool `json:"coer"` // on err, keep running arc xaction in any given multi-object transaction
}

// multi-object copy & transform
// [NOTE] see cmn/api for cmn.TCOMsg (that also contains ToBck); see also TCBMsg
type TCOMsg struct {
ListRange
TxnUUID string // (plstcx client, internal use)
NumWorkers int `json:"num-workers"` // number of concurrent workers; 0 - number of mountpaths (default); (-1) none
TCBMsg
ContinueOnError bool `json:"coer"`
}
4 changes: 2 additions & 2 deletions api/multiobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func ArchiveMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.ArchiveBckMsg) (st
// `fltPresence` applies exclusively to remote `bckFrom` (is ignored if the source is ais://)
// and is one of: { apc.FltExists, apc.FltPresent, ... } - for complete enum, see api/apc/query.go

func CopyMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresence ...int) (xid string, err error) {
func CopyMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCOMsg, fltPresence ...int) (xid string, err error) {
bp.Method = http.MethodPost
q := bckFrom.NewQuery()
if len(fltPresence) > 0 {
Expand All @@ -43,7 +43,7 @@ func CopyMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresenc
return dolr(bp, bckFrom, apc.ActCopyObjects, msg, q)
}

func ETLMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresence ...int) (xid string, err error) {
func ETLMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCOMsg, fltPresence ...int) (xid string, err error) {
bp.Method = http.MethodPost
q := bckFrom.NewQuery()
if len(fltPresence) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/cli/blob_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func blobDownloadHandler(c *cli.Context) error {
return err
}
}
if flagIsSet(c, numWorkersFlag) {
msg.NumWorkers = parseIntFlag(c, numWorkersFlag)
if flagIsSet(c, numBlobWorkersFlag) {
msg.NumWorkers = parseIntFlag(c, numBlobWorkersFlag)
}
msg.LatestVer = flagIsSet(c, latestVerFlag)

Expand Down
11 changes: 10 additions & 1 deletion cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,13 +835,22 @@ var (
Usage: "utilize built-in blob-downloader (and the corresponding alternative datapath) to read very large remote objects",
}

numWorkersFlag = cli.IntFlag{
// num-workers
numBlobWorkersFlag = cli.IntFlag{
Name: "num-workers",
Usage: "number of concurrent blob-downloading workers (readers); system default when omitted or zero",
}
numListRangeWorkersFlag = cli.IntFlag{
Name: "num-workers",
Usage: "number of concurrent workers (readers); number of target mountpaths if omitted or zero;\n" +
indent4 + "\t(-1) is a special value indicating no workers (ie., single-threaded execution);\n" +
indent4 + "\tany positive value will be adjusted not to exceed the number of CPUs",
}

// validate
cksumFlag = cli.BoolFlag{Name: "checksum", Usage: "validate checksum"}

// ais put
putObjCksumText = indent4 + "\tand provide it as part of the PUT request for subsequent validation on the server side"
putObjCksumFlags = initPutObjCksumFlags()
putObjDfltCksumFlag = cli.BoolFlag{
Expand Down
10 changes: 5 additions & 5 deletions cmd/cli/cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,20 +466,20 @@ func getObject(c *cli.Context, bck cmn.Bck, objName, outFile string, a qparamArc
}
hdr.Set(apc.HdrBlobChunk, parseStrFlag(c, chunkSizeFlag))
}
if flagIsSet(c, numWorkersFlag) {
nw := parseIntFlag(c, numWorkersFlag)
if flagIsSet(c, numBlobWorkersFlag) {
nw := parseIntFlag(c, numBlobWorkersFlag)
if nw <= 0 || nw > 128 {
return fmt.Errorf("invalid %s=%d: expecting (1..128) range", flprn(numWorkersFlag), nw)
return fmt.Errorf("invalid %s=%d: expecting (1..128) range", flprn(numBlobWorkersFlag), nw)
}
hdr.Set(apc.HdrBlobWorkers, strconv.Itoa(nw))
}

if !quiet {
now = mono.NanoTime()
}
} else if flagIsSet(c, chunkSizeFlag) || flagIsSet(c, numWorkersFlag) {
} else if flagIsSet(c, chunkSizeFlag) || flagIsSet(c, numBlobWorkersFlag) {
return fmt.Errorf("command line options (%s, %s) can be used only together with %s",
qflprn(chunkSizeFlag), qflprn(numWorkersFlag), qflprn(blobDownloadFlag))
qflprn(chunkSizeFlag), qflprn(numBlobWorkersFlag), qflprn(blobDownloadFlag))
}

var getArgs api.GetArgs
Expand Down
23 changes: 12 additions & 11 deletions cmd/cli/cli/job_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
"github.com/urfave/cli"
)

const (
prefetchUsage = "prefetch one remote bucket, multiple remote buckets, or\n" +
indent1 + "selected objects in a given remote bucket or buckets, e.g.:\n" +
indent1 + "\t- 'prefetch gs://abc'\t- prefetch entire bucket (all gs://abc objects that are _not_ present in the cluster);\n" +
indent1 + "\t- 'prefetch gs:'\t- prefetch all visible/accessible GCP buckets;\n" +
indent1 + "\t- 'prefetch gs://abc --template images/'\t- prefetch all objects from the virtual subdirectory \"images\";\n" +
indent1 + "\t- 'prefetch gs://abc/images/'\t- same as above;\n" +
indent1 + "\t- 'prefetch gs://abc --template \"shard-{0000..9999}.tar.lz4\"'\t- prefetch the matching range (prefix + brace expansion);\n" +
indent1 + "\t- 'prefetch \"gs://abc/shard-{0000..9999}.tar.lz4\"'\t- same as above (notice double quotes)"
)
const prefetchUsage = "prefetch one remote bucket, multiple remote buckets, or\n" +
indent1 + "selected objects in a given remote bucket or buckets, e.g.:\n" +
indent1 + "\t- 'prefetch gs://abc'\t- prefetch entire bucket (all gs://abc objects that are _not_ in-cluster);\n" +
indent1 + "\t- 'prefetch gs://abc --num-workers 32'\t- same as above with 32 concurrent (prefetching) workers;\n" +
indent1 + "\t- 'prefetch gs:'\t- prefetch all visible/accessible GCP buckets;\n" +
indent1 + "\t- 'prefetch gs: --num-workers=48'\t- same as above employing 48 workers;\n" +
indent1 + "\t- 'prefetch gs://abc --template images/'\t- prefetch all objects from the virtual subdirectory \"images\";\n" +
indent1 + "\t- 'prefetch gs://abc/images/'\t- same as above;\n" +
indent1 + "\t- 'prefetch gs://abc --template \"shard-{0000..9999}.tar.lz4\"'\t- prefetch the matching range (prefix + brace expansion);\n" +
indent1 + "\t- 'prefetch \"gs://abc/shard-{0000..9999}.tar.lz4\"'\t- same as above (notice double quotes)"

// top-level job command
var (
Expand Down Expand Up @@ -85,13 +85,14 @@ var (
verbObjPrefixFlag, // to disambiguate bucket/prefix vs bucket/objName
latestVerFlag,
blobThresholdFlag,
numListRangeWorkersFlag,
),
cmdBlobDownload: {
refreshFlag,
progressFlag,
listFlag,
chunkSizeFlag,
numWorkersFlag,
numBlobWorkersFlag,
waitFlag,
waitJobXactFinishedFlag,
latestVerFlag,
Expand Down
17 changes: 10 additions & 7 deletions cmd/cli/cli/multiobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func runTCO(c *cli.Context, bckFrom, bckTo cmn.Bck, listObjs, tmplObjs, etlName
}

// 2. TCO message
msg := cmn.TCObjsMsg{ToBck: bckTo}
msg := cmn.TCOMsg{ToBck: bckTo}
{
msg.ListRange = lrMsg
msg.DryRun = flagIsSet(c, copyDryRunFlag)
Expand Down Expand Up @@ -444,11 +444,14 @@ func (lr *lrCtx) _do(c *cli.Context, fileList []string) (xid, kind, action strin
msg.ObjNames = fileList
msg.Template = lr.tmplObjs
msg.LatestVer = flagIsSet(c, latestVerFlag)
}
if flagIsSet(c, blobThresholdFlag) {
msg.BlobThreshold, err = parseSizeFlag(c, blobThresholdFlag)
if err != nil {
return
if flagIsSet(c, blobThresholdFlag) {
msg.BlobThreshold, err = parseSizeFlag(c, blobThresholdFlag)
if err != nil {
return
}
}
if flagIsSet(c, numListRangeWorkersFlag) {
msg.NumWorkers = parseIntFlag(c, numListRangeWorkersFlag)
}
}
xid, err = api.Prefetch(apiBP, lr.bck, msg)
Expand All @@ -462,7 +465,7 @@ func (lr *lrCtx) _do(c *cli.Context, fileList []string) (xid, kind, action strin
kind = apc.ActEvictObjects
action = "evict"
default:
debug.Assert(false, verb)
debug.Assert(false, "invalid subcommand: ", verb)
}
return xid, kind, action, err
}
2 changes: 1 addition & 1 deletion cmd/cli/cli/object_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
// blob-downloader
blobDownloadFlag,
chunkSizeFlag,
numWorkersFlag,
numBlobWorkersFlag,
// archive
archpathGetFlag,
archmimeFlag,
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.22.3

require (
github.com/NVIDIA/aistore v1.3.24-0.20240911183340-8fd68450c4bc
github.com/NVIDIA/aistore v1.3.24-0.20240913135903-a3912a7b3c07
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.0
Expand Down
Loading

0 comments on commit a5a3024

Please sign in to comment.