Skip to content

Commit

Permalink
StepRunner: return ResultNotifier in AddTargetToStep function (#96)
Browse files Browse the repository at this point in the history
* StepRunner: return ResultNotifier in AddTargetToStep function
* StepRunner: fix race condition in getting final results in WaitResults
  • Loading branch information
rihter007 authored Jul 27, 2022
1 parent 6a2eb63 commit 240ce54
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 299 deletions.
135 changes: 67 additions & 68 deletions pkg/runner/step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@ import (
"github.com/linuxboot/contest/pkg/xcontext"
)

type AddTargetToStep func(ctx xcontext.Context, tgt *target.Target) error

type StepRunnerEvent struct {
// Target if set represents the target for which event was generated
Target *target.Target
// Err if Target is not nil refers to this Target result otherwise is execution error
Err error
type ChanNotifier interface {
NotifyCh() <-chan error
}

type AddTargetToStep func(ctx xcontext.Context, tgt *target.Target) (ChanNotifier, error)

type StepResult struct {
Err error
ResumeState json.RawMessage
Expand All @@ -37,18 +34,38 @@ type StepRunner struct {
inputWg sync.WaitGroup
activeTargets map[string]*stepTargetInfo

started bool
stopped chan struct{}
finishedCh chan struct{}
resultsChan chan<- StepRunnerEvent
runningLoopActive bool
notifyStopped *resultNotifier

resultErr error
resultResumeState json.RawMessage
notifyStopped func(err error)
}

type resultNotifier struct {
resultCh chan error
}

func newResultNotifier() *resultNotifier {
return &resultNotifier{
resultCh: make(chan error, 1),
}
}

func (str *resultNotifier) NotifyCh() <-chan error {
return str.resultCh
}

func (str *resultNotifier) postResult(err error) {
str.resultCh <- err
close(str.resultCh)
}

type stepTargetInfo struct {
targetInEmitted bool
result *resultNotifier
}

func (sti *stepTargetInfo) acquireTargetInEmission() bool {
Expand All @@ -64,6 +81,7 @@ func NewStepRunner() *StepRunner {
return &StepRunner{
input: make(chan *target.Target),
activeTargets: make(map[string]*stepTargetInfo),
notifyStopped: newResultNotifier(),
stopped: make(chan struct{}),
finishedCh: make(chan struct{}),
}
Expand All @@ -75,31 +93,22 @@ func (sr *StepRunner) Run(
ev testevent.Emitter,
resumeState json.RawMessage,
resumeStateTargets []target.Target,
) (<-chan StepRunnerEvent, AddTargetToStep, error) {
) (AddTargetToStep, []ChanNotifier, ChanNotifier, error) {
sr.mu.Lock()
defer sr.mu.Unlock()

if sr.resultsChan != nil {
return nil, nil, &cerrors.ErrAlreadyDone{}
}

var notifyStoppedOnce sync.Once
sr.notifyStopped = func(err error) {
notifyStoppedOnce.Do(func() {
select {
case sr.resultsChan <- StepRunnerEvent{Err: err}:
case <-ctx.Done():
}
})
if sr.started {
return nil, nil, nil, &cerrors.ErrAlreadyDone{}
}

resultsChan := make(chan StepRunnerEvent, 1)
sr.resultsChan = resultsChan

var resumedTargetsResults []ChanNotifier
for _, resumeTarget := range resumeStateTargets {
sr.activeTargets[resumeTarget.ID] = &stepTargetInfo{
targetInfo := &stepTargetInfo{
targetInEmitted: true,
result: newResultNotifier(),
}
sr.activeTargets[resumeTarget.ID] = targetInfo
resumedTargetsResults = append(resumedTargetsResults, targetInfo.result)
}

var activeLoopsCount int32 = 2
Expand All @@ -110,13 +119,13 @@ func (sr *StepRunner) Run(

sr.mu.Lock()
close(sr.finishedCh)
// if an error occurred we already sent notification
if sr.resultErr == nil {
sr.notifyStopped.postResult(nil)
}
sr.mu.Unlock()

// if an error occurred we already sent notification
sr.notifyStopped(nil)
close(sr.resultsChan)
ctx.Debugf("StepRunner finished")

sr.Stop()
}

Expand All @@ -133,47 +142,49 @@ func (sr *StepRunner) Run(
ctx.Debugf("Reading loop finished")
}()

return resultsChan, func(ctx xcontext.Context, tgt *target.Target) error {
sr.started = true
return func(ctx xcontext.Context, tgt *target.Target) (ChanNotifier, error) {
return sr.addTarget(ctx, bundle, ev, tgt)
}, nil
}, resumedTargetsResults, sr.notifyStopped, nil
}

func (sr *StepRunner) addTarget(
ctx xcontext.Context,
bundle test.TestStepBundle,
ev testevent.Emitter,
tgt *target.Target,
) error {
) (ChanNotifier, error) {
if tgt == nil {
return fmt.Errorf("target should not be nil")
return nil, fmt.Errorf("target should not be nil")
}

sr.mu.Lock()
stopped := sr.stopped
sr.mu.Unlock()
if stopped == nil {
if err := sr.getErr(); err != nil {
return err
return nil, err
}
return fmt.Errorf("step runner was stopped")
return nil, fmt.Errorf("step runner was stopped")
}

err := func() error {
targetInfo, err := func() (*stepTargetInfo, error) {
targetInfo, err := func() (*stepTargetInfo, error) {
sr.mu.Lock()
defer sr.mu.Unlock()

if targetInfo := sr.activeTargets[tgt.ID]; targetInfo != nil {
return nil, fmt.Errorf("target is already processed")
return nil, fmt.Errorf("target is already added")
}
targetInfo := &stepTargetInfo{
result: newResultNotifier(),
}
targetInfo := &stepTargetInfo{}
sr.activeTargets[tgt.ID] = targetInfo
sr.inputWg.Add(1)
return targetInfo, nil
}()

if err != nil {
return err
return nil, err
}

defer sr.inputWg.Done()
Expand All @@ -189,13 +200,13 @@ func (sr *StepRunner) addTarget(
}
}
sr.mu.Unlock()
return nil
return targetInfo, nil
case <-stopped:
return fmt.Errorf("step runner was stopped")
return nil, fmt.Errorf("step runner was stopped")
case <-ctx.Until(xcontext.ErrPaused):
return xcontext.ErrPaused
return nil, xcontext.ErrPaused
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
}
}()

Expand All @@ -210,16 +221,16 @@ func (sr *StepRunner) addTarget(
err = sr.resultErr
}
sr.mu.Unlock()
return err
return nil, err
}
return nil
return targetInfo.result, nil
}

func (sr *StepRunner) Started() bool {
sr.mu.Lock()
defer sr.mu.Unlock()

return sr.resultsChan != nil
return sr.started
}

// WaitResults returns TestStep.Run() output
Expand Down Expand Up @@ -253,8 +264,8 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (stepResult StepResult, e
sr.mu.Lock()
defer sr.mu.Unlock()
return StepResult{
Err: resultErr,
ResumeState: resultResumeState,
Err: sr.resultErr,
ResumeState: sr.resultResumeState,
}, nil
}

Expand Down Expand Up @@ -316,24 +327,24 @@ func (sr *StepRunner) outputLoop(
}
ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID)

shouldEmitTargetIn, err := func() (bool, error) {
shouldEmitTargetIn, targetResult, err := func() (bool, *resultNotifier, error) {
sr.mu.Lock()
defer sr.mu.Unlock()

info, found := sr.activeTargets[res.Target.ID]
if !found {
return false, &cerrors.ErrTestStepReturnedUnexpectedResult{
return false, nil, &cerrors.ErrTestStepReturnedUnexpectedResult{
StepName: testStepLabel,
Target: res.Target.ID,
}
}
if info == nil {
return false, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}
return false, nil, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}
}
sr.activeTargets[res.Target.ID] = nil

shouldEmitTargetIn := info.acquireTargetInEmission()
return shouldEmitTargetIn, nil
return shouldEmitTargetIn, info.result, nil
}()
if err != nil {
sr.setErr(ctx, err)
Expand All @@ -357,16 +368,7 @@ func (sr *StepRunner) outputLoop(
sr.setErr(ctx, err)
return
}

select {
case sr.resultsChan <- StepRunnerEvent{Target: res.Target, Err: res.Err}:
case <-ctx.Done():
ctx.Debugf(
"reading loop detected context canceled, target '%s' with result: '%v' was not reported",
res.Target.ID,
res.Err,
)
}
targetResult.postResult(res.Err)
case <-ctx.Done():
ctx.Debugf("IO loop detected context canceled")
return
Expand Down Expand Up @@ -432,10 +434,7 @@ func (sr *StepRunner) setErrLocked(ctx xcontext.Context, err error) {
ctx.Errorf("err: %v", err)
sr.resultErr = err

// notifyStopped is a blocking operation: should release the lock
sr.mu.Unlock()
sr.notifyStopped(err)
sr.mu.Lock()
sr.notifyStopped.postResult(err)
}

func (sr *StepRunner) getErr() error {
Expand Down
Loading

0 comments on commit 240ce54

Please sign in to comment.