From 12408c36a5f9e8b041cb7926df64e00f91bfd1ec Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Mon, 20 Nov 2023 12:54:55 -0500 Subject: [PATCH] dsort: minor ref, cleanup; docs: performance.md Signed-off-by: Alex Aizman --- docs/performance.md | 17 +++++++++++++++-- ext/dsort/manager.go | 2 +- ext/dsort/request_spec.go | 2 +- ext/dsort/shard/init.go | 26 +++++++++++++++++++++++++ ext/dsort/shard/key.go | 40 +++++++++++++++++++++++++++------------ ext/dsort/shard/recm.go | 6 +++--- ext/dsort/shard/rw.go | 2 -- ext/dsort/shard/tar.go | 13 ++++--------- ext/dsort/shard/targz.go | 2 +- ext/dsort/shard/tarlz4.go | 2 +- ext/dsort/shard/zip.go | 4 ++-- tools/tarch/archive.go | 5 ++++- 12 files changed, 86 insertions(+), 35 deletions(-) create mode 100644 ext/dsort/shard/init.go diff --git a/docs/performance.md b/docs/performance.md index 6ed63b096d8..0b42b6ed182 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -90,11 +90,24 @@ Once the packages are installed (the step that will depend on your Linux distrib ## Network +AIStore supports 3 (three) logical networks: + +* public (default port `51081`) +* intra-cluster control (`51082`), and +* intra-cluster data (`51083`) + +Ideally, all 3 are provisioned (physically) separately - to reduce contention, avoid HoL, and ultimately optimize intra-cluster performance. + +Separately, and in addition: + * MTU should be set to `9000` (Jumbo frames) - this is one of the most important configurations * Optimize TCP send buffer sizes on the target side (`net.core.rmem_max`, `net.ipv4.tcp_rmem`) * Optimize TCP receive buffer on the client (reading) side (`net.core.wmem_max`, `net.ipv4.tcp_wmem`) -* `net.ipv4.tcp_mtu_probing = 2` # especially important in communication between client <-> proxy or client <-> target and if client has `mtu` set > 1500 -* Wait.. 
there is more: [all ip-sysctl configurations](https://wiki.linuxfoundation.org/networking/ip-sysctl) +* Set `net.ipv4.tcp_mtu_probing = 2` + +> The last setting is especially important when client's MTU is greater than 1500. + +> The list of tunables (above) cannot be considered _exhaustive_. Optimal (high-performance) choices always depend on the hardware, Linux kernel, and a variety of factors outside the scope. ## Smoke test diff --git a/ext/dsort/manager.go b/ext/dsort/manager.go index 7c8f3e8f4ce..7a54702e004 100644 --- a/ext/dsort/manager.go +++ b/ext/dsort/manager.go @@ -146,7 +146,7 @@ func Tinit(t cluster.Target, stats stats.Tracker, db kvdb.Driver, config *cmn.Co g.t = t g.tstats = stats - shard.T = t + shard.Init(t) fs.CSM.Reg(ct.DsortFileType, &ct.DsortFile{}) fs.CSM.Reg(ct.DsortWorkfileType, &ct.DsortFile{}) diff --git a/ext/dsort/request_spec.go b/ext/dsort/request_spec.go index 252e56463e3..3398dbd1f70 100644 --- a/ext/dsort/request_spec.go +++ b/ext/dsort/request_spec.go @@ -256,7 +256,7 @@ func parseAlgorithm(alg Algorithm) (*Algorithm, error) { if alg.Ext == "" || alg.Ext[0] != '.' { return nil, fmt.Errorf("%w %q", errAlgExt, alg.Ext) } - if err := shard.ValidateContentKeyT(alg.ContentKeyType); err != nil { + if err := shard.ValidateContentKeyTy(alg.ContentKeyType); err != nil { return nil, err } } else { diff --git a/ext/dsort/shard/init.go b/ext/dsort/shard/init.go new file mode 100644 index 00000000000..aa3f047e23b --- /dev/null +++ b/ext/dsort/shard/init.go @@ -0,0 +1,26 @@ +// Package shard provides Extract(shard), Create(shard), and associated methods +// across all supported archival formats (see cmn/archive/mime.go) +/* + * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. 
+ */ +package shard + +import ( + "github.com/NVIDIA/aistore/cluster" + "github.com/NVIDIA/aistore/cmn/archive" +) + +type global struct { + t cluster.Target +} + +var ( + g global + + // padding buffer (zero-filled) + padBuf [archive.TarBlockSize]byte +) + +func Init(t cluster.Target) { + g.t = t +} diff --git a/ext/dsort/shard/key.go b/ext/dsort/shard/key.go index aa263aa22b4..02dd65886aa 100644 --- a/ext/dsort/shard/key.go +++ b/ext/dsort/shard/key.go @@ -21,12 +21,6 @@ const ( ContentKeyInt = "int" ContentKeyFloat = "float" ContentKeyString = "string" - - fmtErrInvalidSortingKeyType = "invalid content sorting key %q (expecting one of: %+v)" -) - -var ( - contentKeyTypes = []string{ContentKeyInt, ContentKeyFloat, ContentKeyString} ) type ( @@ -51,8 +45,16 @@ type ( ty string // one of contentKeyTypes: {"int", "string", ... } - see above ext string // file with this extension provides sorting key (of the type `ty`) } + + ErrSortingKeyType struct { + ty string + } ) +///////////////////// +// md5KeyExtractor // +///////////////////// + func NewMD5KeyExtractor() (KeyExtractor, error) { return &md5KeyExtractor{h: md5.New()}, nil } @@ -67,6 +69,10 @@ func (*md5KeyExtractor) PrepareExtractor(name string, r cos.ReadSizer, _ string) return r, &SingleKeyExtractor{name: name}, false } +////////////////////// +// nameKeyExtractor // +////////////////////// + func NewNameKeyExtractor() (KeyExtractor, error) { return &nameKeyExtractor{}, nil } @@ -79,8 +85,12 @@ func (*nameKeyExtractor) ExtractKey(ske *SingleKeyExtractor) (any, error) { return ske.name, nil } +///////////////////////// +// contentKeyExtractor // +///////////////////////// + func NewContentKeyExtractor(ty, ext string) (KeyExtractor, error) { - if err := ValidateContentKeyT(ty); err != nil { + if err := ValidateContentKeyTy(ty); err != nil { return nil, err } return &contentKeyExtractor{ty: ty, ext: ext}, nil @@ -113,13 +123,19 @@ func (ke *contentKeyExtractor) ExtractKey(ske *SingleKeyExtractor) (any, error) 
case ContentKeyString: return key, nil default: - return nil, fmt.Errorf(fmtErrInvalidSortingKeyType, ke.ty, contentKeyTypes) + return nil, &ErrSortingKeyType{ke.ty} } } -func ValidateContentKeyT(ty string) error { - if !cos.StringInSlice(ty, contentKeyTypes) { - return fmt.Errorf(fmtErrInvalidSortingKeyType, ty, contentKeyTypes) +func ValidateContentKeyTy(ty string) error { + switch ty { + case ContentKeyInt, ContentKeyFloat, ContentKeyString: + return nil + default: + return &ErrSortingKeyType{ty} } - return nil +} + +func (e *ErrSortingKeyType) Error() string { + return fmt.Sprintf("invalid content sorting key %q, expecting one of: 'int', 'float', 'string'", e.ty) } diff --git a/ext/dsort/shard/recm.go b/ext/dsort/shard/recm.go index 3fcb317578f..21bb39cc2be 100644 --- a/ext/dsort/shard/recm.go +++ b/ext/dsort/shard/recm.go @@ -118,7 +118,7 @@ func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64, storeType = SGLStoreType contentPath, fullContentPath = recm.encodeRecordName(storeType, args.shardName, args.recordName) - sgl := T.PageMM().NewSGL(r.Size() + int64(len(args.metadata))) + sgl := g.t.PageMM().NewSGL(r.Size() + int64(len(args.metadata))) // No need for `io.CopyBuffer` since SGL implements `io.ReaderFrom`. 
if _, err = io.Copy(sgl, bytes.NewReader(args.metadata)); err != nil { sgl.Free() @@ -180,7 +180,7 @@ func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64, recm.Records.Insert(&Record{ Key: key, Name: recordUniqueName, - DaemonID: T.SID(), + DaemonID: g.t.SID(), Objects: []*RecordObj{{ ContentPath: contentPath, ObjectFileType: args.fileType, @@ -356,7 +356,7 @@ func (recm *RecordManager) Cleanup() { recm.contents = nil // NOTE: may call cos.FreeMemToOS - T.PageMM().FreeSpec(memsys.FreeSpec{ + g.t.PageMM().FreeSpec(memsys.FreeSpec{ Totally: true, ToOS: true, MinSize: 1, // force toGC to free all (even small) memory to system diff --git a/ext/dsort/shard/rw.go b/ext/dsort/shard/rw.go index f8ce1dc1b51..ffbddff99ef 100644 --- a/ext/dsort/shard/rw.go +++ b/ext/dsort/shard/rw.go @@ -23,8 +23,6 @@ type RW interface { } var ( - T cluster.Target - RWs = map[string]RW{ archive.ExtTar: &tarRW{archive.ExtTar}, archive.ExtTgz: &tgzRW{archive.ExtTgz}, diff --git a/ext/dsort/shard/tar.go b/ext/dsort/shard/tar.go index 70bcec20cf6..674865be81b 100644 --- a/ext/dsort/shard/tar.go +++ b/ext/dsort/shard/tar.go @@ -35,13 +35,8 @@ type ( } ) -var ( - // Predefined padding buffer (zero-initialized). 
- padBuf [archive.TarBlockSize]byte - - // interface guard - _ RW = (*tarRW)(nil) -) +// interface guard +var _ RW = (*tarRW)(nil) ///////////////////////// // tarRecordDataReader // @@ -49,7 +44,7 @@ var ( func newTarRecordDataReader() *tarRecordDataReader { rd := &tarRecordDataReader{} - rd.metadataBuf, rd.slab = T.ByteMM().Alloc() + rd.metadataBuf, rd.slab = g.t.ByteMM().Alloc() return rd } @@ -113,7 +108,7 @@ func (trw *tarRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record return 0, 0, err } c := &rcbCtx{parent: trw, tw: nil, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk} - buf, slab := T.PageMM().AllocSize(lom.SizeBytes()) + buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes()) c.buf = buf _, err = ar.Range("", c.xtar) diff --git a/ext/dsort/shard/targz.go b/ext/dsort/shard/targz.go index 4e45843e4e8..8bb77d2d17a 100644 --- a/ext/dsort/shard/targz.go +++ b/ext/dsort/shard/targz.go @@ -46,7 +46,7 @@ func (trw *tgzRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record c := &rcbCtx{parent: trw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk} c.tw = tar.NewWriter(wfh) - buf, slab := T.PageMM().AllocSize(lom.SizeBytes()) + buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes()) c.buf = buf _, err = ar.Range("", c.xtar) diff --git a/ext/dsort/shard/tarlz4.go b/ext/dsort/shard/tarlz4.go index ea567f592ec..c55ec6ab86c 100644 --- a/ext/dsort/shard/tarlz4.go +++ b/ext/dsort/shard/tarlz4.go @@ -45,7 +45,7 @@ func (trw *tlz4RW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Recor c := &rcbCtx{parent: trw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk} c.tw = tar.NewWriter(wfh) - buf, slab := T.PageMM().AllocSize(lom.SizeBytes()) + buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes()) c.buf = buf _, err = ar.Range("", c.xtar) diff --git a/ext/dsort/shard/zip.go b/ext/dsort/shard/zip.go index d9715f5ab36..7bef11d8e54 100644 --- a/ext/dsort/shard/zip.go +++ b/ext/dsort/shard/zip.go @@ -62,7 +62,7 @@ 
func (zrw *zipRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor Record return 0, 0, err } c := &rcbCtx{parent: zrw, extractor: extractor, shardName: lom.ObjName, toDisk: toDisk} - buf, slab := T.PageMM().AllocSize(lom.SizeBytes()) + buf, slab := g.t.PageMM().AllocSize(lom.SizeBytes()) c.buf = buf _, err = ar.Range("", c.xzip) @@ -99,7 +99,7 @@ func (*zipRW) Create(s *Shard, w io.Writer, loader ContentLoader) (written int64 func newZipRecordDataReader() *zipRecordDataReader { rd := &zipRecordDataReader{} - rd.metadataBuf, rd.slab = T.ByteMM().Alloc() + rd.metadataBuf, rd.slab = g.t.ByteMM().Alloc() return rd } diff --git a/tools/tarch/archive.go b/tools/tarch/archive.go index 6e002be81f4..5f35c90296f 100644 --- a/tools/tarch/archive.go +++ b/tools/tarch/archive.go @@ -107,6 +107,9 @@ func CreateArchCustomFilesToW(w io.Writer, tarFormat tar.Format, ext string, fil if !missingKeys || (missingKeys && rand.Intn(2) == 0) { var buf []byte // random content + if err := shard.ValidateContentKeyTy(customFileType); err != nil { + return err + } switch customFileType { case shard.ContentKeyInt: buf = []byte(strconv.Itoa(rand.Int())) @@ -115,7 +118,7 @@ func CreateArchCustomFilesToW(w io.Writer, tarFormat tar.Format, ext string, fil case shard.ContentKeyFloat: buf = []byte(fmt.Sprintf("%d.%d", rand.Int(), rand.Int())) default: - return fmt.Errorf("invalid custom file type: %q", customFileType) + debug.Assert(false, customFileType) // validated above } if err := addBufferToArch(aw, fileName+customFileExt, len(buf), buf); err != nil { return err