Skip to content

Commit

Permalink
Merge pull request containerd#9768 from fuweid/16-drain-exec-io
Browse files Browse the repository at this point in the history
[release/1.6] pkg/cri: add timeout to drain exec io
  • Loading branch information
mikebrow authored Feb 16, 2024
2 parents 087f0de + aac4887 commit 6fdb617
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 7 deletions.
10 changes: 10 additions & 0 deletions docs/cri/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ version = 2
# Note that the default is currently disabled, but the plan is to change it in the future together with enable_unprivileged_ports
enable_unprivileged_icmp = false

# drain_exec_sync_io_timeout is the maximum duration to wait for ExecSync API's
# IO EOF event after exec init process exits. A zero value means there is no
# timeout.
#
# The string is in the golang duration format, see:
# https://golang.org/pkg/time/#ParseDuration
#
# For example, the value can be '5h', '2h30m', '10s'.
drain_exec_sync_io_timeout = "0s"

# 'plugins."io.containerd.grpc.v1.cri".containerd' contains config related to containerd
[plugins."io.containerd.grpc.v1.cri".containerd]

Expand Down
70 changes: 70 additions & 0 deletions integration/container_exec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestContainerDrainExecIOAfterExit exercises ExecSync with commands that
// leave a background child behind after the exec shell itself returns.
//
// The first exec backgrounds `sleep 365d` and is expected to fail with an
// error mentioning "failed to drain exec process". The second backgrounds
// `sleep 2s` and is expected to complete without error.
func TestContainerDrainExecIOAfterExit(t *testing.T) {
	// FIXME(fuweid): support it for windows container.
	if runtime.GOOS == "windows" {
		t.Skip("it seems that windows platform doesn't support detached process. skip it")
	}

	t.Log("Create a sandbox")
	sandboxID, sandboxConfig := PodSandboxConfigWithCleanup(t, "sandbox", "container-exec-drain-io-after-exit")

	image := GetImage(BusyBox)
	EnsureImageExists(t, image)

	t.Log("Create a container")
	containerID, err := runtimeService.CreateContainer(
		sandboxID,
		ContainerConfig("test-container-exec", image, WithCommand("sh", "-c", "sleep 365d")),
		sandboxConfig,
	)
	require.NoError(t, err)
	// Deferred closures run in reverse order: the container is stopped
	// first (registered below) and removed afterwards.
	defer func() {
		assert.NoError(t, runtimeService.RemoveContainer(containerID))
	}()

	t.Log("Start the container")
	require.NoError(t, runtimeService.StartContainer(containerID))
	defer func() {
		assert.NoError(t, runtimeService.StopContainer(containerID, 10))
	}()

	// The backgrounded child keeps running long after the shell returns,
	// so the exec is expected to report a drain failure.
	t.Log("Exec in container")
	longLived := []string{"sh", "-c", "sleep 365d &"}
	_, _, err = runtimeService.ExecSync(containerID, longLived, 5*time.Second)
	require.ErrorContains(t, err, "failed to drain exec process")

	// Here the backgrounded child ends after two seconds, so the exec is
	// expected to finish cleanly.
	t.Log("Exec in container")
	shortLived := []string{"sh", "-c", "sleep 2s &"}
	_, _, err = runtimeService.ExecSync(containerID, shortLived, 10*time.Second)
	require.NoError(t, err, "should drain IO in time")
}
16 changes: 16 additions & 0 deletions pkg/cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,15 @@ type PluginConfig struct {
// IgnoreDeprecationWarnings is the list of the deprecation IDs (such as "io.containerd.deprecation/pull-schema-1-image")
// that should be ignored for checking "ContainerdHasNoDeprecationWarnings" condition.
IgnoreDeprecationWarnings []string `toml:"ignore_deprecation_warnings" json:"ignoreDeprecationWarnings"`
// DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync
// API's IO EOF event after exec init process exits. A zero value means
// there is no timeout.
//
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
//
// For example, the value can be '5h', '2h30m', '10s'.
DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"`
}

// X509KeyPairStreaming contains the x509 configuration for streaming
Expand Down Expand Up @@ -467,5 +476,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W
return warnings, fmt.Errorf("invalid stream idle timeout: %w", err)
}
}

