dsort: minor ref, cleanup; docs: performance.md
Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Nov 20, 2023
1 parent 0d58f66 commit 12408c3
Showing 12 changed files with 86 additions and 35 deletions.
17 changes: 15 additions & 2 deletions docs/performance.md
@@ -90,11 +90,24 @@ Once the packages are installed (the step that will depend on your Linux distrib

## Network

AIStore supports 3 (three) logical networks:

* public (default port `51081`)
* intra-cluster control (`51082`), and
* intra-cluster data (`51083`)

Ideally, all 3 are provisioned (physically) separately - to reduce contention, avoid head-of-line (HoL) blocking, and ultimately optimize intra-cluster performance.
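
For illustration - one possible physical mapping (the interface names and subnets below are hypothetical):

```
public:                 eth0  10.10.0.0/24  port 51081
intra-cluster control:  eth1  10.10.1.0/24  port 51082
intra-cluster data:     eth2  10.10.2.0/24  port 51083
```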

Separately, and in addition:

* MTU should be set to `9000` (Jumbo frames) - this is one of the most important configurations
* Optimize TCP send buffer sizes on the target side (`net.core.wmem_max`, `net.ipv4.tcp_wmem`)
* Optimize TCP receive buffer sizes on the client (reading) side (`net.core.rmem_max`, `net.ipv4.tcp_rmem`)
* Set `net.ipv4.tcp_mtu_probing = 2`
* Wait.. there is more: [all ip-sysctl configurations](https://wiki.linuxfoundation.org/networking/ip-sysctl)

> The `tcp_mtu_probing` setting is especially important when the client's MTU is greater than 1500.
> The list of tunables above cannot be considered _exhaustive_. Optimal (high-performance) choices always depend on the hardware, the Linux kernel, and a variety of factors outside the scope of this document.
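
To make the above concrete - a minimal `sysctl` sketch; the buffer sizes below are illustrative assumptions, not recommendations:

```
# /etc/sysctl.d/90-aistore.conf (example values - tune per NIC, kernel, and workload)
net.core.wmem_max = 67108864            # max TCP send buffer (target side)
net.core.rmem_max = 67108864            # max TCP receive buffer (client side)
net.ipv4.tcp_wmem = 4096 65536 67108864 # min / default / max
net.ipv4.tcp_rmem = 4096 87380 67108864 # min / default / max
net.ipv4.tcp_mtu_probing = 2            # always probe for path MTU
```

Apply with `sysctl --system`. Jumbo frames, in turn, are set per interface, e.g. `ip link set dev eth0 mtu 9000`.
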
## Smoke test

2 changes: 1 addition & 1 deletion ext/dsort/manager.go
@@ -146,7 +146,7 @@ func Tinit(t cluster.Target, stats stats.Tracker, db kvdb.Driver, config *cmn.Co
g.t = t
g.tstats = stats

shard.T = t
shard.Init(t)

fs.CSM.Reg(ct.DsortFileType, &ct.DsortFile{})
fs.CSM.Reg(ct.DsortWorkfileType, &ct.DsortFile{})
2 changes: 1 addition & 1 deletion ext/dsort/request_spec.go
@@ -256,7 +256,7 @@ func parseAlgorithm(alg Algorithm) (*Algorithm, error) {
if alg.Ext == "" || alg.Ext[0] != '.' {
return nil, fmt.Errorf("%w %q", errAlgExt, alg.Ext)
}
if err := shard.ValidateContentKeyT(alg.ContentKeyType); err != nil {
if err := shard.ValidateContentKeyTy(alg.ContentKeyType); err != nil {
return nil, err
}
} else {
26 changes: 26 additions & 0 deletions ext/dsort/shard/init.go
@@ -0,0 +1,26 @@
// Package shard provides Extract(shard), Create(shard), and associated methods
// across all supported archival formats (see cmn/archive/mime.go)
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package shard

import (
"github.com/NVIDIA/aistore/cluster"
"github.com/NVIDIA/aistore/cmn/archive"
)

type global struct {
t cluster.Target
}

var (
g global

// padding buffer (zero-filled)
padBuf [archive.TarBlockSize]byte
)

func Init(t cluster.Target) {
g.t = t
}
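
Funneling the `cluster.Target` through `Init` (in place of the exported package variable `T`, removed from `rw.go` below) keeps the package's dependency explicit and settable in one place. For context, a minimal sketch of how a zero-filled buffer like `padBuf` is typically used to pad TAR entries to the 512-byte block boundary - the helper below is hypothetical, not part of this commit:

```go
package shard

import (
	"io"

	"github.com/NVIDIA/aistore/cmn/archive"
)

// pad writes zeros from the shared padBuf to align an entry on the
// archive.TarBlockSize boundary; illustrative only.
func pad(w io.Writer, written int64) error {
	if rem := written % archive.TarBlockSize; rem != 0 {
		_, err := w.Write(padBuf[:archive.TarBlockSize-rem])
		return err
	}
	return nil
}
```
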
40 changes: 28 additions & 12 deletions ext/dsort/shard/key.go
@@ -21,12 +21,6 @@ const (
ContentKeyInt = "int"
ContentKeyFloat = "float"
ContentKeyString = "string"

fmtErrInvalidSortingKeyType = "invalid content sorting key %q (expecting one of: %+v)"
)

var (
contentKeyTypes = []string{ContentKeyInt, ContentKeyFloat, ContentKeyString}
)

type (
@@ -51,8 +45,16 @@ type (
ty string // one of: "int", "float", "string" - see the ContentKey* constants above
ext string // file with this extension provides sorting key (of the type `ty`)
}

ErrSortingKeyType struct {
ty string
}
)

/////////////////////
// md5KeyExtractor //
/////////////////////

func NewMD5KeyExtractor() (KeyExtractor, error) {
return &md5KeyExtractor{h: md5.New()}, nil
}
Expand All @@ -67,6 +69,10 @@ func (*md5KeyExtractor) PrepareExtractor(name string, r cos.ReadSizer, _ string)
return r, &SingleKeyExtractor{name: name}, false
}

//////////////////////
// nameKeyExtractor //
//////////////////////

func NewNameKeyExtractor() (KeyExtractor, error) {
return &nameKeyExtractor{}, nil
}
Expand All @@ -79,8 +85,12 @@ func (*nameKeyExtractor) ExtractKey(ske *SingleKeyExtractor) (any, error) {
return ske.name, nil
}

/////////////////////////
// contentKeyExtractor //
/////////////////////////

func NewContentKeyExtractor(ty, ext string) (KeyExtractor, error) {
if err := ValidateContentKeyT(ty); err != nil {
if err := ValidateContentKeyTy(ty); err != nil {
return nil, err
}
return &contentKeyExtractor{ty: ty, ext: ext}, nil
@@ -113,13 +123,19 @@ func (ke *contentKeyExtractor) ExtractKey(ske *SingleKeyExtractor) (any, error)
case ContentKeyString:
return key, nil
default:
return nil, fmt.Errorf(fmtErrInvalidSortingKeyType, ke.ty, contentKeyTypes)
return nil, &ErrSortingKeyType{ke.ty}
}
}

func ValidateContentKeyT(ty string) error {
if !cos.StringInSlice(ty, contentKeyTypes) {
return fmt.Errorf(fmtErrInvalidSortingKeyType, ty, contentKeyTypes)
func ValidateContentKeyTy(ty string) error {
switch ty {
case ContentKeyInt, ContentKeyFloat, ContentKeyString:
return nil
default:
return &ErrSortingKeyType{ty}
}
return nil
}

func (e *ErrSortingKeyType) Error() string {
return fmt.Sprintf("invalid content sorting key %q, expecting one of: 'int', 'float', 'string'", e.ty)
}
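
Replacing the formatted string with a dedicated error type lets callers branch on the failure mode instead of matching message text - a minimal sketch of such a caller (hypothetical, not part of this commit):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/NVIDIA/aistore/ext/dsort/shard"
)

func main() {
	err := shard.ValidateContentKeyTy("bytes") // deliberately invalid
	var e *shard.ErrSortingKeyType
	if errors.As(err, &e) {
		fmt.Println("bad content key type:", e) // could fall back to "string" here
	}
}
```
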
6 changes: 3 additions & 3 deletions ext/dsort/shard/recm.go
@@ -118,7 +118,7 @@ func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64,
storeType = SGLStoreType
contentPath, fullContentPath = recm.encodeRecordName(storeType, args.shardName, args.recordName)

sgl := T.PageMM().NewSGL(r.Size() + int64(len(args.metadata)))
sgl := g.t.PageMM().NewSGL(r.Size() + int64(len(args.metadata)))
// No need for `io.CopyBuffer` since SGL implements `io.ReaderFrom`.
if _, err = io.Copy(sgl, bytes.NewReader(args.metadata)); err != nil {
sgl.Free()
@@ -180,7 +180,7 @@ func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64,
recm.Records.Insert(&Record{
Key: key,
Name: recordUniqueName,
DaemonID: T.SID(),
DaemonID: g.t.SID(),
Objects: []*RecordObj{{
ContentPath: contentPath,
ObjectFileType: args.fileType,
@@ -356,7 +356,7 @@ func (recm *RecordManager) Cleanup() {
recm.contents = nil

// NOTE: may call cos.FreeMemToOS
T.PageMM().FreeSpec(memsys.FreeSpec{
g.t.PageMM().FreeSpec(memsys.FreeSpec{
Totally: true,
ToOS: true,
MinSize: 1, // force toGC to free all (even small) memory to system
2 changes: 0 additions & 2 deletions ext/dsort/shard/rw.go
@@ -23,8 +23,6 @@ type RW interface {
}

var (
T cluster.Target

RWs = map[string]RW{
archive.ExtTar: &tarRW{archive.ExtTar},
archive.ExtTgz: &tgzRW{archive.ExtTgz},
13 changes: 4 additions & 9 deletions ext/dsort/shard/tar.go
@@ -35,21 +35,16 @@ type (
}
)

var (
// Predefined padding buffer (zero-initialized).
padBuf [archive.TarBlockSize]byte

// interface guard
_ RW = (*tarRW)(nil)
)
// interface guard
var _ RW = (*tarRW)(nil)

/////////////////////////
// tarRecordDataReader //
/////////////////////////

func newTarRecordDataReader() *tarRecordDataReader {
rd := &tarRecordDataReader{}
rd.metadataBuf, rd.slab = T.ByteMM().Alloc()
rd.metadataBuf, rd.slab = g.t.ByteMM().Alloc()
return rd
}

@@ -113,7 +108,7 @@ func (trw *tarRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record
return 0, 0, err
}
c := &rcbCtx{parent: trw, tw: nil, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk}
buf, slab := T.PageMM().AllocSize(lom.SizeBytes())
buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes())
c.buf = buf

_, err = ar.Range("", c.xtar)
2 changes: 1 addition & 1 deletion ext/dsort/shard/targz.go
@@ -46,7 +46,7 @@ func (trw *tgzRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record

c := &rcbCtx{parent: trw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk}
c.tw = tar.NewWriter(wfh)
buf, slab := T.PageMM().AllocSize(lom.SizeBytes())
buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes())
c.buf = buf

_, err = ar.Range("", c.xtar)
2 changes: 1 addition & 1 deletion ext/dsort/shard/tarlz4.go
@@ -45,7 +45,7 @@ func (trw *tlz4RW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Recor

c := &rcbCtx{parent: trw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk}
c.tw = tar.NewWriter(wfh)
buf, slab := T.PageMM().AllocSize(lom.SizeBytes())
buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes())
c.buf = buf

_, err = ar.Range("", c.xtar)
4 changes: 2 additions & 2 deletions ext/dsort/shard/zip.go
@@ -62,7 +62,7 @@ func (zrw *zipRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record
return 0, 0, err
}
c := &rcbCtx{parent: zrw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk}
buf, slab := T.PageMM().AllocSize(lom.SizeBytes())
buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes())
c.buf = buf

_, err = ar.Range("", c.xzip)
@@ -99,7 +99,7 @@ func (*zipRW) Create(s *Shard, w io.Writer, loader ContentLoader) (written int64

func newZipRecordDataReader() *zipRecordDataReader {
rd := &zipRecordDataReader{}
rd.metadataBuf, rd.slab = T.ByteMM().Alloc()
rd.metadataBuf, rd.slab = g.t.ByteMM().Alloc()
return rd
}

5 changes: 4 additions & 1 deletion tools/tarch/archive.go
@@ -107,6 +107,9 @@ func CreateArchCustomFilesToW(w io.Writer, tarFormat tar.Format, ext string, fil
if !missingKeys || (missingKeys && rand.Intn(2) == 0) {
var buf []byte
// random content
if err := shard.ValidateContentKeyTy(customFileType); err != nil {
return err
}
switch customFileType {
case shard.ContentKeyInt:
buf = []byte(strconv.Itoa(rand.Int()))
@@ -115,7 +118,7 @@
case shard.ContentKeyFloat:
buf = []byte(fmt.Sprintf("%d.%d", rand.Int(), rand.Int()))
default:
return fmt.Errorf("invalid custom file type: %q", customFileType)
debug.Assert(false, customFileType) // validated above
}
if err := addBufferToArch(aw, fileName+customFileExt, len(buf), buf); err != nil {
return err
