Skip to content

Commit

Permalink
Merge branch 'develop' into bugfix/recover_from_job_panics
Browse files Browse the repository at this point in the history
  • Loading branch information
xaionaro authored May 9, 2024
2 parents dfc6f87 + 6ff4d0f commit 1aa19ab
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: [push, pull_request]

jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand All @@ -16,7 +16,7 @@ jobs:
- name: Run Tests
run: CI=true ./run_tests.sh
contest-generator:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ui-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
- 'cmds/admin_server/ui/**'
jobs:
build:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand Down
1 change: 0 additions & 1 deletion docker-compose.mariadb.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3.2"
services:
contest:
build:
Expand Down
13 changes: 10 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3.2"
services:
contest:
build:
Expand All @@ -24,15 +23,23 @@ services:
ports:
- 3306:3306
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h127.0.0.1", "-ucontest", "-pcontest"]
test:
[
"CMD",
"mysqladmin",
"ping",
"-h127.0.0.1",
"-ucontest",
"-pcontest"
]
interval: 5s
timeout: 1s
retries: 10
networks:
net:
aliases:
- dbstorage

admin-server:
build:
context: .
Expand Down
2 changes: 1 addition & 1 deletion docker/contest/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18-buster
FROM golang:1.18

RUN apt-get update && apt-get install -y mariadb-client openssh-server

Expand Down
4 changes: 3 additions & 1 deletion docker/contest/tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fi
# Run integration tests collecting coverage only for the business logic (pkg directory)
i=1
for tag in integration integration_admin integration_storage; do
echo "Running integration tests with tag \"${tag}\""
echo ">>> Running integration tests with tag \"${tag}\""
for d in $(go list -tags=${tag} ./tests/... | grep -Ev "integ$|common$|vendor"); do
pflag=""
if test ${tag} = "integration_storage"; then
Expand All @@ -51,6 +51,8 @@ for tag in integration integration_admin integration_storage; do
# to have tests run serially.
pflag="-p 1"
fi

