Skip to content

Commit

Permalink
add page iterator (LPI) for local pagination
Browse files Browse the repository at this point in the history
* part four, prev. commit: 693ff5c

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 23, 2025
1 parent 9f2ecbd commit 8109466
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 51 deletions.
7 changes: 7 additions & 0 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,13 @@ func (mi *Mountpath) MakePathCT(bck *cmn.Bck, contentType string) string {
return cos.UnsafeS(buf)
}

func (mi *Mountpath) MakePathPrefix(bck *cmn.Bck, contentType, prefix string) string {
if prefix == "" {
return mi.MakePathCT(bck, contentType)
}
return mi.MakePathFQN(bck, contentType, prefix)
}

func (mi *Mountpath) MakePathFQN(bck *cmn.Bck, contentType, objName string) string {
debug.Assert(contentType != "")
debug.Assert(objName != "")
Expand Down
98 changes: 98 additions & 0 deletions fs/lpi/allmps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Package lpi: local page iterator
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package lpi

import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/fs"
)

type (
milpi struct {
page Page
it *Iter
mi *fs.Mountpath
}
Lpis struct {
a []milpi
bck *cmn.Bck
}
)

func (lpis *Lpis) Init(bck *cmn.Bck, prefix string) {
var (
avail = fs.GetAvail()
)
{
lpis.a = make([]milpi, 0, len(avail))
lpis.bck = bck
}
for _, mi := range avail {
it, err := New(mi.MakePathPrefix(bck, fs.ObjectType, prefix))
debug.AssertNoErr(err)
milpi := milpi{
page: make(Page),
it: it,
mi: mi,
}
lpis.a = append(lpis.a, milpi)
}
}

// TODO: consider jogger-per-mountpath

func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) {
var (
lastName string
eop = AllPages
num = len(lastPage) // num entries
)
if num > 0 {
lastName = lastPage[num-1].Name
}
// 1. all mountpaths: next page
for _, milpi := range lpis.a {
if milpi.it.Pos() == "" {
// iterated to the end, exhausted local content
milpi.it.Clear()
continue
}
if lastName != "" {
eop = milpi.mi.MakePathPrefix(lpis.bck, fs.ObjectType, lastName)
}

// next local page "until"
lpiMsg := Msg{EOP: eop}
if err := milpi.it.Next(lpiMsg, milpi.page); err != nil {
if cmn.Rom.FastV(4, cos.SmoduleXs) {
nlog.Warningln(tag, err)
}
}
}

// 2. last page as a map
lastPageMap := allocPage()
for _, en := range lastPage {
lastPageMap[en.Name] = struct{}{}
}

// 3. find and add 'remotely-deleted'
for _, milpi := range lpis.a {
for lname := range milpi.page {
if _, ok := lastPageMap[lname]; ok {
delete(milpi.page, lname)
continue
}
en := &cmn.LsoEnt{Name: lname}
en.SetFlag(apc.EntryVerRemoved | apc.EntryIsCached)
outPage.Entries = append(outPage.Entries, en)
}
}
freePage(lastPageMap)
}
83 changes: 83 additions & 0 deletions fs/lpi/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Package lpi_test: local page iterator
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package lpi_test

import (
"strconv"
"sync"
"testing"
)

func _clr(m map[int]struct{}) {
for k := range m {
delete(m, k)
}
}

func BenchClear(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
m := make(map[int]struct{})
for pb.Next() {
for i := range 1000 {
m[i] = struct{}{}
}
clear(m)
}
})
}

func BenchManualClear(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
m := make(map[int]struct{})
for pb.Next() {
for i := range 1000 {
m[i] = struct{}{}
}
_clr(m)
}
})
}

// init
var keys []string

func init() {
keys = make([]string, 1000)
for i := range 1000 {
keys[i] = "key-" + strconv.Itoa(i)
}
}

var pool = sync.Pool{
New: func() interface{} {
return make(map[string]struct{}, 1000)
},
}

func BenchmarkPool(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m := pool.Get().(map[string]struct{})
for i := range 1000 {
k := keys[i]
m[k] = struct{}{}
}
clear(m)
pool.Put(m)
}
})
}

func BenchmarkNoPool(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m := make(map[string]struct{}, 1000)
for i := range 1000 {
k := keys[i]
m[k] = struct{}{}
}
}
})
}
1 change: 1 addition & 0 deletions fs/lpi/lpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func New(root string) (*Iter, error) {
}

func (lpi *Iter) Pos() string { return lpi.next }
func (lpi *Iter) Clear() { clear(lpi.page) }

