Skip to content

Commit

Permalink
fix: handling non reschedule disconnecting and reconnecting allocs (#…
Browse files Browse the repository at this point in the history
…18701)

This PR fixes a long-lived bug where disconnecting allocations were never rescheduled according to their reschedule policy, but only replaced because the group count came up short. The default reschedule delay for service and batch jobs is 30 and 5 seconds respectively; in order to properly reschedule disconnected allocs, they need to be able to be rescheduled for later, a path that was not handled before. This PR introduces a way to handle such allocations.
  • Loading branch information
Juanadelacuesta committed Oct 27, 2023
1 parent 1e0ed5a commit 931240c
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 236 deletions.
14 changes: 10 additions & 4 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10427,6 +10427,12 @@ func (a *Allocation) MigrateStrategy() *MigrateStrategy {
func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
failTime := a.LastEventTime()
reschedulePolicy := a.ReschedulePolicy()

//If reschedule is disabled, return early
if reschedulePolicy.Attempts == 0 && !reschedulePolicy.Unlimited {
return time.Time{}, false
}

if a.DesiredStatus == AllocDesiredStatusStop || a.ClientStatus != AllocClientStatusFailed || failTime.IsZero() || reschedulePolicy == nil {
return time.Time{}, false
}
Expand All @@ -10446,16 +10452,16 @@ func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *Re
return nextRescheduleTime, rescheduleEligible
}

// NextRescheduleTimeByFailTime works like NextRescheduleTime but allows callers
// NextRescheduleTimeByTime works like NextRescheduleTime but allows callers
// specify a failure time. Useful for things like determining whether to reschedule
// an alloc on a disconnected node.
func (a *Allocation) NextRescheduleTimeByFailTime(failTime time.Time) (time.Time, bool) {
func (a *Allocation) NextRescheduleTimeByTime(t time.Time) (time.Time, bool) {
reschedulePolicy := a.ReschedulePolicy()
if reschedulePolicy == nil {
return time.Time{}, false
}

return a.nextRescheduleTime(failTime, reschedulePolicy)
return a.nextRescheduleTime(t, reschedulePolicy)
}

// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
Expand Down Expand Up @@ -10794,7 +10800,7 @@ func (a *Allocation) Expired(now time.Time) bool {
}

expiry := lastUnknown.Add(*tg.MaxClientDisconnect)
return now.UTC().After(expiry) || now.UTC().Equal(expiry)
return expiry.Sub(now) <= 0
}

// LastUnknown returns the timestamp for the last time the allocation
Expand Down
2 changes: 1 addition & 1 deletion nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ SUBMIT:
}
return err
} else {
w.logger.Debug("created evaluation", "eval", log.Fmt("%#v", eval))
w.logger.Debug("created evaluation", "eval", log.Fmt("%#v", eval), "waitUntil", log.Fmt("%#v", eval.WaitUntil.String()))
w.backoffReset()
}
return nil
Expand Down
45 changes: 37 additions & 8 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6753,6 +6753,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
NodeID: disconnectedNode.ID,
Status: structs.EvalStatusPending,
}}

nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

Expand All @@ -6762,16 +6763,21 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")

// One followup delayed eval created
require.Len(t, h.CreateEvals, 1)
followUpEval := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval)
require.Equal(t, "pending", followUpEval.Status)
require.NotEmpty(t, followUpEval.WaitUntil)
// Two followup delayed eval created
require.Len(t, h.CreateEvals, 2)
followUpEval1 := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
require.Equal(t, "pending", followUpEval1.Status)
require.NotEmpty(t, followUpEval1.WaitUntil)

followUpEval2 := h.CreateEvals[1]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
require.Equal(t, "pending", followUpEval2.Status)
require.NotEmpty(t, followUpEval2.WaitUntil)

// Insert eval in the state store
// Insert eval1 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval.ID)
found, err := h.State.EvalByID(nil, followUpEval1.ID)
if err != nil {
return false, err
}
Expand All @@ -6785,12 +6791,34 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)

return true, nil
}, func(err error) {

require.NoError(t, err)
})

// Insert eval2 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval2.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}

require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)

return true, nil
}, func(err error) {

require.NoError(t, err)
})

// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
// Pending update should have unknown status.

for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}
Expand All @@ -6800,6 +6828,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
require.NoError(t, err, "plan.NodeUpdate")

// Validate that the StateStore Upsert applied the ClientStatus we specified.

for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(nil, alloc.ID)
require.NoError(t, err)
Expand Down
92 changes: 58 additions & 34 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
groupComplete := a.computeGroup(group, as)
complete = complete && groupComplete
}