// Validation for drain_exec_sync_io_timeout
if c.DrainExecSyncIOTimeout != "" {
if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil {
return warnings, fmt.Errorf("invalid `drain_exec_sync_io_timeout`: %w", err)
}
}
return warnings, nil
}
14 changes: 14 additions & 0 deletions pkg/cri/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,20 @@ func TestValidateConfig(t *testing.T) {
},
warnings: []deprecation.Warning{deprecation.CRICRIUPath},
},
"invalid drain_exec_sync_io_timeout input": {
config: &PluginConfig{
ContainerdConfig: ContainerdConfig{
DefaultRuntimeName: RuntimeDefault,
Runtimes: map[string]Runtime{
RuntimeDefault: {
Type: "default",
},
},
},
DrainExecSyncIOTimeout: "10",
},
expectedErr: "invalid `drain_exec_sync_io_timeout`",
},
} {
t.Run(desc, func(t *testing.T) {
w, err := ValidatePluginConfig(context.Background(), test.config)
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/config/config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
DrainExecSyncIOTimeout: "0s",
}
}
1 change: 1 addition & 0 deletions pkg/cri/config/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,6 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
DrainExecSyncIOTimeout: "0s",
}
}
67 changes: 62 additions & 5 deletions pkg/cri/server/container_execsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var drainExecSyncIOTimeout time.Duration
var err error

if c.config.DrainExecSyncIOTimeout != "" {
drainExecSyncIOTimeout, err = time.ParseDuration(c.config.DrainExecSyncIOTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w",
c.config.DrainExecSyncIOTimeout, err)
}
}

spec, err := container.Spec(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get container spec: %w", err)
Expand Down Expand Up @@ -162,7 +173,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
defer func() {
deferCtx, deferCancel := ctrdutil.DeferContext()
defer deferCancel()
if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil {
if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id)
}
}()
Expand Down Expand Up @@ -209,17 +220,22 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
exitRes := <-exitCh
log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v",
execID, exitRes.ExitCode(), exitRes.Error())
<-attachDone
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)

if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID)
}

return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err())
case exitRes := <-exitCh:
code, _, err := exitRes.Result()
log.G(ctx).Debugf("Exec process %q exits with exit code %d and error %v", execID, code, err)
if err != nil {
return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err)
}
<-attachDone
log.G(ctx).Debugf("Stream pipe for exec process %q done", execID)

if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil {
return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err)
}
return &code, nil
}
}
Expand Down Expand Up @@ -252,3 +268,44 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp

return c.execInternal(ctx, cntr.Container, id, opts)
}

// drainExecSyncIO waits for the exec process IO to finish, giving up after
// drainExecIOTimeout once the exec init process has exited.
//
// Child processes spawned by the exec process inherit its standard IO file
// descriptors by default. The shim server creates a pipe as the data channel:
// the exec process and its children all write to the write end, and the shim
// server reads from the other end. While any write end stays open, the shim
// server keeps waiting for more data.
//
// For a command such as `bash -c "sleep 365d &"`, the exec process is bash,
// which exits right after spawning `sleep 365d`. The sleep process then keeps
// the write end of the pipe open for a year, and it makes no sense for the
// CRI plugin to wait that long.
//
// The function returns nil when attachDone is signalled before the timeout.
// A zero drainExecIOTimeout disables the timeout, so the call blocks until
// attachDone is signalled. When the timeout fires first, the exec process is
// deleted with containerd.WithProcessKill to release the IO and a non-nil
// error is always returned; a not-found error from Delete is tolerated and
// any other Delete error is wrapped and returned instead.
func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error {
	// Receiving from a nil channel blocks forever, so leaving expired unset
	// for a zero timeout turns the select below into a plain wait on
	// attachDone.
	var expired <-chan time.Time
	if drainExecIOTimeout != 0 {
		deadline := time.NewTimer(drainExecIOTimeout)
		defer deadline.Stop()
		expired = deadline.C
	}

	select {
	case <-attachDone:
		log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID())
		return nil
	case <-expired:
		// Timed out: fall through and force the IO to be released.
	}

	log.G(ctx).Debugf("Exec process %q exits but the io is still held by other processes. Trying to delete exec process to release io", execProcess.ID())
	if _, err := execProcess.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
		return fmt.Errorf("failed to release exec io by deleting exec process %q: %w",
			execProcess.ID(), err)
	}

	// Even when the delete succeeds the drain itself did not complete in
	// time, which is reported to the caller as an error.
	return fmt.Errorf("failed to drain exec process %q io in %s because io is still held by other processes",
		execProcess.ID(), drainExecIOTimeout)
}
98 changes: 98 additions & 0 deletions pkg/cri/server/container_execsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ package server