echo ">>> Running test: ${d}"
go test -tags=${tag},unsafe -race -count=4 -failfast ${pflag} "${d}"
go test -tags=${tag},unsafe -race \
-coverprofile="integ.$i.cov" ${pflag} \
Expand Down
2 changes: 2 additions & 0 deletions plugins/targetlocker/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func broker(clk clock.Clock, lockRequests, unlockRequests <-chan *request, done
}
now := clk.Now()
if l, ok := locks[t.ID]; ok {
logging.Debugf(req.ctx, "inmem locking %v, lock_owner=%v, wanted_owner=%v", t.ID, l.owner, req.owner)
// target has been locked before. are/were we the owner?
// if so, extend (even if previous lease has expired).
if l.owner == req.owner {
Expand Down Expand Up @@ -125,6 +126,7 @@ func broker(clk clock.Clock, lockRequests, unlockRequests <-chan *request, done
}
}
} else {
logging.Debugf(req.ctx, "inmem locking %v, wanted_owner=%v, lock was free", t.ID, req.owner)
if !req.requireLocked {
// target not locked and never been, create new lock
newLocks[t.ID] = lock{
Expand Down
2 changes: 1 addition & 1 deletion run_lint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export GO111MODULE=on
# installing golangci-lint as recommended on the project page
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin latest
go mod download
golangci-lint run --disable typecheck --enable deadcode --enable varcheck --enable staticcheck --timeout 10m
golangci-lint run --disable typecheck --enable staticcheck --timeout 10m

# check license headers
# this needs to be run from the top level directory, because it uses
Expand Down
4 changes: 2 additions & 2 deletions run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ then
fi

codecov_env=`bash <(curl -s https://codecov.io/env)`
docker-compose -f $COMPOSE_FILE build $DATABASE contest
docker-compose -f $COMPOSE_FILE run \
docker compose -f $COMPOSE_FILE build $DATABASE contest
docker compose -f $COMPOSE_FILE run \
${codecov_env} -e "CI=${CI}" \
contest \
/go/src/github.com/linuxboot/contest/docker/contest/tests.sh \
153 changes: 145 additions & 8 deletions tests/integ/jobmanager/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/linuxboot/contest/pkg/jobmanager"
"github.com/linuxboot/contest/pkg/logging"
"github.com/linuxboot/contest/pkg/pluginregistry"
"github.com/linuxboot/contest/pkg/runner"
"github.com/linuxboot/contest/pkg/signaling"
"github.com/linuxboot/contest/pkg/signals"
"github.com/linuxboot/contest/pkg/storage"
Expand Down Expand Up @@ -486,7 +487,7 @@ func (suite *TestJobManagerSuite) TestPauseAndExit() {
}

func (suite *TestJobManagerSuite) testPauseAndResume(
pauseAfter time.Duration, lockedAfterPause bool, mutator func(jobID types.JobID),
waiter func(jobID types.JobID), lockedAfterPause bool, mutator func(jobID types.JobID),
finalState event.Name, lockedAfterResume bool) {
var jobID types.JobID

Expand All @@ -504,8 +505,7 @@ func (suite *TestJobManagerSuite) testPauseAndResume(
require.NoError(suite.T(), err)
require.Equal(suite.T(), 1, len(ev))

// Wait for second run to begin
time.Sleep(pauseAfter)
waiter(jobID)

// Signal pause to the manager.
logging.Infof(suite.jmCtx, "-> pausing")
Expand Down Expand Up @@ -576,12 +576,62 @@ func (suite *TestJobManagerSuite) testPauseAndResume(

func (suite *TestJobManagerSuite) TestPauseAndResumeDuringRun1() {
// When paused during a run, targets should remain locked.
suite.testPauseAndResume(250*time.Millisecond, true, nil, job.EventJobCompleted, false)
waiter := func(jobID types.JobID) {
	// Pausing during the first run only needs the run to have begun, so
	// block until exactly one run-start event is visible for this job.
	started, err := pollForEvent(suite.eventManager, runner.EventRunStarted, jobID, time.Second)
	require.NoError(suite.T(), err)
	require.Len(suite.T(), started, 1)

	// Decode the event payload and make sure it really belongs to run #1.
	var startedPayload *runner.RunStartedPayload
	require.NoError(suite.T(), json.Unmarshal(*started[0].Payload, &startedPayload))
	require.Equal(suite.T(), types.RunID(1), startedPayload.RunID)
}
suite.testPauseAndResume(waiter, true, nil, job.EventJobCompleted, false)
}

func (suite *TestJobManagerSuite) TestPauseAndResumeBetweenRuns() {
// Pause between runs. Targets should not be locked in this case.
suite.testPauseAndResume(750*time.Millisecond, false, func(jobID types.JobID) {
waiter := func(jobID types.JobID) {
	// To pause between the runs, wait for 1 run start event and then for the
	// report of that first run to show up in the job report.
	ev, err := pollForEvent(suite.eventManager, runner.EventRunStarted, jobID, 1*time.Second)
	require.NoError(suite.T(), err)
	require.Len(suite.T(), ev, 1)

	// Poll on the calling goroutine rather than in a helper goroutine:
	// require.* ends in t.FailNow, which only terminates the goroutine it is
	// called from. Failing inside a spawned goroutine would leave the caller
	// blocked forever waiting for a result that never arrives.
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) {
		jobReport, err := suite.jsm.GetJobReport(suite.jmCtx, jobID)
		require.NoError(suite.T(), err)
		if len(jobReport.RunReports) == 1 {
			logging.Infof(suite.jmCtx, "successfully waited for 1 full run")
			return
		}
		time.Sleep(10 * time.Millisecond)
	}

	// The report for the first run never appeared within the deadline.
	require.Fail(suite.T(), "deadlocked on polling for 1 full run")
}

suite.testPauseAndResume(waiter, false, func(jobID types.JobID) {
// Report for the completed run must be persisted already.
jobReport, err := suite.jsm.GetJobReport(suite.jmCtx, jobID)
require.NoError(suite.T(), err)
Expand All @@ -592,18 +642,105 @@ func (suite *TestJobManagerSuite) TestPauseAndResumeBetweenRuns() {

func (suite *TestJobManagerSuite) TestPauseAndResumeDuringRun2() {
// When paused during a run, targets should remain locked.
suite.testPauseAndResume(1250*time.Millisecond, true, nil, job.EventJobCompleted, false)
waiter := func(jobID types.JobID) {
	// To pause during the 2nd run, wait until exactly 2 run start events
	// have been emitted for the job.
	//
	// The polling is done on the calling goroutine rather than in a helper
	// goroutine: require.* ends in t.FailNow, which only terminates the
	// goroutine it is called from. Failing inside a spawned goroutine would
	// leave the caller blocked forever waiting for a result.
	deadline := time.Now().Add(5 * time.Second)

	var ev []frameworkevent.Event
	for {
		if !time.Now().Before(deadline) {
			require.Fail(suite.T(), "deadlocked on polling for 2 run events")
		}

		var err error
		ev, err = pollForEvent(suite.eventManager, runner.EventRunStarted, jobID, 1*time.Second)
		require.NoError(suite.T(), err)

		// wait for the second run to start
		if len(ev) == 2 {
			logging.Infof(suite.jmCtx, "successfully waited for 2 run start events")
			break
		}

		// Short back-off so the loop does not spin if the poll returns
		// immediately with fewer events than needed.
		time.Sleep(10 * time.Millisecond)
	}

	// The two events must belong to run 1 and run 2, in that order.
	for runID := 1; runID <= 2; runID++ {
		// Decode into a fresh payload each time so that no field of the
		// previous event can leak into the current check.
		var payload *runner.RunStartedPayload
		err := json.Unmarshal(*ev[runID-1].Payload, &payload)
		require.NoError(suite.T(), err)

		require.Equal(suite.T(), types.RunID(runID), payload.RunID)
	}
}

suite.testPauseAndResume(waiter, true, nil, job.EventJobCompleted, false)
}

func (suite *TestJobManagerSuite) TestPauseAndFailToResume() {
waiter := func(jobID types.JobID) {
	// To pause during the 2nd run, wait until exactly 2 run start events
	// have been emitted for the job.
	//
	// The polling is done on the calling goroutine rather than in a helper
	// goroutine: require.* ends in t.FailNow, which only terminates the
	// goroutine it is called from. Failing inside a spawned goroutine would
	// leave the caller blocked forever waiting for a result.
	deadline := time.Now().Add(5 * time.Second)

	var ev []frameworkevent.Event
	for {
		if !time.Now().Before(deadline) {
			require.Fail(suite.T(), "deadlocked on polling for 2 run events")
		}

		var err error
		ev, err = pollForEvent(suite.eventManager, runner.EventRunStarted, jobID, 1*time.Second)
		require.NoError(suite.T(), err)

		// wait for the second run to start
		if len(ev) == 2 {
			logging.Infof(suite.jmCtx, "successfully waited for 2 run start events")
			break
		}

		// Short back-off so the loop does not spin if the poll returns
		// immediately with fewer events than needed.
		time.Sleep(10 * time.Millisecond)
	}

	// The two events must belong to run 1 and run 2, in that order.
	for runID := 1; runID <= 2; runID++ {
		// Decode into a fresh payload each time so that no field of the
		// previous event can leak into the current check.
		var payload *runner.RunStartedPayload
		err := json.Unmarshal(*ev[runID-1].Payload, &payload)
		require.NoError(suite.T(), err)

		require.Equal(suite.T(), types.RunID(runID), payload.RunID)
	}
}

v := job.CurrentPauseEventPayloadVersion
defer func() { job.CurrentPauseEventPayloadVersion = v }()
suite.testPauseAndResume(1250*time.Millisecond, true, func(_ types.JobID) {
suite.testPauseAndResume(waiter, true, func(_ types.JobID) {
job.CurrentPauseEventPayloadVersion = -1 // Resume will fail due to incompatible version
}, job.EventJobFailed,
// Targets remain locked because we were unable to deserialize the state.
// Unfortunately, there is nothing we can do.
true)
true,
)
}

func (suite *TestJobManagerSuite) TestCleanup() {
Expand Down

0 comments on commit 1aa19ab

Please sign in to comment.