From abffbe3d4572a5619913f99a55ef9881e557b61b Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Tue, 23 Jul 2024 10:16:29 -0400 Subject: [PATCH] metasync: amend GFN notifications * num connection-refused retries: sync vs notify * metasync-notify: never reset handle-pending timer * add err-work-channel-full, and use it * with minor refactoring * part two, prev. commit: 1a01903358fcb0 Signed-off-by: Alex Aizman --- ais/clustermap.go | 4 +-- ais/metasync.go | 63 ++++++++++++++++++++++++++++++-------------- cmn/cos/err.go | 2 ++ mirror/put_copies.go | 2 +- transport/base.go | 2 +- xact/xs/tcobjs.go | 2 +- 6 files changed, 50 insertions(+), 25 deletions(-) diff --git a/ais/clustermap.go b/ais/clustermap.go index 55d5dbf75ce..365fb4d8ce1 100644 --- a/ais/clustermap.go +++ b/ais/clustermap.go @@ -711,7 +711,7 @@ func (sls *sls) notify(ver int64) { return } sls.postCh <- ver - if len(sls.postCh) == cap(sls.postCh) { - nlog.ErrorDepth(1, "sls channel full: Smap v", ver) // unlikely + if l, c := len(sls.postCh), cap(sls.postCh); l > c/2 { + nlog.ErrorDepth(1, cos.ErrWorkChanFull, l, c, "Smap version:", ver) // unlikely } } diff --git a/ais/metasync.go b/ais/metasync.go index 53e15587c32..e80061be6f1 100644 --- a/ais/metasync.go +++ b/ais/metasync.go @@ -94,7 +94,12 @@ const ( const failsync = "failing to sync" -const retryConnRefused = 4 +const ( + retrySyncRefused = 4 + retryNotifyRefused = 2 +) + +const workChanCap = 32 type ( revs interface { @@ -113,7 +118,7 @@ type ( wg *sync.WaitGroup failedCnt *atomic.Int32 pairs []revsPair - reqType int // enum: reqSync, etc. + ty int // enum: { reqSync, reqNotify } } msPayload map[string][]byte // tag => revs' body ndRevs map[string]int64 // tag => version (see nodesRevs) @@ -155,7 +160,7 @@ func newMetasyncer(p *proxy) (y *metasyncer) { y.lastSynced = make(map[string]revs, revsMaxTags) y.stopCh = make(chan struct{}, 1) - y.workCh = make(chan revsReq, 32) + y.workCh = make(chan revsReq, workChanCap) y.retryTimer = time.NewTimer(time.Hour) y.retryTimer.Stop() @@ -164,9 +169,8 @@ func newMetasyncer(p *proxy) (y *metasyncer) { } func (y *metasyncer) Run() error { - nlog.Infof("Starting %s", y.Name()) + nlog.Infoln("Starting", y.Name()) for { - config := cmn.GCO.Get() select { case revsReq, ok := <-y.workCh: if !ok { @@ -180,14 +184,17 @@ func (y *metasyncer) Run() error { y.timerStopped = true break } - failedCnt := y.do(revsReq.pairs, revsReq.reqType) + failedCnt := y.do(revsReq.pairs, revsReq.ty) if revsReq.wg != nil { if revsReq.failedCnt != nil { revsReq.failedCnt.Store(int32(failedCnt)) } revsReq.wg.Done() } - if y.timerStopped && failedCnt > 0 { + + // timed retry, via handlePending() + if y.timerStopped && failedCnt > 0 && revsReq.ty != reqNotify { + config := cmn.GCO.Get() y.retryTimer.Reset(config.Periodic.RetrySyncTime.D()) y.timerStopped = false } @@ -197,8 +204,14 @@ func (y *metasyncer) Run() error { case <-y.retryTimer.C: failedCnt := y.handlePending() if failedCnt > 0 { + config := cmn.GCO.Get() y.retryTimer.Reset(config.Periodic.RetrySyncTime.D()) y.timerStopped = false + + if l, c := len(y.workCh), cap(y.workCh); l > c/2 { + nlog.Errorln("Warning:", y.p.String(), "[hp]:", cos.ErrWorkChanFull, "len", l, "cap", c, + "failed", failedCnt) + } } else { y.timerStopped = true } @@ -210,8 +223,11 @@ func (y *metasyncer) Run() error { } func (y *metasyncer) Stop(err error) { - nlog.Infof("Stopping %s: %v", y.Name(), err) - + if err == nil { + nlog.Infoln("Stopping", y.Name()) + } else { + nlog.Infoln("Stopping", y.Name()+":", err) + } y.stopCh <- struct{}{} close(y.stopCh) } @@ -220,7 +236,7 @@ func (y *metasyncer) Stop(err error) { func (y *metasyncer) notify(wait bool, pair revsPair) (failedCnt int) { var ( failedCntAtomic = atomic.NewInt32(0) - req = revsReq{pairs: []revsPair{pair}, reqType: reqNotify} + req = revsReq{pairs: []revsPair{pair}, ty: reqNotify} ) if y.isPrimary() != nil { return @@ -248,7 +264,7 @@ func (y *metasyncer) sync(pairs ...revsPair) *sync.WaitGroup { return req.wg } req.wg.Add(1) - req.reqType = reqSync + req.ty = reqSync y.workCh <- req return req.wg } @@ -281,7 +297,7 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) { method = http.MethodPost } if nlog.Stopping() { - return + return 0 } // step: build payload and update last sync-ed @@ -326,11 +342,13 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) { body = payload.marshal(y.p.gmm) to = core.AllNodes smap = y.p.owner.smap.get() + retries = retrySyncRefused // connection-refused ) defer body.Free() if reqT == reqNotify { to = core.Targets + retries = retryNotifyRefused } args := allocBcArgs() args.req = cmn.HreqArgs{Method: method, Path: urlPath, BodyR: body} @@ -368,9 +386,10 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) { } } freeBcastRes(results) + // step: handle connection-refused right away lr := len(refused) - for range retryConnRefused { + for range retries { if len(refused) == 0 { if lr > 0 { nlog.Infof("%s: %d node%s sync-ed", y.p, lr, cos.Plural(lr)) @@ -381,12 +400,13 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) { smap = y.p.owner.smap.get() if !smap.isPrimary(y.p.si) { y.becomeNonPrimary() - return + return 0 } if !y.handleRefused(method, urlPath, body, refused, pairs, smap) { break } } + // step: housekeep and return new pending smap = y.p.owner.smap.get() for sid := range y.nodesRevs { @@ -396,7 +416,7 @@ func (y *metasyncer) do(pairs []revsPair, reqT int) (failedCnt int) { } } failedCnt += len(refused) - return + return failedCnt } func (y *metasyncer) jit(pair revsPair) revs { @@ -460,7 +480,7 @@ func (y *metasyncer) handleRefused(method, urlPath string, body io.Reader, refus // failing to sync if res.status == http.StatusConflict { if e := err2MsyncErr(res.err); e != nil { - msg := fmt.Sprintf("%s [hr]: %s %s: %s [%v]", y.p.si, failsync, res.si, e.Message, e.Cii) + msg := fmt.Sprintf("%s [hr]: %s %s: %s [%v]", y.p, failsync, res.si, e.Message, e.Cii) if !y.remainPrimary(e, res.si, smap) { nlog.Errorln(msg + " - aborting") freeBcastRes(results) @@ -522,8 +542,11 @@ func (y *metasyncer) _pending() (pending meta.NodeMap, smap *smapX) { func (y *metasyncer) handlePending() (failedCnt int) { pending, smap := y._pending() if len(pending) == 0 { - nlog.Infof("no pending revs - all good") - return + nlog.Infoln("no pending revs - all good") + return 0 + } + if nlog.Stopping() { + return 0 } var ( l = len(y.lastSynced) @@ -582,7 +605,7 @@ func (y *metasyncer) handlePending() (failedCnt int) { nlog.Warningf("%s [hp]: %s %s: %v(%d)", y.p, failsync, res.si, res.err, res.status) } freeBcastRes(results) - return + return failedCnt } // cie and isPrimary checks versus remote clusterInfo @@ -608,7 +631,7 @@ func (y *metasyncer) remainPrimary(e *errMsync, from *meta.Snode, smap *smapX) b return true } nlog.Errorf("%s: [%s %s] vs %v from %s", ciError(90), y.p, smap.StringEx(), e.Cii, from) - return true // TODO: iffy; may need to do more + return true } func (y *metasyncer) isPrimary() (err error) { diff --git a/cmn/cos/err.go b/cmn/cos/err.go index 31c1551339b..97da469cd57 100644 --- a/cmn/cos/err.go +++ b/cmn/cos/err.go @@ -42,6 +42,8 @@ var ( errQuantityNonNegative = errors.New("quantity should not be negative") ) +var ErrWorkChanFull = errors.New("work channel full") + var errBufferUnderrun = errors.New("buffer underrun") // ErrNotFound diff --git a/mirror/put_copies.go b/mirror/put_copies.go index ea61a8d75ec..9fcfd5ae4fc 100644 --- a/mirror/put_copies.go +++ b/mirror/put_copies.go @@ -214,7 +214,7 @@ func (r *XactPut) stop() (err error) { err = fmt.Errorf("%s: dropped %d object%s", r, n, cos.Plural(n)) } if cnt := r.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && cmn.Rom.FastV(5, cos.SmoduleMirror)) { - nlog.Errorln("work channel full (all mp workers)", r.String(), cnt) + nlog.Errorln(cos.ErrWorkChanFull, "(all mp workers)", r.String(), "cnt", cnt) } return } diff --git a/transport/base.go b/transport/base.go index 85388126a18..cf4c2129ba3 100644 --- a/transport/base.go +++ b/transport/base.go @@ -289,7 +289,7 @@ func (s *streamBase) sendLoop(dryrun bool) { s.streamer.abortPending(err, false /*completions*/) if cnt := s.chanFull.Load(); (cnt >= 10 && cnt <= 20) || (cnt > 0 && verbose) { - nlog.Errorln("work channel full", s.lid, cnt) + nlog.Errorln(cos.ErrWorkChanFull, s.lid, "cnt", cnt) } } diff --git a/xact/xs/tcobjs.go b/xact/xs/tcobjs.go index f514f64ba2f..db36aa37dfd 100644 --- a/xact/xs/tcobjs.go +++ b/xact/xs/tcobjs.go @@ -222,7 +222,7 @@ func (r *XactTCObjs) Do(msg *cmn.TCObjsMsg) { if l == c { cnt := r.chanFull.Inc() if (cnt >= 10 && cnt <= 20) || (cnt > 0 && cmn.Rom.FastV(5, cos.SmoduleXs)) { - nlog.Errorln("work channel full", r.Name()) + nlog.Errorln(cos.ErrWorkChanFull, r.Name(), "cnt", cnt) } } }