return complete
}

Expand Down Expand Up @@ -395,6 +396,7 @@ func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescr
// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {

// Create the desired update object for the group
desiredChanges := new(structs.DesiredUpdates)
a.result.desiredTGUpdates[groupName] = desiredChanges
Expand Down Expand Up @@ -423,6 +425,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
// which ones to keep that can only be applied when the client reconnects.
Expand Down Expand Up @@ -451,20 +456,28 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted = untainted.union(reconnect)
}

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of disconnecting allocations need to be rescheduled now,
// which ones later and which ones can't be rescheduled at all.
timeoutLaterEvals := map[string]string{}
if len(disconnecting) > 0 {
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)

// Determine what set of disconnecting allocations need to be rescheduled
_, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
untainted = untainted.union(untaintedDisconnecting)
rescheduleLater = append(rescheduleLater, laterDisconnecting...)

// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name)
// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals = a.createTimeoutLaterEvals(disconnecting, tg.Name)
}

// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLaterEvals := map[string]string{}
lostLater := []*delayedRescheduleInfo{}
if len(lost) > 0 {
lostLater = lost.delayByStopAfterClientDisconnect()
lostLaterEvals = a.createLostLaterEvals(lostLater, tg.Name)
}

// Merge disconnecting with the stop_after_client_disconnect set into the
// lostLaterEvals so that computeStop can add them to the stop set.
Expand All @@ -483,13 +496,15 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)

desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)

// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignore))
ignoreUpdates, inplace, destructive := a.computeUpdates(tg, untainted)

desiredChanges.Ignore += uint64(len(ignoreUpdates))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
dstate.DesiredTotal += len(destructive) + len(inplace)
Expand Down Expand Up @@ -793,7 +808,8 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
// replacements based off that.
failed := make(allocSet)
for id, alloc := range rescheduleNow {
if _, ok := a.result.disconnectUpdates[id]; !ok {
_, ok := a.result.disconnectUpdates[id]
if !ok && alloc.ClientStatus != structs.AllocClientStatusUnknown {
failed[id] = alloc
}
}
Expand Down Expand Up @@ -965,8 +981,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
untainted = untainted.difference(canaries)
}

// Remove disconnected allocations so they won't be stopped
knownUntainted := untainted.filterOutByClientStatus(structs.AllocClientStatusUnknown)

// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
remove := len(knownUntainted) + len(migrate) - group.Count
if remove <= 0 {
return stop
}
Expand Down Expand Up @@ -1069,7 +1088,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

Expand Down Expand Up @@ -1108,14 +1127,11 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al

// Find replacement allocations and decide which one to stop. A
// reconnecting allocation may have multiple replacements.
for _, replacementAlloc := range others {
for _, replacementAlloc := range all {

// Skip allocations that are not a replacement of the one
// reconnecting. Replacement allocations have the same name but a
// higher CreateIndex and a different ID.
isReplacement := replacementAlloc.ID != reconnectingAlloc.ID &&
replacementAlloc.Name == reconnectingAlloc.Name &&
replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex
// reconnecting.
isReplacement := replacementAlloc.ID == reconnectingAlloc.NextAllocation

// Skip allocations that are server terminal.
// We don't want to replace a reconnecting allocation with one that
Expand All @@ -1139,12 +1155,14 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al
}
} else {
// The reconnecting allocation is preferred, so stop this
// replacement.
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
statusDescription: allocReconnected,
})
// replacement, but avoid re-stopping stopped allocs
if replacementAlloc.ClientStatus != structs.AllocClientStatusFailed {
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
statusDescription: allocReconnected,
})
}
}
}
}
Expand Down Expand Up @@ -1232,11 +1250,17 @@ func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName)

// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
for _, laterAlloc := range rescheduleLater {
existingAlloc := all[laterAlloc.alloc.ID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]

// Can't updated an allocation that is disconnected
if _, ok := a.result.disconnectUpdates[laterAlloc.allocID]; !ok {
a.result.attributeUpdates[laterAlloc.allocID] = updatedAlloc
} else {
a.result.disconnectUpdates[laterAlloc.allocID].FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
}
}
}

Expand Down Expand Up @@ -1352,8 +1376,8 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
}

timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
if err != nil || len(timeoutDelays) != len(disconnecting) {
a.logger.Error("error computing disconnecting timeouts for task_group",
if err != nil {
a.logger.Error("error for task_group",
"task_group", tgName, "error", err)
return map[string]string{}
}
Expand Down
Loading

0 comments on commit 931240c

Please sign in to comment.