import (
"bytes"
"context"
"os"
"syscall"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
cioutil "github.com/containerd/containerd/pkg/ioutil"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) {
err := cw.Close()
assert.NoError(t, err)
}

// TestDrainExecSyncIO covers both outcomes of drainExecSyncIO using a fake
// exec process that records which methods were invoked on it.
func TestDrainExecSyncIO(t *testing.T) {
	ctx := context.TODO()

	// With a zero timeout the call must block until the attach channel is
	// closed, return nil, and never touch the process.
	t.Run("NoTimeout", func(t *testing.T) {
		ep := &fakeExecProcess{
			id:  t.Name(),
			pid: uint32(os.Getpid()),
		}

		attachDoneCh := make(chan struct{})
		time.AfterFunc(2*time.Second, func() { close(attachDoneCh) })
		assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh))
		assert.Empty(t, ep.actionEvents)
	})

	// With a 3s timeout and an attach channel that stays open far longer,
	// the call must give up, delete the process once, and return an error.
	t.Run("With3Seconds", func(t *testing.T) {
		ep := &fakeExecProcess{
			id:  t.Name(),
			pid: uint32(os.Getpid()),
		}

		attachDoneCh := make(chan struct{})
		closer := time.AfterFunc(100*time.Second, func() { close(attachDoneCh) })
		// Stop the pending timer so it does not outlive the subtest.
		defer closer.Stop()

		assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh))
		assert.Equal(t, []string{"Delete"}, ep.actionEvents)
	})
}

// fakeExecProcess is a test double for an exec process. Every method other
// than ID and Pid appends its own name to actionEvents, so tests can assert
// which operations were performed and in what order.
type fakeExecProcess struct {
	id           string
	pid          uint32
	actionEvents []string
}

// record appends the name of an invoked method to the action log.
func (p *fakeExecProcess) record(action string) {
	p.actionEvents = append(p.actionEvents, action)
}

// ID returns the configured process id string; the call is not recorded.
func (p *fakeExecProcess) ID() string { return p.id }

// Pid returns the configured system process id; the call is not recorded.
func (p *fakeExecProcess) Pid() uint32 { return p.pid }

// Start records a "Start" event and reports success.
func (p *fakeExecProcess) Start(context.Context) error {
	p.record("Start")
	return nil
}

// Delete records a "Delete" event and returns a nil exit status and nil error.
func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) {
	p.record("Delete")
	return nil, nil
}

// Kill records a "Kill" event and reports success.
func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error {
	p.record("Kill")
	return nil
}

// Wait records a "Wait" event and returns a nil channel and nil error.
func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) {
	p.record("Wait")
	return nil, nil
}

// CloseIO records a "CloseIO" event and reports success.
func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error {
	p.record("CloseIO")
	return nil
}

// Resize records a "Resize" event and reports success; the requested
// dimensions are ignored.
func (p *fakeExecProcess) Resize(context.Context, uint32, uint32) error {
	p.record("Resize")
	return nil
}

// IO records an "IO" event and returns a nil cio.IO.
func (p *fakeExecProcess) IO() cio.IO {
	p.record("IO")
	return nil
}

// Status records a "Status" event and returns a zero-value status.
func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) {
	p.record("Status")
	return containerd.Status{}, nil
}
16 changes: 14 additions & 2 deletions script/test/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,23 @@ CONTAINERD_RUNTIME=${CONTAINERD_RUNTIME:-""}
if [ -z "${CONTAINERD_CONFIG_FILE}" ]; then
config_file="${CONTAINERD_CONFIG_DIR}/containerd-config-cri.toml"
truncate --size 0 "${config_file}"
echo "version=2" >> ${config_file}
# TODO(fuweid): if the config.Imports supports patch update, it will be easy
# to write the integration test case with different configuration, like:
#
# 1. write configuration into importable containerd config path.
# 2. restart containerd
# 3. verify the behaviour
# 4. delete the configuration
# 5. restart containerd
cat >>${config_file} <<EOF
version=2
[plugins."io.containerd.grpc.v1.cri"]
drain_exec_sync_io_timeout = "10s"
EOF

if command -v sestatus >/dev/null 2>&1; then
cat >>${config_file} <<EOF
[plugins."io.containerd.grpc.v1.cri"]
enable_selinux = true
EOF
fi
Expand Down

0 comments on commit 6fdb617

Please sign in to comment.