fix: handling non-reschedulable disconnecting and reconnecting allocs #18701

Merged · 27 commits · Oct 27, 2023
Commits (27), changes from all commits
d7c3c66
style: change how the error is handled, for clarity, no functional ch…
Juanadelacuesta Oct 13, 2023
96bf373
func: take into account untainted disconnecting allocs to add at the f…
Juanadelacuesta Oct 10, 2023
c05ef46
style: simplify function, no functional change at all
Juanadelacuesta Oct 10, 2023
b83b9b2
func: avoid returning filterByRescheduleable if an untainted alloc hh…
Juanadelacuesta Oct 10, 2023
f904ba9
func: refactor filterByReschedulable for clarity, no functional updates
Juanadelacuesta Oct 10, 2023
8a4ac9a
func: Update reconcile util functions to properly handle reconnecting…
Juanadelacuesta Oct 11, 2023
ced0f6d
func: put back the restriction of not disconnecting to let the createT…
Juanadelacuesta Oct 11, 2023
d3f30f4
func: when there are unknown allocs, don't ignore them, because they wi…
Juanadelacuesta Oct 12, 2023
9a256cf
func: update test to reflect change in behaviour for unknown allocs
Juanadelacuesta Oct 12, 2023
6b9ec36
style: refactor functions for clarity, no functional updates
Juanadelacuesta Oct 12, 2023
a09d674
style: change how the error is handled, for clarity, no functional ch…
Juanadelacuesta Oct 13, 2023
8e51c1e
style: function name change, for clarity, no functional changes
Juanadelacuesta Oct 16, 2023
96db1d4
func: use the last event to calculate the rescheduling time, not the …
Juanadelacuesta Oct 16, 2023
93a1bc3
func: don't try to remove allocs with unknown status
Juanadelacuesta Oct 18, 2023
653e80a
func: filter out unknown allocs from final count to avoid stopping all…
Juanadelacuesta Oct 18, 2023
36e7ba4
fix: create a copy of untainted allocs and filter that one, because t…
Juanadelacuesta Oct 19, 2023
3b7edfe
fix: refactor filterByRescheduleable to avoid having allocs that can …
Juanadelacuesta Oct 20, 2023
8cf9e9c
func: fix the logic for the expired allocation
Juanadelacuesta Oct 23, 2023
9e92400
func: stopped allocations from a disconnectable client need to be ig…
Juanadelacuesta Oct 23, 2023
7522321
func: add extra condition of using max client disconnect when deciding…
Juanadelacuesta Oct 24, 2023
d4ceaf1
fix: fix the condition for early return on next schedule time, it has…
Juanadelacuesta Oct 24, 2023
6f633d9
fix: change the logic for alloc replacement detection to using the al…
Juanadelacuesta Oct 25, 2023
832a087
func: avoid re-stopping stopped allocs, by checking the state before ad…
Juanadelacuesta Oct 25, 2023
bb34579
func: add extra condition to eligible later allocs, so allocs that ha…
Juanadelacuesta Oct 25, 2023
d684b7d
fix: update tests to not count stopped allocs as updates and correctl…
Juanadelacuesta Oct 25, 2023
3b83461
fix: update the filter by tainted function to ignore allocs that need…
Juanadelacuesta Oct 25, 2023
67d9033
fix: add correct alloc states and previous eval ID to disconnected al…
Juanadelacuesta Oct 26, 2023
15 changes: 11 additions & 4 deletions nomad/structs/structs.go
@@ -135,6 +135,7 @@ const (
)

const (

// SystemInitializationType is used for messages that initialize parts of
// the system, such as the state store. These messages are not included in
// the event stream.
@@ -10895,6 +10896,12 @@ func (a *Allocation) MigrateStrategy() *MigrateStrategy {
func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
failTime := a.LastEventTime()
reschedulePolicy := a.ReschedulePolicy()

// If reschedule is disabled, return early
if reschedulePolicy.Attempts == 0 && !reschedulePolicy.Unlimited {
return time.Time{}, false
}

if a.DesiredStatus == AllocDesiredStatusStop || a.ClientStatus != AllocClientStatusFailed || failTime.IsZero() || reschedulePolicy == nil {
return time.Time{}, false
}
@@ -10914,16 +10921,16 @@ func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *Re
return nextRescheduleTime, rescheduleEligible
}

// NextRescheduleTimeByFailTime works like NextRescheduleTime but allows callers
// NextRescheduleTimeByTime works like NextRescheduleTime but allows callers
// to specify a failure time. Useful for things like determining whether to reschedule
// an alloc on a disconnected node.
func (a *Allocation) NextRescheduleTimeByFailTime(failTime time.Time) (time.Time, bool) {
func (a *Allocation) NextRescheduleTimeByTime(t time.Time) (time.Time, bool) {
reschedulePolicy := a.ReschedulePolicy()
if reschedulePolicy == nil {
return time.Time{}, false
}

return a.nextRescheduleTime(failTime, reschedulePolicy)
return a.nextRescheduleTime(t, reschedulePolicy)
}

// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
@@ -11223,7 +11230,7 @@ func (a *Allocation) Expired(now time.Time) bool {
}

expiry := lastUnknown.Add(*tg.MaxClientDisconnect)
return now.UTC().After(expiry) || now.UTC().Equal(expiry)
return expiry.Sub(now) <= 0
}

// LastUnknown returns the timestamp for the last time the allocation
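The Expired change above swaps an After-or-Equal comparison for a single duration check. Below is a minimal standalone sketch (hypothetical timestamps, not Nomad code) showing the two forms agree, including at the exact expiry instant:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical expiry instant; in Nomad this is lastUnknown + MaxClientDisconnect.
	expiry := time.Date(2023, 10, 27, 12, 0, 0, 0, time.UTC)

	for _, now := range []time.Time{
		expiry.Add(-time.Second), // before expiry: not expired
		expiry,                   // exactly at expiry: expired
		expiry.Add(time.Second),  // after expiry: expired
	} {
		oldForm := now.UTC().After(expiry) || now.UTC().Equal(expiry)
		newForm := expiry.Sub(now) <= 0
		fmt.Printf("%s old=%v new=%v\n", now.Format(time.RFC3339), oldForm, newForm)
	}
}
```

Both forms treat the boundary inclusively; the new one simply avoids the double comparison.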
2 changes: 1 addition & 1 deletion nomad/worker.go
@@ -786,7 +786,7 @@ SUBMIT:
}
return err
} else {
w.logger.Debug("created evaluation", "eval", log.Fmt("%#v", eval))
w.logger.Debug("created evaluation", "eval", log.Fmt("%#v", eval), "waitUntil", log.Fmt("%#v", eval.WaitUntil.String()))
w.backoffReset()
}
return nil
45 changes: 37 additions & 8 deletions scheduler/generic_sched_test.go
@@ -6939,6 +6939,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
NodeID: disconnectedNode.ID,
Status: structs.EvalStatusPending,
}}

nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

@@ -6948,16 +6949,21 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")

// One followup delayed eval created
require.Len(t, h.CreateEvals, 1)
followUpEval := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval)
require.Equal(t, "pending", followUpEval.Status)
require.NotEmpty(t, followUpEval.WaitUntil)
// Two followup delayed evals created
require.Len(t, h.CreateEvals, 2)
followUpEval1 := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval1.PreviousEval)
require.Equal(t, "pending", followUpEval1.Status)
require.NotEmpty(t, followUpEval1.WaitUntil)

followUpEval2 := h.CreateEvals[1]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval2.PreviousEval)
require.Equal(t, "pending", followUpEval2.Status)
require.NotEmpty(t, followUpEval2.WaitUntil)

// Insert eval in the state store
// Insert eval1 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval.ID)
found, err := h.State.EvalByID(nil, followUpEval1.ID)
if err != nil {
return false, err
}
@@ -6971,12 +6977,34 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)

return true, nil
}, func(err error) {

require.NoError(t, err)
})

// Insert eval2 in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval2.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}

require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)

return true, nil
}, func(err error) {

require.NoError(t, err)
})

// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
// Pending update should have unknown status.

for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}
@@ -6986,6 +7014,7 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
require.NoError(t, err, "plan.NodeUpdate")

// Validate that the StateStore Upsert applied the ClientStatus we specified.

for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(nil, alloc.ID)
require.NoError(t, err)
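The two WaitForResult blocks above are nearly identical. A hypothetical test helper along these lines (assumed name, not part of this PR) could collapse them into one call per follow-up eval:

```go
// requireFollowupEvalPersisted would live next to the test above (scheduler
// package); it polls the state store until the follow-up eval is visible and
// then checks its linkage back to the triggering eval. Hypothetical helper,
// not part of this change.
func requireFollowupEvalPersisted(t *testing.T, h *Harness, evalID, prevEvalID string) {
	t.Helper()
	testutil.WaitForResult(func() (bool, error) {
		found, err := h.State.EvalByID(nil, evalID)
		if err != nil {
			return false, err
		}
		if found == nil {
			return false, nil
		}
		require.Equal(t, prevEvalID, found.PreviousEval)
		require.Equal(t, "pending", found.Status)
		require.NotEmpty(t, found.WaitUntil)
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}

// Usage in the test body:
//   requireFollowupEvalPersisted(t, h, followUpEval1.ID, nodeStatusUpdateEval.ID)
//   requireFollowupEvalPersisted(t, h, followUpEval2.ID, nodeStatusUpdateEval.ID)
```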
92 changes: 58 additions & 34 deletions scheduler/reconcile.go
Expand Up @@ -230,6 +230,7 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
groupComplete := a.computeGroup(group, as)
complete = complete && groupComplete
}

return complete
}

Expand Down Expand Up @@ -398,6 +399,7 @@ func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescr
// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {

// Create the desired update object for the group
desiredChanges := new(structs.DesiredUpdates)
a.result.desiredTGUpdates[groupName] = desiredChanges
@@ -426,6 +428,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)

// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
// which ones to keep that can only be applied when the client reconnects.
@@ -454,20 +459,28 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
untainted = untainted.union(reconnect)
}

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of disconnecting allocations need to be rescheduled now,
// which ones later and which ones can't be rescheduled at all.
timeoutLaterEvals := map[string]string{}
if len(disconnecting) > 0 {
untaintedDisconnecting, rescheduleDisconnecting, laterDisconnecting := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)

// Determine what set of disconnecting allocations need to be rescheduled
_, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
untainted = untainted.union(untaintedDisconnecting)
rescheduleLater = append(rescheduleLater, laterDisconnecting...)

// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name)
// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals = a.createTimeoutLaterEvals(disconnecting, tg.Name)
}

// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLaterEvals := map[string]string{}
lostLater := []*delayedRescheduleInfo{}
if len(lost) > 0 {
lostLater = lost.delayByStopAfterClientDisconnect()
lostLaterEvals = a.createLostLaterEvals(lostLater, tg.Name)
}

// Merge disconnecting with the stop_after_client_disconnect set into the
// lostLaterEvals so that computeStop can add them to the stop set.
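For readers new to the reconciler's buckets, the disconnecting branch above does a three-way split and folds the results back into the main sets. A toy sketch with simplified types (not Nomad's allocSet or filterByRescheduleable):

```go
package sketch

import "time"

// allocInfo is a stand-in for the handful of fields this decision needs; the
// real code operates on sets of *structs.Allocation.
type allocInfo struct {
	ID            string
	CanReschedule bool
	RescheduleAt  time.Time
}

// splitDisconnecting is a toy version of the split performed for
// disconnecting allocs: keep as untainted, reschedule immediately, or
// reschedule at a later time.
func splitDisconnecting(allocs []allocInfo, now time.Time) (untainted, rescheduleNow, rescheduleLater []allocInfo) {
	for _, a := range allocs {
		switch {
		case !a.CanReschedule:
			untainted = append(untainted, a)
		case !a.RescheduleAt.After(now):
			rescheduleNow = append(rescheduleNow, a)
		default:
			rescheduleLater = append(rescheduleLater, a)
		}
	}
	return untainted, rescheduleNow, rescheduleLater
}
```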
Expand All @@ -486,13 +499,15 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)

desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)

// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignore))
ignoreUpdates, inplace, destructive := a.computeUpdates(tg, untainted)

desiredChanges.Ignore += uint64(len(ignoreUpdates))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
dstate.DesiredTotal += len(destructive) + len(inplace)
@@ -796,7 +811,8 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
// replacements based off that.
failed := make(allocSet)
for id, alloc := range rescheduleNow {
if _, ok := a.result.disconnectUpdates[id]; !ok {
_, ok := a.result.disconnectUpdates[id]
if !ok && alloc.ClientStatus != structs.AllocClientStatusUnknown {
failed[id] = alloc
}
}
@@ -968,8 +984,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
untainted = untainted.difference(canaries)
}

// Remove disconnected allocations so they won't be stopped
knownUntainted := untainted.filterOutByClientStatus(structs.AllocClientStatusUnknown)

// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
remove := len(knownUntainted) + len(migrate) - group.Count
if remove <= 0 {
return stop
}
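The knownUntainted filter means allocations in the unknown (disconnected) state no longer count toward the group's surplus, so they cannot be selected for stopping here. A sketch of such a filter over an ID-keyed map (assumed shape, not the actual allocSet method):

```go
// Sketch only: the real filterOutByClientStatus is a method on the
// scheduler's allocSet type; this standalone version shows the idea.
func filterOutByClientStatus(allocs map[string]*structs.Allocation, status string) map[string]*structs.Allocation {
	out := make(map[string]*structs.Allocation, len(allocs))
	for id, alloc := range allocs {
		if alloc.ClientStatus != status {
			out[id] = alloc
		}
	}
	return out
}

// e.g. knownUntainted := filterOutByClientStatus(untainted, structs.AllocClientStatusUnknown)
```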
Expand Down Expand Up @@ -1072,7 +1091,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others allocSet) (allocSet, allocSet) {
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

@@ -1111,14 +1130,11 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al

// Find replacement allocations and decide which one to stop. A
// reconnecting allocation may have multiple replacements.
for _, replacementAlloc := range others {
for _, replacementAlloc := range all {

// Skip allocations that are not a replacement of the one
// reconnecting. Replacement allocations have the same name but a
// higher CreateIndex and a different ID.
isReplacement := replacementAlloc.ID != reconnectingAlloc.ID &&
replacementAlloc.Name == reconnectingAlloc.Name &&
replacementAlloc.CreateIndex > reconnectingAlloc.CreateIndex
// reconnecting.
isReplacement := replacementAlloc.ID == reconnectingAlloc.NextAllocation

// Skip allocations that are server terminal.
// We don't want to replace a reconnecting allocation with one that
@@ -1142,12 +1158,14 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, others al
}
} else {
// The reconnecting allocation is preferred, so stop this
// replacement.
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
statusDescription: allocReconnected,
})
// replacement, but avoid re-stopping stopped allocs
if replacementAlloc.ClientStatus != structs.AllocClientStatusFailed {
stop[replacementAlloc.ID] = replacementAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: replacementAlloc,
statusDescription: allocReconnected,
})
}
}
}
}
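The two behavioral changes in reconcileReconnecting are easier to see side by side. An illustrative sketch with assumed helper names (not the reconciler's code): the old heuristic matched replacements by name and CreateIndex, the new check follows the NextAllocation link, and a replacement that has already failed on its client is not stopped again:

```go
// Old heuristic: a replacement is any alloc with the same name, a different
// ID, and a higher CreateIndex than the reconnecting alloc.
func isReplacementOld(candidate, reconnecting *structs.Allocation) bool {
	return candidate.ID != reconnecting.ID &&
		candidate.Name == reconnecting.Name &&
		candidate.CreateIndex > reconnecting.CreateIndex
}

// New check: follow the explicit link the reconnecting alloc keeps to the
// allocation that replaced it.
func isReplacementNew(candidate, reconnecting *structs.Allocation) bool {
	return candidate.ID == reconnecting.NextAllocation
}

// A replacement that already failed client-side does not need another stop.
func shouldStopReplacement(replacement *structs.Allocation) bool {
	return replacement.ClientStatus != structs.AllocClientStatusFailed
}
```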
@@ -1235,11 +1253,17 @@ func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName)

// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
for _, laterAlloc := range rescheduleLater {
existingAlloc := all[laterAlloc.alloc.ID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]

// Can't update an allocation that is disconnected
if _, ok := a.result.disconnectUpdates[laterAlloc.allocID]; !ok {
a.result.attributeUpdates[laterAlloc.allocID] = updatedAlloc
} else {
a.result.disconnectUpdates[laterAlloc.allocID].FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID]
}
}
}
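The update-routing rule in this hunk, attach the FollowupEvalID to whichever pending update will actually be applied, can be summarized as a small helper. A sketch with assumed standalone maps (in the reconciler these are fields on a.result):

```go
// recordFollowupEval patches an existing disconnect update in place when one
// is pending for the alloc; otherwise it queues a fresh attribute update.
// This avoids two competing updates for the same allocation. Sketch only.
func recordFollowupEval(attributeUpdates, disconnectUpdates map[string]*structs.Allocation,
	alloc *structs.Allocation, evalID string) {

	if pending, ok := disconnectUpdates[alloc.ID]; ok {
		pending.FollowupEvalID = evalID
		return
	}
	updated := alloc.Copy()
	updated.FollowupEvalID = evalID
	attributeUpdates[alloc.ID] = updated
}
```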

@@ -1355,8 +1379,8 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
}

timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
if err != nil || len(timeoutDelays) != len(disconnecting) {
a.logger.Error("error computing disconnecting timeouts for task_group",
if err != nil {
a.logger.Error("error for task_group",
"task_group", tgName, "error", err)
return map[string]string{}
}