Skip to content

Commit

Permalink
core: cold GET: fast path & slow path
Browse files Browse the repository at this point in the history
* add `Disable-Fast-Cold-GET` (feature)
* compute get/put latencies unconditionally (rm "sparsing" logic)
* CLI to show more latencies; usability; bump version
* (part three)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 1, 2023
1 parent b732d06 commit c3bfd04
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 80 deletions.
13 changes: 6 additions & 7 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/NVIDIA/aistore/cmn/feat"
"github.com/NVIDIA/aistore/cmn/fname"
"github.com/NVIDIA/aistore/cmn/kvdb"
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ec"
"github.com/NVIDIA/aistore/ext/dload"
Expand Down Expand Up @@ -94,9 +95,6 @@ func (*target) interruptedRestarted() (interrupted, restarted bool) {
return
}

// sparseVerbStats reports whether the three low-order bits of tm equal 1.
// Elsewhere in this file it is called with a Unix-nanosecond access time to
// decide whether a given PUT/GET latency sample gets recorded.
// NOTE(review): presumably this selects about one request in eight, assuming
// the low bits of the timestamp are evenly spread - confirm.
func sparseVerbStats(tm int64) bool {
	const lowBits = 7 // mask for bits 0..2
	return (tm & lowBits) == 1
}
// sparseRedirStats reports whether the two low-order bits of tm equal 2.
// Elsewhere in this file it is called with a Unix-nanosecond access time to
// decide whether a redirect-latency sample gets recorded.
// NOTE(review): presumably this selects about one request in four, assuming
// the low bits of the timestamp are evenly spread - confirm.
func sparseRedirStats(tm int64) bool {
	const lowBits = 3 // mask for bits 0..1
	return (tm & lowBits) == 2
}

