From 81094664a9f81fc5867543dbb5b687a104c4db38 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Wed, 22 Jan 2025 14:40:54 -0500 Subject: [PATCH] add page iterator (LPI) for local pagination * part four, prev. commit: 693ff5c2c7798 Signed-off-by: Alex Aizman --- fs/fs.go | 7 ++++ fs/lpi/allmps.go | 98 +++++++++++++++++++++++++++++++++++++++++++ fs/lpi/bench_test.go | 83 ++++++++++++++++++++++++++++++++++++ fs/lpi/lpi.go | 1 + fs/lpi/map_test.go | 43 ------------------- fs/lpi/pool.go | 22 ++++++++++ fs/lpi_test.go | 2 +- xact/xs/lso.go | 19 +++++++-- xact/xs/nextpage.go | 5 ++- xact/xs/nsumm.go | 1 + xact/xs/wanted_lso.go | 2 +- 11 files changed, 232 insertions(+), 51 deletions(-) create mode 100644 fs/lpi/allmps.go create mode 100644 fs/lpi/bench_test.go delete mode 100644 fs/lpi/map_test.go create mode 100644 fs/lpi/pool.go diff --git a/fs/fs.go b/fs/fs.go index d5955b5e2b3..9611e836785 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -281,6 +281,13 @@ func (mi *Mountpath) MakePathCT(bck *cmn.Bck, contentType string) string { return cos.UnsafeS(buf) } +func (mi *Mountpath) MakePathPrefix(bck *cmn.Bck, contentType, prefix string) string { + if prefix == "" { + return mi.MakePathCT(bck, contentType) + } + return mi.MakePathFQN(bck, contentType, prefix) +} + func (mi *Mountpath) MakePathFQN(bck *cmn.Bck, contentType, objName string) string { debug.Assert(contentType != "") debug.Assert(objName != "") diff --git a/fs/lpi/allmps.go b/fs/lpi/allmps.go new file mode 100644 index 00000000000..4aa6cd56968 --- /dev/null +++ b/fs/lpi/allmps.go @@ -0,0 +1,98 @@ +// Package lpi: local page iterator +/* + * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. + */ +package lpi + +import ( + "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/cos" + "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/nlog" + "github.com/NVIDIA/aistore/fs" +) + +type ( + milpi struct { + page Page + it *Iter + mi *fs.Mountpath + } + Lpis struct { + a []milpi + bck *cmn.Bck + } +) + +func (lpis *Lpis) Init(bck *cmn.Bck, prefix string) { + var ( + avail = fs.GetAvail() + ) + { + lpis.a = make([]milpi, 0, len(avail)) + lpis.bck = bck + } + for _, mi := range avail { + it, err := New(mi.MakePathPrefix(bck, fs.ObjectType, prefix)) + debug.AssertNoErr(err) + milpi := milpi{ + page: make(Page), + it: it, + mi: mi, + } + lpis.a = append(lpis.a, milpi) + } +} + +// TODO: consider jogger-per-mountpath + +func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) { + var ( + lastName string + eop = AllPages + num = len(lastPage) // num entries + ) + if num > 0 { + lastName = lastPage[num-1].Name + } + // 1. all mountpaths: next page + for _, milpi := range lpis.a { + if milpi.it.Pos() == "" { + // iterated to the end, exhausted local content + milpi.it.Clear() + continue + } + if lastName != "" { + eop = milpi.mi.MakePathPrefix(lpis.bck, fs.ObjectType, lastName) + } + + // next local page "until" + lpiMsg := Msg{EOP: eop} + if err := milpi.it.Next(lpiMsg, milpi.page); err != nil { + if cmn.Rom.FastV(4, cos.SmoduleXs) { + nlog.Warningln(tag, err) + } + } + } + + // 2. last page as a map + lastPageMap := allocPage() + for _, en := range lastPage { + lastPageMap[en.Name] = struct{}{} + } + + // 3. find and add 'remotely-deleted' + for _, milpi := range lpis.a { + for lname := range milpi.page { + if _, ok := lastPageMap[lname]; ok { + delete(milpi.page, lname) + continue + } + en := &cmn.LsoEnt{Name: lname} + en.SetFlag(apc.EntryVerRemoved | apc.EntryIsCached) + outPage.Entries = append(outPage.Entries, en) + } + } + freePage(lastPageMap) +} diff --git a/fs/lpi/bench_test.go b/fs/lpi/bench_test.go new file mode 100644 index 00000000000..5b6782cb577 --- /dev/null +++ b/fs/lpi/bench_test.go @@ -0,0 +1,83 @@ +// Package lpi_test: local page iterator +/* + * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. + */ +package lpi_test + +import ( + "strconv" + "sync" + "testing" +) + +func _clr(m map[int]struct{}) { + for k := range m { + delete(m, k) + } +} + +func BenchClear(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + m := make(map[int]struct{}) + for pb.Next() { + for i := range 1000 { + m[i] = struct{}{} + } + clear(m) + } + }) +} + +func BenchManualClear(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + m := make(map[int]struct{}) + for pb.Next() { + for i := range 1000 { + m[i] = struct{}{} + } + _clr(m) + } + }) +} + +// init +var keys []string + +func init() { + keys = make([]string, 1000) + for i := range 1000 { + keys[i] = "key-" + strconv.Itoa(i) + } +} + +var pool = sync.Pool{ + New: func() interface{} { + return make(map[string]struct{}, 1000) + }, +} + +func BenchmarkPool(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + m := pool.Get().(map[string]struct{}) + for i := range 1000 { + k := keys[i] + m[k] = struct{}{} + } + clear(m) + pool.Put(m) + } + }) +} + +func BenchmarkNoPool(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + m := make(map[string]struct{}, 1000) + for i := range 1000 { + k := keys[i] + m[k] = struct{}{} + } + } + }) +} diff --git a/fs/lpi/lpi.go b/fs/lpi/lpi.go index 7ef7a29093b..786018535c2 100644 --- a/fs/lpi/lpi.go +++ b/fs/lpi/lpi.go @@ -70,6 +70,7 @@ func New(root string) (*Iter, error) { } func (lpi *Iter) Pos() string { return lpi.next } +func (lpi *Iter) Clear() { clear(lpi.page) } func (lpi *Iter) Next(msg Msg, out Page) error { { diff --git a/fs/lpi/map_test.go b/fs/lpi/map_test.go deleted file mode 100644 index 220ff12b525..00000000000 --- a/fs/lpi/map_test.go +++ /dev/null @@ -1,43 +0,0 @@ -// Package lpi: local page iterator -/* - * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. - */ -package lpi_test - -import ( - "testing" -) - -// [TODO] consider and bench lpi.Page alternatives: -// - sorted (reusable) slice with binary search -// - what else? - -func clearMap(m map[int]struct{}) { - for k := range m { - delete(m, k) - } -} - -func BenchmarkClearFunction(b *testing.B) { - b.RunParallel(func(pb *testing.PB) { - m := make(map[int]struct{}) - for pb.Next() { - for i := range 1000 { - m[i] = struct{}{} - } - clear(m) - } - }) -} - -func BenchmarkManualClear(b *testing.B) { - b.RunParallel(func(pb *testing.PB) { - m := make(map[int]struct{}) - for pb.Next() { - for i := range 1000 { - m[i] = struct{}{} - } - clearMap(m) - } - }) -} diff --git a/fs/lpi/pool.go b/fs/lpi/pool.go new file mode 100644 index 00000000000..4514386f4e8 --- /dev/null +++ b/fs/lpi/pool.go @@ -0,0 +1,22 @@ +// Package lpi: local page iterator +/* + * Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. + */ +package lpi + +import "sync" + +var pool sync.Pool + +func allocPage() Page { + v := pool.Get() + if v != nil { + return v.(Page) + } + return make(Page, 1000) +} + +func freePage(v Page) { + clear(v) + pool.Put(v) +} diff --git a/fs/lpi_test.go b/fs/lpi_test.go index b82e6a21bc4..8f37366401e 100644 --- a/fs/lpi_test.go +++ b/fs/lpi_test.go @@ -25,7 +25,7 @@ const ( lpiTestVerbose = false ) -func TestLocalPageIt(t *testing.T) { +func TestLocalPageIterator(t *testing.T) { // 1. create temp root root, err := os.MkdirTemp("", "ais-lpi-") tassert.CheckFatal(t, err) diff --git a/xact/xs/lso.go b/xact/xs/lso.go index 09e97c542b5..18637e3d692 100644 --- a/xact/xs/lso.go +++ b/xact/xs/lso.go @@ -26,6 +26,7 @@ import ( "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" "github.com/NVIDIA/aistore/fs" + "github.com/NVIDIA/aistore/fs/lpi" "github.com/NVIDIA/aistore/hk" "github.com/NVIDIA/aistore/memsys" "github.com/NVIDIA/aistore/transport" @@ -63,6 +64,7 @@ type ( streamingX lensgl int64 ctx *core.LsoInvCtx + lpis lpi.Lpis } LsoRsp struct { Err error @@ -115,7 +117,7 @@ func (p *lsoFactory) Start() error { // see also: resetIdle() r.DemandBase.Init(p.UUID(), apc.ActList, p.msg.Str(p.Bck.Cname(p.msg.Prefix)) /*ctlmsg*/, p.Bck, r.config.Timeout.MaxHostBusy.D()) - // NOTE: is set by the first message, never changes + // is set by the first message, never changes r.walk.wor = r.msg.WantOnlyRemoteProps() r.walk.this = r.msg.SID == core.T.SID() @@ -128,19 +130,24 @@ func (p *lsoFactory) Start() error { if !r.walk.wor { nt := core.T.Sowner().Get().CountActiveTs() if nt > 1 { - // NOTE streams + // streams if err := p.beginStreams(r); err != nil { return err } } } - // NOTE alternative flow _this_ target will execute: + // alternative flow _this_ target will execute: // - nextpage => // - backend.GetBucketInv() => // - while { backend.ListObjectsInv } if cos.IsParseBool(p.hdr.Get(apc.HdrInventory)) && r.walk.this { r.ctx = &core.LsoInvCtx{Name: p.hdr.Get(apc.HdrInvName), ID: p.hdr.Get(apc.HdrInvID)} } + + // engage local page iterator (lpi) + if r.msg.IsFlagSet(apc.LsVerChanged) { + r.lpis.Init(r.Bck().Bucket(), r.msg.Prefix) + } } p.xctn = r @@ -339,6 +346,11 @@ func (r *LsoXact) doPage() *LsoRsp { } } page := &cmn.LsoRes{UUID: r.msg.UUID, Entries: r.lastPage, ContinuationToken: r.nextToken} + + if r.msg.IsFlagSet(apc.LsVerChanged) { + r.lpis.Do(r.lastPage, page, r.Name()) + } + return &LsoRsp{Lst: page, Status: http.StatusOK} } @@ -394,7 +406,6 @@ func (r *LsoXact) nextPageR() (err error) { } r.wiCnt.Inc() - // TODO -- FIXME: not counting/sizing (locally) present objects that are missing (deleted?) remotely if r.walk.this { nentries := allocLsoEntries() page, err = npg.nextPageR(nentries) diff --git a/xact/xs/nextpage.go b/xact/xs/nextpage.go index ccb5f1d909d..d65ec65610a 100644 --- a/xact/xs/nextpage.go +++ b/xact/xs/nextpage.go @@ -37,7 +37,7 @@ func newNpgCtx(bck *meta.Bck, msg *apc.LsoMsg, cb lomVisitedCb, ctx *core.LsoInv ctx: ctx, } if msg.IsFlagSet(apc.LsVerChanged) { - npg.wi.custom = make(cos.StrKVs) + npg.wi.custom = make(cos.StrKVs) // TODO -- FIXME: move to parent x-lso; clear and reuse here } return } @@ -112,6 +112,7 @@ func (npg *npgCtx) nextPageR(nentries cmn.LsoEntries) (lst *cmn.LsoRes, err erro // - see also: cmn.ConcatLso func (npg *npgCtx) filterAddLmeta(lst *cmn.LsoRes) error { var ( + bck = npg.bck.Bucket() post = npg.wi.lomVisitedCb i int ) @@ -129,7 +130,7 @@ func (npg *npgCtx) filterAddLmeta(lst *cmn.LsoRes) error { } lom := core.AllocLOM(en.Name) - if err := lom.InitBck(npg.bck.Bucket()); err != nil { + if err := lom.InitBck(bck); err != nil { if cmn.IsErrBucketNought(err) { core.FreeLOM(lom) return err diff --git a/xact/xs/nsumm.go b/xact/xs/nsumm.go index ef3088f8ade..60a25b66ee0 100644 --- a/xact/xs/nsumm.go +++ b/xact/xs/nsumm.go @@ -86,6 +86,7 @@ func newSumm(p *nsummFactory) (r *XactNsumm, err error) { return nil, err } + // TODO -- FIXME: ref duplicated and slightly changed `listRemote` line; flag set vs listRemote listRemote := p.Bck.IsCloud() && !p.msg.ObjCached if listRemote { var ( diff --git a/xact/xs/wanted_lso.go b/xact/xs/wanted_lso.go index 8b7fa4e4be8..83bcf28dff5 100644 --- a/xact/xs/wanted_lso.go +++ b/xact/xs/wanted_lso.go @@ -46,7 +46,7 @@ func (wi *walkInfo) setWanted(en *cmn.LsoEnt, lom *core.LOM) { switch name { case apc.GetPropsName: case apc.GetPropsStatus: - case apc.GetPropsCached: // via obj.SetPresent() + case apc.GetPropsCached: // (apc.EntryIsCached) case apc.GetPropsSize: if en.Size > 0 && lom.Lsize() != en.Size {