Skip to content

Commit

Permalink
global rebalance: amend streams init/term
Browse files Browse the repository at this point in the history
* part two

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 22, 2025
1 parent 8c45ccf commit a17d9da
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 26 deletions.
2 changes: 1 addition & 1 deletion ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func (t *target) _runRe(newRMD *rebMD, msg *actMsgExt, smap *smapX, oxid string)
NID: newRMD.Version,
}
)
if msg.UUID != nxid {
if msg.UUID != nxid && msg.UUID != "" {
nlog.Warningln(tag, msg.UUID, "vs", nxid)
}

Expand Down
35 changes: 25 additions & 10 deletions reb/globrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ func New(config *cmn.Config) *Reb {
}

func (reb *Reb) _preempt(logHdr, oxid string) error {
const (
retries = 10 // TODO: config
)
oxreb, err := xreg.GetXact(oxid)
if err != nil {
return err
Expand All @@ -161,15 +164,18 @@ func (reb *Reb) _preempt(logHdr, oxid string) error {
oxreb.Abort(cmn.ErrXactRenewAbort)
nlog.Warningln(logHdr, "[", cmn.ErrXactRenewAbort, oxreb.String(), "]", reb.dm.String())
}
for i := range 10 { // TODO: config
for i := range retries {
if reb.dm.IsFree() {
break
return nil
}
time.Sleep(time.Second)
if i > 2 && i&1 == 1 {
nlog.Warningln(logHdr, "preempt: polling for", reb.dm.String())
}
}
// force
reb.dm.Abort()
reb.dm.UnregRecv()
return nil
}

Expand Down Expand Up @@ -400,7 +406,12 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, haveStreams boo
if dm := reb.dm.Renew(trname, reb.recvObj, cmn.OwtRebalance, dmExtra); dm != nil {
reb.dm = dm
}
reb.beginStreams(rargs)
if err := reb.beginStreams(rargs); err != nil {
rargs.xreb.Abort(err)
reb.mu.Unlock()
nlog.Errorln(err)
return false
}
}

if reb.awaiting.targets == nil {
Expand All @@ -419,7 +430,7 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, haveStreams boo
if fatalErr != nil {
err = fatalErr
}
reb.endStreams(err)
reb.endStreams(err, rargs.logHdr)
rargs.xreb.Abort(err)
reb.mu.Unlock()
nlog.Errorln("FATAL:", fatalErr, "WRITE:", writeErr)
Expand All @@ -436,16 +447,20 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, haveStreams boo
return true
}

func (reb *Reb) beginStreams(rargs *rebArgs) {
debug.Assert(reb.stages.stage.Load() == rebStageInit)
func (reb *Reb) beginStreams(rargs *rebArgs) error {
if reb.stages.stage.Load() != rebStageInit {
return fmt.Errorf("%s: cannot begin at stage %d", rargs.logHdr, reb.stages.stage.Load())
}

reb.dm.SetXact(rargs.xreb)
reb.dm.Open()
return nil
}

func (reb *Reb) endStreams(err error) {
swapped := reb.stages.stage.CAS(rebStageFin, rebStageFinStreams)
debug.Assert(swapped)
func (reb *Reb) endStreams(err error, loghdr string) {
if !reb.stages.stage.CAS(rebStageFin, rebStageFinStreams) {
nlog.Warningln(loghdr, "stage", reb.stages.stage.Load())
}
reb.dm.Close(err)
}

Expand Down Expand Up @@ -689,7 +704,7 @@ func (reb *Reb) fini(rargs *rebArgs, err error, tstats cos.StatsUpdater) {
_ = fs.RemoveMarker(fname.NodeRestartedPrev, tstats)
}

reb.endStreams(err)
reb.endStreams(err, rargs.logHdr)
reb.filterGFN.Reset()

xreb.ToStats(&stats)
Expand Down
4 changes: 2 additions & 2 deletions transport/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func New(cl transport.Client, args Args) (sb *Streams) {
sb._lid()
nlog.Infoln("open", sb.lid)

// register this stream-bundle as Smap listener
// for auto-resync, register this stream-bundle as Smap listener
if !sb.manualResync {
listeners := core.T.Sowner().Listeners()
listeners.Reg(sb)
Expand Down Expand Up @@ -306,7 +306,7 @@ func (sb *Streams) Abort() {
}

func (sb *Streams) apply(action int) {
cos.Assert(action == closeFin || action == closeStop)
debug.Assert(action == closeFin || action == closeStop)
var (
streams = sb.get()
wg = &sync.WaitGroup{}
Expand Down
18 changes: 6 additions & 12 deletions transport/bundle/dmover.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,26 +246,20 @@ func (dm *DataMover) Close(err error) {
err = dm.xctn.AbortErr()
}
// nil: close gracefully via `fin`, otherwise abort
if err == nil {
dm.data.streams.Close(true)
if dm.useACKs() {
dm.ack.streams.Close(true)
}
nlog.Infoln(dm.String(), "closed")
} else {
dm.data.streams.Close(false)
if dm.useACKs() {
dm.ack.streams.Close(false)
}
nlog.Infoln(dm.String(), "aborted:", err)
dm.data.streams.Close(err == nil)
if dm.useACKs() {
dm.ack.streams.Close(err == nil)
}
nlog.Infoln(dm.String(), err)
}

// Abort aborts the data streams and, when ACKs are in use, the ACK streams as well.
// It then clears the data mover's `opened` flag and logs a warning that includes dm.String().
func (dm *DataMover) Abort() {
dm.data.streams.Abort()
if dm.useACKs() {
dm.ack.streams.Abort()
}
// NOTE(review): presumably clearing `opened` lets this data mover be treated as closed/free
// after a forced abort - confirm against the readers of dm.stage.opened.
dm.stage.opened.Store(false)
nlog.Warningln("dm.abort", dm.String())
}

func (dm *DataMover) Send(obj *transport.Obj, roc cos.ReadOpenCloser, tsi *meta.Snode) (err error) {
Expand Down
2 changes: 1 addition & 1 deletion xact/xs/rebres.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (p *rebFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR,
nlog.Errorln("FATAL:", prev.Args.UUID, ep)
return xreg.WprAbort, ep
}
debug.Assert(ip < ic)
debug.Assert(ip <= ic, "curr ", p.Args.UUID, "> prev ", prev.Args.UUID)
return xreg.WprAbort, nil
}

Expand Down

0 comments on commit a17d9da

Please sign in to comment.