//
// target
//
Expand Down Expand Up @@ -696,7 +694,8 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck
goi := allocGOI()
{
goi.atime = time.Now().UnixNano()
if dpq.ptime != "" && sparseRedirStats(goi.atime) {
goi.ltime = mono.NanoTime()
if dpq.ptime != "" {
if d := ptLatency(goi.atime, dpq.ptime, r.Header.Get(apc.HdrCallerIsPrimary)); d > 0 {
t.statsT.Add(stats.GetRedirLatency, d)
}
Expand All @@ -708,10 +707,10 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck
goi.ranges = byteRanges{Range: r.Header.Get(cos.HdrRange), Size: 0}
goi.archive = archiveQuery{
filename: filename,
mime: dpq.archmime, // query.Get(apc.QparamArchmime)
mime: dpq.archmime, // apc.QparamArchmime
}
goi.isGFN = cos.IsParseBool(dpq.isGFN) // query.Get(apc.QparamIsGFNRequest)
// goi.chunked = cmn.GCO.Get().Net.HTTP.Chunked NOTE: disabled - no need
// goi.chunked = config.Net.HTTP.Chunked NOTE: disabled - no need
}
if bck.IsHTTP() {
originalURL := dpq.origURL // query.Get(apc.QparamOrigURL)
Expand Down Expand Up @@ -813,7 +812,7 @@ func (t *target) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiR
poi := allocPOI()
{
poi.atime = started
if apireq.dpq.ptime != "" && sparseRedirStats(poi.atime) {
if apireq.dpq.ptime != "" {
if d := ptLatency(poi.atime, apireq.dpq.ptime, r.Header.Get(apc.HdrCallerIsPrimary)); d > 0 {
t.statsT.Add(stats.PutRedirLatency, d)
}
Expand Down
10 changes: 6 additions & 4 deletions ais/tgtcold.go → ais/tgtfcold.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/NVIDIA/aistore/cluster"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/feat"
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ec"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/stats"
)

const ftcg = "failed to cold-GET"
Expand Down Expand Up @@ -57,13 +60,11 @@ func (goi *getOI) coldSeek(res *cluster.GetReaderResult) error {
}
cos.Close(res.R)

if err == nil && written != res.Size && res.Size > 0 {
err = fmt.Errorf("expected: %s size %d, got %d", lom.Cname(), res.Size, written) // (unlikely)
}
if err != nil {
goi._cleanup(revert, lmfh, buf, slab, err, "(rr/wl)")
return err
}
debug.Assertf(written == res.Size, "%s: expected size=%d, got %d", lom.Cname(), res.Size, written)

// fsync, if requested
if cmn.Rom.Features().IsSet(feat.FsyncPUT) {
Expand All @@ -72,6 +73,7 @@ func (goi *getOI) coldSeek(res *cluster.GetReaderResult) error {
return err
}
}
goi.t.statsT.Add(stats.GetColdRwLatency, mono.SinceNano(goi.ltime))

// persist lom (main repl)
lom.SetSize(written)
Expand Down Expand Up @@ -149,7 +151,7 @@ func (goi *getOI) coldSeek(res *cluster.GetReaderResult) error {
}
goi.lom.Unlock(true)

goi.stats(written)
goi.stats(written) // where `GetLatency` = `GetColdRwLatency` + (transmit)
return nil
}

Expand Down
92 changes: 46 additions & 46 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type (
config *cmn.Config // (during this request)
resphdr http.Header // as implied
workFQN string // temp fqn to be renamed
atime int64 // access time
atime int64 // access time.Now()
ltime int64 // mono.NanoTime, to measure latency
size int64 // aka Content-Length
owt cmn.OWT // object write transaction enum { OwtPut, ..., OwtGet* }
restful bool // being invoked via RESTful API
Expand All @@ -70,12 +71,14 @@ type (
lom *cluster.LOM // obj
archive archiveQuery // archive query
ranges byteRanges // range read (see https://www.rfc-editor.org/rfc/rfc7233#section-2.1)
atime int64 // access time
atime int64 // access time.Now()
ltime int64 // mono.NanoTime, to measure latency
isGFN bool // is GFN
chunked bool // chunked transfer (en)coding: https://tools.ietf.org/html/rfc7230#page-36
unlocked bool // internal
verchanged bool // version changed
retry bool // once
cold bool // executed backend.Get
}

// append handle (packed)
Expand Down Expand Up @@ -161,6 +164,7 @@ func (poi *putOI) do(resphdr http.Header, r *http.Request, dpq *dpq) (int, error
}

func (poi *putOI) putObject() (errCode int, err error) {
poi.ltime = mono.NanoTime()
// PUT is a no-op if the checksums do match
if !poi.skipVC && !poi.coldGET && !poi.cksumToUse.IsEmpty() {
if poi.lom.EqCksum(poi.cksumToUse) {
Expand Down Expand Up @@ -192,11 +196,8 @@ func (poi *putOI) putObject() (errCode int, err error) {
poi.t.statsT.AddMany(
cos.NamedVal64{Name: stats.PutCount, Value: 1},
cos.NamedVal64{Name: stats.PutThroughput, Value: poi.lom.SizeBytes()},
cos.NamedVal64{Name: stats.PutLatency, Value: mono.SinceNano(poi.ltime)},
)
if sparseVerbStats(poi.atime) {
// see also: sparseRedirStats
poi.t.statsT.Add(stats.PutLatency, time.Now().UnixNano()-poi.atime)
}
// RESTful PUT response header
if poi.resphdr != nil {
cmn.ToHeader(poi.lom.ObjAttrs(), poi.resphdr)
Expand Down Expand Up @@ -598,13 +599,13 @@ do:
}
}

// cold GET: upgrade rlock => wlock, call t.Backend.GetObjReader
// cold-GET: upgrade rlock => wlock, call t.Backend.GetObjReader
if cold {
var (
res cluster.GetReaderResult
cksumConf = goi.lom.CksumConf()
loaded bool
fast bool
res cluster.GetReaderResult
ckconf = goi.lom.CksumConf()
fast = !cmn.Rom.Features().IsSet(feat.DisableFastColdGET) // can be disabled
loaded bool
)
if cs.IsNil() {
cs = fs.Cap()
Expand Down Expand Up @@ -635,14 +636,13 @@ do:
}
return res.ErrCode, res.Err
}
goi.t.statsT.AddMany(
cos.NamedVal64{Name: stats.GetColdCount, Value: 1},
cos.NamedVal64{Name: stats.GetColdSize, Value: res.Size},
)
goi.cold = true
// increment the cold-GET count right away; the remaining GET stats are added by goi.stats() at the end
goi.t.statsT.Inc(stats.GetColdCount)

// fast path limitations (TODO: reduce or remove completely)
fast = goi.archive.filename == "" &&
(cksumConf.Type == cos.ChecksumNone || (!cksumConf.ValidateColdGet && !cksumConf.EnableReadRange))
// fast path limitations: read archived; compute more checksums (TODO: reduce)
fast = fast && goi.archive.filename == "" &&
(ckconf.Type == cos.ChecksumNone || (!ckconf.ValidateColdGet && !ckconf.EnableReadRange))

// fast path
if fast {
Expand All @@ -652,7 +652,7 @@ do:
}

// regular path
errCode, err = goi.coldDisk(&res)
errCode, err = goi.coldPut(&res)
if err != nil {
goi.unlocked = true
return
Expand All @@ -661,7 +661,7 @@ do:

// read locally and stream back
fin:
errCode, err = goi.finalize(cold)
errCode, err = goi.finalize()
if err == nil {
return
}
Expand Down Expand Up @@ -707,7 +707,7 @@ outer:
}

// see also: t.GetCold() and goi.coldMem()
func (goi *getOI) coldDisk(res *cluster.GetReaderResult) (int, error) {
func (goi *getOI) coldPut(res *cluster.GetReaderResult) (int, error) {
var (
t, lom = goi.t, goi.lom
poi = allocPOI()
Expand All @@ -729,9 +729,10 @@ func (goi *getOI) coldDisk(res *cluster.GetReaderResult) (int, error) {

if err != nil {
lom.Unlock(true)
nlog.Infoln("failed to cold-GET(put)", lom.Cname(), err)
nlog.Infoln(ftcg+"(put)", lom.Cname(), err)
return code, err
}
goi.t.statsT.Add(stats.GetColdRwLatency, mono.SinceNano(goi.ltime))

// load, downgrade lock, inc stats
if err = lom.Load(true /*cache it*/, true /*locked*/); err != nil {
Expand Down Expand Up @@ -965,13 +966,13 @@ func (goi *getOI) getFromNeighbor(lom *cluster.LOM, tsi *meta.Snode) bool {
return false
}

func (goi *getOI) finalize(coldGet bool) (errCode int, err error) {
func (goi *getOI) finalize() (errCode int, err error) {
var (
lmfh *os.File
hrng *htrange
fqn = goi.lom.FQN
)
if !coldGet && !goi.isGFN {
if !goi.cold && !goi.isGFN {
fqn = goi.lom.LBGet() // best-effort GET load balancing (see also mirror.findLeastUtilized())
}
lmfh, err = os.Open(fqn)
Expand Down Expand Up @@ -1002,14 +1003,14 @@ func (goi *getOI) finalize(coldGet bool) (errCode int, err error) {
goto ret
}
}
errCode, err = goi.fini(fqn, lmfh, hdr, hrng, coldGet)
errCode, err = goi.fini(fqn, lmfh, hdr, hrng)
ret:
cos.Close(lmfh)
return
}

// in particular, setup reader and writer and set headers
func (goi *getOI) fini(fqn string, lmfh *os.File, hdr http.Header, hrng *htrange, coldGet bool) (errCode int, err error) {
func (goi *getOI) fini(fqn string, lmfh *os.File, hdr http.Header, hrng *htrange) (errCode int, err error) {
var (
size int64
reader io.Reader = lmfh
Expand Down Expand Up @@ -1049,22 +1050,22 @@ func (goi *getOI) fini(fqn string, lmfh *os.File, hdr http.Header, hrng *htrange
hdr.Set(apc.HdrArchmime, mime)
hdr.Set(apc.HdrArchpath, goi.archive.filename)
case hrng != nil: // range
cksumConf := goi.lom.CksumConf()
cksumRange := cksumConf.Type != cos.ChecksumNone && cksumConf.EnableReadRange
ckconf := goi.lom.CksumConf()
cksumRange := ckconf.Type != cos.ChecksumNone && ckconf.EnableReadRange
size = hrng.Length
reader = io.NewSectionReader(lmfh, hrng.Start, hrng.Length)
if cksumRange {
var (
cksum *cos.CksumHash
sgl = goi.t.gmm.NewSGL(size)
)
_, cksum, err = cos.CopyAndChecksum(sgl /*as ReaderFrom*/, reader, nil, cksumConf.Type)
_, cksum, err = cos.CopyAndChecksum(sgl /*as ReaderFrom*/, reader, nil, ckconf.Type)
if err != nil {
sgl.Free()
return
}
hdr.Set(apc.HdrObjCksumVal, cksum.Value())
hdr.Set(apc.HdrObjCksumType, cksumConf.Type)
hdr.Set(apc.HdrObjCksumType, ckconf.Type)
reader = sgl
defer func() {
sgl.Free()
Expand All @@ -1077,12 +1078,12 @@ func (goi *getOI) fini(fqn string, lmfh *os.File, hdr http.Header, hrng *htrange
hdr.Set(cos.HdrContentLength, strconv.FormatInt(size, 10))
hdr.Set(cos.HdrContentType, cos.ContentBinary)
buf, slab := goi.t.gmm.AllocSize(size)
err = goi.transmit(reader, buf, fqn, coldGet)
err = goi.transmit(reader, buf, fqn)
slab.Free(buf)
return
}

func (goi *getOI) transmit(r io.Reader, buf []byte, fqn string, coldGet bool) error {
func (goi *getOI) transmit(r io.Reader, buf []byte, fqn string) error {
// NOTE: hide `ReadFrom` of the `http.ResponseWriter`
// (in re: sendfile; see also cos.WriterOnly comment)
w := cos.WriterOnly{Writer: io.Writer(goi.w)}
Expand All @@ -1096,20 +1097,18 @@ func (goi *getOI) transmit(r io.Reader, buf []byte, fqn string, coldGet bool) er
// return special code to indicate just that
return errSendingResp
}
// GFN: atime must be already set
if !coldGet && !goi.isGFN {
if err := goi.lom.Load(false /*cache it*/, true /*locked*/); err != nil {
nlog.Errorf("%s: GET post-transmission failure: %v", goi.t, err)
return errSendingResp
}
goi.lom.SetAtimeUnix(goi.atime)
goi.lom.Recache() // GFN and cold GETs have already done this
}
// Update objects which were sent during GFN. Thanks to this we will not
// have to resend them in rebalance. In case of a race between rebalance
// and GFN, the former wins and it will result in double send.
if goi.isGFN {
goi.t.reb.FilterAdd([]byte(goi.lom.Uname()))
} else if !goi.cold { // GFN & cold-GET: must be already loaded w/ atime set
if err := goi.lom.Load(false /*cache it*/, true /*locked*/); err != nil {
nlog.Errorf("%s: GET post-transmission failure: %v", goi.t, err)
return errSendingResp
}
goi.lom.SetAtimeUnix(goi.atime)
goi.lom.Recache()
}
//
// stats
Expand All @@ -1121,11 +1120,12 @@ func (goi *getOI) transmit(r io.Reader, buf []byte, fqn string, coldGet bool) er
func (goi *getOI) stats(written int64) {
goi.t.statsT.AddMany(
cos.NamedVal64{Name: stats.GetCount, Value: 1},
cos.NamedVal64{Name: stats.GetThroughput, Value: written},
cos.NamedVal64{Name: stats.GetThroughput, Value: written}, // vis-à-vis user (as written m.b. range)
cos.NamedVal64{Name: stats.GetLatency, Value: mono.SinceNano(goi.ltime)}, // see also: stats.GetColdRwLatency
)
if sparseVerbStats(goi.atime) {
// see also: sparseRedirStats
goi.t.statsT.Add(stats.GetLatency, time.Now().UnixNano()-goi.atime)
if goi.cold {
// see also: stats.GetColdCount
goi.t.statsT.Add(stats.GetColdSize, goi.lom.SizeBytes())
}
if goi.verchanged {
goi.t.statsT.AddMany(
Expand Down Expand Up @@ -1530,7 +1530,7 @@ func (coi *copyOI) doSend(lom *cluster.LOM, sargs *sendArgs) (size int64, err er
} else {
// 3. get a reader for this object
// - iff the object is not present ("cached"):
// - call t.Backend.GetObjReader, a variation of "cold GET"
// - call t.Backend.GetObjReader, a variation of cold-GET
if sargs.reader, sargs.objAttrs, err = coi.DP.Reader(lom); err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type (
WorkTag string // (=> work fqn)
OWT cmn.OWT
SkipEC bool // don't erasure-code when finalizing
ColdGET bool // this PUT is in fact cold-GET
ColdGET bool // this PUT is in fact a cold-GET
}
CopyObjectParams struct {
DM DataMover
Expand Down
8 changes: 4 additions & 4 deletions cmd/cli/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type (
}
longRun struct {
count int
footer int
lfooter int
refreshRate time.Duration
offset int64
mapBegin, mapEnd teb.StstMap
Expand Down Expand Up @@ -134,7 +134,7 @@ func (a *acli) runForever(args []string) error {
rate := a.longRun.refreshRate
for {
time.Sleep(rate)
printLongRunFooter(a.outWriter, a.longRun.footer)
printLongRunFooter(a.outWriter, a.longRun.lfooter)
if err := a.runOnce(args); err != nil {
return err
}
Expand Down Expand Up @@ -327,9 +327,9 @@ func setLongRunParams(c *cli.Context, footer ...int) bool {
if params.isSet() {
return false
}
params.footer = 8
params.lfooter = 8
if len(footer) > 0 {
params.footer = footer[0]
params.lfooter = footer[0]
}
params.init(c, false)
return true
Expand Down
Loading

0 comments on commit c3bfd04

Please sign in to comment.