func (lpi *Iter) Next(msg Msg, out Page) error {
{
Expand Down
43 changes: 0 additions & 43 deletions fs/lpi/map_test.go

This file was deleted.

22 changes: 22 additions & 0 deletions fs/lpi/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Package lpi: local page iterator
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package lpi

import "sync"

var pool sync.Pool

func allocPage() Page {
v := pool.Get()
if v != nil {
return v.(Page)
}
return make(Page, 1000)
}

func freePage(v Page) {
clear(v)
pool.Put(v)
}
2 changes: 1 addition & 1 deletion fs/lpi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
lpiTestVerbose = false
)

func TestLocalPageIt(t *testing.T) {
func TestLocalPageIterator(t *testing.T) {
// 1. create temp root
root, err := os.MkdirTemp("", "ais-lpi-")
tassert.CheckFatal(t, err)
Expand Down
19 changes: 15 additions & 4 deletions xact/xs/lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/fs/lpi"
"github.com/NVIDIA/aistore/hk"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/transport"
Expand Down Expand Up @@ -63,6 +64,7 @@ type (
streamingX
lensgl int64
ctx *core.LsoInvCtx
lpis lpi.Lpis
}
LsoRsp struct {
Err error
Expand Down Expand Up @@ -115,7 +117,7 @@ func (p *lsoFactory) Start() error {
// see also: resetIdle()
r.DemandBase.Init(p.UUID(), apc.ActList, p.msg.Str(p.Bck.Cname(p.msg.Prefix)) /*ctlmsg*/, p.Bck, r.config.Timeout.MaxHostBusy.D())

// NOTE: is set by the first message, never changes
// is set by the first message, never changes
r.walk.wor = r.msg.WantOnlyRemoteProps()
r.walk.this = r.msg.SID == core.T.SID()

Expand All @@ -128,19 +130,24 @@ func (p *lsoFactory) Start() error {
if !r.walk.wor {
nt := core.T.Sowner().Get().CountActiveTs()
if nt > 1 {
// NOTE streams
// streams
if err := p.beginStreams(r); err != nil {
return err
}
}
}
// NOTE alternative flow _this_ target will execute:
// alternative flow _this_ target will execute:
// - nextpage =>
// - backend.GetBucketInv() =>
// - while { backend.ListObjectsInv }
if cos.IsParseBool(p.hdr.Get(apc.HdrInventory)) && r.walk.this {
r.ctx = &core.LsoInvCtx{Name: p.hdr.Get(apc.HdrInvName), ID: p.hdr.Get(apc.HdrInvID)}
}

// engage local page iterator (lpi)
if r.msg.IsFlagSet(apc.LsVerChanged) {
r.lpis.Init(r.Bck().Bucket(), r.msg.Prefix)
}
}

p.xctn = r
Expand Down Expand Up @@ -339,6 +346,11 @@ func (r *LsoXact) doPage() *LsoRsp {
}
}
page := &cmn.LsoRes{UUID: r.msg.UUID, Entries: r.lastPage, ContinuationToken: r.nextToken}

if r.msg.IsFlagSet(apc.LsVerChanged) {
r.lpis.Do(r.lastPage, page, r.Name())
}

return &LsoRsp{Lst: page, Status: http.StatusOK}
}

Expand Down Expand Up @@ -394,7 +406,6 @@ func (r *LsoXact) nextPageR() (err error) {
}
r.wiCnt.Inc()

// TODO -- FIXME: not counting/sizing (locally) present objects that are missing (deleted?) remotely
if r.walk.this {
nentries := allocLsoEntries()
page, err = npg.nextPageR(nentries)
Expand Down
5 changes: 3 additions & 2 deletions xact/xs/nextpage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newNpgCtx(bck *meta.Bck, msg *apc.LsoMsg, cb lomVisitedCb, ctx *core.LsoInv
ctx: ctx,
}
if msg.IsFlagSet(apc.LsVerChanged) {
npg.wi.custom = make(cos.StrKVs)
npg.wi.custom = make(cos.StrKVs) // TODO -- FIXME: move to parent x-lso; clear and reuse here
}
return
}
Expand Down Expand Up @@ -112,6 +112,7 @@ func (npg *npgCtx) nextPageR(nentries cmn.LsoEntries) (lst *cmn.LsoRes, err erro
// - see also: cmn.ConcatLso
func (npg *npgCtx) filterAddLmeta(lst *cmn.LsoRes) error {
var (
bck = npg.bck.Bucket()
post = npg.wi.lomVisitedCb
i int
)
Expand All @@ -129,7 +130,7 @@ func (npg *npgCtx) filterAddLmeta(lst *cmn.LsoRes) error {
}

lom := core.AllocLOM(en.Name)
if err := lom.InitBck(npg.bck.Bucket()); err != nil {
if err := lom.InitBck(bck); err != nil {
if cmn.IsErrBucketNought(err) {
core.FreeLOM(lom)
return err
Expand Down
1 change: 1 addition & 0 deletions xact/xs/nsumm.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func newSumm(p *nsummFactory) (r *XactNsumm, err error) {
return nil, err
}

// TODO -- FIXME: ref duplicated and slightly changed `listRemote` line; flag set vs listRemote
listRemote := p.Bck.IsCloud() && !p.msg.ObjCached
if listRemote {
var (
Expand Down
2 changes: 1 addition & 1 deletion xact/xs/wanted_lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (wi *walkInfo) setWanted(en *cmn.LsoEnt, lom *core.LOM) {
switch name {
case apc.GetPropsName:
case apc.GetPropsStatus:
case apc.GetPropsCached: // via obj.SetPresent()
case apc.GetPropsCached: // (apc.EntryIsCached)

case apc.GetPropsSize:
if en.Size > 0 && lom.Lsize() != en.Size {
Expand Down

0 comments on commit 8109466

Please sign in to comment.