diff --git a/docs/cri/config.md b/docs/cri/config.md index 4527f93ec898..9a98ba19bc35 100644 --- a/docs/cri/config.md +++ b/docs/cri/config.md @@ -120,6 +120,16 @@ version = 2 # Note that currently default is set to disabled but target change it in future together with enable_unprivileged_ports enable_unprivileged_icmp = false + # drain_exec_sync_io_timeout is the maximum duration to wait for ExecSync API' + # IO EOF event after exec init process exits. A zero value means there is no + # timeout. + # + # The string is in the golang duration format, see: + # https://golang.org/pkg/time/#ParseDuration + # + # For example, the value can be '5h', '2h30m', '10s'. + drain_exec_sync_io_timeout = "0s" + # 'plugins."io.containerd.grpc.v1.cri".containerd' contains config related to containerd [plugins."io.containerd.grpc.v1.cri".containerd] diff --git a/integration/container_exec_test.go b/integration/container_exec_test.go new file mode 100644 index 000000000000..e98ac6caaf34 --- /dev/null +++ b/integration/container_exec_test.go @@ -0,0 +1,70 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package integration + +import ( + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestContainerDrainExecIOAfterExit(t *testing.T) { + // FIXME(fuweid): support it for windows container. + if runtime.GOOS == "windows" { + t.Skip("it seems that windows platform doesn't support detached process. skip it") + } + + t.Log("Create a sandbox") + sb, sbConfig := PodSandboxConfigWithCleanup(t, "sandbox", "container-exec-drain-io-after-exit") + + var ( + testImage = GetImage(BusyBox) + containerName = "test-container-exec" + ) + + EnsureImageExists(t, testImage) + + t.Log("Create a container") + cnConfig := ContainerConfig( + containerName, + testImage, + WithCommand("sh", "-c", "sleep 365d"), + ) + + cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + + t.Log("Start the container") + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + + t.Log("Exec in container") + _, _, err = runtimeService.ExecSync(cn, []string{"sh", "-c", "sleep 365d &"}, 5*time.Second) + require.ErrorContains(t, err, "failed to drain exec process") + + t.Log("Exec in container") + _, _, err = runtimeService.ExecSync(cn, []string{"sh", "-c", "sleep 2s &"}, 10*time.Second) + require.NoError(t, err, "should drain IO in time") +} diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index f024753fabbb..cb2c275c2ac4 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -306,6 +306,15 @@ type PluginConfig struct { // IgnoreDeprecationWarnings is the list of the deprecation IDs (such as "io.containerd.deprecation/pull-schema-1-image") // that should be ignored for checking "ContainerdHasNoDeprecationWarnings" condition. IgnoreDeprecationWarnings []string `toml:"ignore_deprecation_warnings" json:"ignoreDeprecationWarnings"` + // DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync + // API' IO EOF event after exec init process exits. A zero value means + // there is no timeout. + // + // The string is in the golang duration format, see: + // https://golang.org/pkg/time/#ParseDuration + // + // For example, the value can be '5h', '2h30m', '10s'. + DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"` } // X509KeyPairStreaming contains the x509 configuration for streaming @@ -467,5 +476,12 @@ func ValidatePluginConfig(ctx context.Context, c *PluginConfig) ([]deprecation.W return warnings, fmt.Errorf("invalid stream idle timeout: %w", err) } } + + // Validation for drain_exec_sync_io_timeout + if c.DrainExecSyncIOTimeout != "" { + if _, err := time.ParseDuration(c.DrainExecSyncIOTimeout); err != nil { + return warnings, fmt.Errorf("invalid `drain_exec_sync_io_timeout`: %w", err) + } + } return warnings, nil } diff --git a/pkg/cri/config/config_test.go b/pkg/cri/config/config_test.go index 8a4f3278a015..427597745d10 100644 --- a/pkg/cri/config/config_test.go +++ b/pkg/cri/config/config_test.go @@ -462,6 +462,20 @@ func TestValidateConfig(t *testing.T) { }, warnings: []deprecation.Warning{deprecation.CRICRIUPath}, }, + "invalid drain_exec_sync_io_timeout input": { + config: &PluginConfig{ + ContainerdConfig: ContainerdConfig{ + DefaultRuntimeName: RuntimeDefault, + Runtimes: map[string]Runtime{ + RuntimeDefault: { + Type: "default", + }, + }, + }, + DrainExecSyncIOTimeout: "10", + }, + expectedErr: "invalid `drain_exec_sync_io_timeout`", + }, } { t.Run(desc, func(t *testing.T) { w, err := ValidatePluginConfig(context.Background(), test.config) diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index f22e0572f1c7..b713c84e6228 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -104,5 +104,6 @@ func DefaultConfig() PluginConfig { ImageDecryption: ImageDecryption{ KeyModel: KeyModelNode, }, + DrainExecSyncIOTimeout: "0s", } } diff --git a/pkg/cri/config/config_windows.go b/pkg/cri/config/config_windows.go index c44bac913246..992598e101cc 100644 --- a/pkg/cri/config/config_windows.go +++ b/pkg/cri/config/config_windows.go @@ -62,5 +62,6 @@ func DefaultConfig() PluginConfig { ImageDecryption: ImageDecryption{ KeyModel: KeyModelNode, }, + DrainExecSyncIOTimeout: "0s", } } diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 68cfb0900fda..4323681d5de4 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -118,6 +118,17 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont ctx, cancel := context.WithCancel(ctx) defer cancel() + var drainExecSyncIOTimeout time.Duration + var err error + + if c.config.DrainExecSyncIOTimeout != "" { + drainExecSyncIOTimeout, err = time.ParseDuration(c.config.DrainExecSyncIOTimeout) + if err != nil { + return nil, fmt.Errorf("failed to parse drain_exec_sync_io_timeout %q: %w", + c.config.DrainExecSyncIOTimeout, err) + } + } + spec, err := container.Spec(ctx) if err != nil { return nil, fmt.Errorf("failed to get container spec: %w", err) @@ -162,7 +173,7 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont defer func() { deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() - if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil { + if _, err := process.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete exec process %q for container %q", execID, id) } }() @@ -209,8 +220,11 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont exitRes := <-exitCh log.G(ctx).Debugf("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil { + log.G(ctx).WithError(err).Warnf("failed to drain exec process %q io", execID) + } + return nil, fmt.Errorf("timeout %v exceeded: %w", opts.timeout, execCtx.Err()) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -218,8 +232,10 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %w", execID, err) } - <-attachDone - log.G(ctx).Debugf("Stream pipe for exec process %q done", execID) + + if err := drainExecSyncIO(ctx, process, drainExecSyncIOTimeout, attachDone); err != nil { + return nil, fmt.Errorf("failed to drain exec process %q io: %w", execID, err) + } return &code, nil } } @@ -252,3 +268,44 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return c.execInternal(ctx, cntr.Container, id, opts) } + +// drainExecSyncIO drains process IO with timeout after exec init process exits. +// +// By default, the child processes spawned by exec process will inherit standard +// io file descriptors. The shim server creates a pipe as data channel. Both +// exec process and its children write data into the write end of the pipe. +// And the shim server will read data from the pipe. If the write end is still +// open, the shim server will continue to wait for data from pipe. +// +// If the exec command is like `bash -c "sleep 365d &"`, the exec process +// is bash and quit after create `sleep 365d`. But the `sleep 365d` will hold +// the write end of the pipe for a year! It doesn't make senses that CRI plugin +// should wait for it. +func drainExecSyncIO(ctx context.Context, execProcess containerd.Process, drainExecIOTimeout time.Duration, attachDone <-chan struct{}) error { + var timerCh <-chan time.Time + + if drainExecIOTimeout != 0 { + timer := time.NewTimer(drainExecIOTimeout) + defer timer.Stop() + + timerCh = timer.C + } + + select { + case <-timerCh: + case <-attachDone: + log.G(ctx).Debugf("Stream pipe for exec process %q done", execProcess.ID()) + return nil + } + + log.G(ctx).Debugf("Exec process %q exits but the io is still held by other processes. Trying to delete exec process to release io", execProcess.ID()) + _, err := execProcess.Delete(ctx, containerd.WithProcessKill) + if err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("failed to release exec io by deleting exec process %q: %w", + execProcess.ID(), err) + } + } + return fmt.Errorf("failed to drain exec process %q io in %s because io is still held by other processes", + execProcess.ID(), drainExecIOTimeout) +} diff --git a/pkg/cri/server/container_execsync_test.go b/pkg/cri/server/container_execsync_test.go index 98990964963d..48f72abef50c 100644 --- a/pkg/cri/server/container_execsync_test.go +++ b/pkg/cri/server/container_execsync_test.go @@ -18,8 +18,14 @@ package server import ( "bytes" + "context" + "os" + "syscall" "testing" + "time" + "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" cioutil "github.com/containerd/containerd/pkg/ioutil" "github.com/stretchr/testify/assert" ) @@ -50,3 +56,95 @@ func TestCWClose(t *testing.T) { err := cw.Close() assert.NoError(t, err) } + +func TestDrainExecSyncIO(t *testing.T) { + ctx := context.TODO() + + t.Run("NoTimeout", func(t *testing.T) { + ep := &fakeExecProcess{ + id: t.Name(), + pid: uint32(os.Getpid()), + } + + attachDoneCh := make(chan struct{}) + time.AfterFunc(2*time.Second, func() { close(attachDoneCh) }) + assert.NoError(t, drainExecSyncIO(ctx, ep, 0, attachDoneCh)) + assert.Equal(t, 0, len(ep.actionEvents)) + }) + + t.Run("With3Seconds", func(t *testing.T) { + ep := &fakeExecProcess{ + id: t.Name(), + pid: uint32(os.Getpid()), + } + + attachDoneCh := make(chan struct{}) + time.AfterFunc(100*time.Second, func() { close(attachDoneCh) }) + assert.Error(t, drainExecSyncIO(ctx, ep, 3*time.Second, attachDoneCh)) + assert.Equal(t, []string{"Delete"}, ep.actionEvents) + }) +} + +type fakeExecProcess struct { + id string + pid uint32 + actionEvents []string +} + +// ID of the process +func (p *fakeExecProcess) ID() string { + return p.id +} + +// Pid is the system specific process id +func (p *fakeExecProcess) Pid() uint32 { + return p.pid +} + +// Start starts the process executing the user's defined binary +func (p *fakeExecProcess) Start(context.Context) error { + p.actionEvents = append(p.actionEvents, "Start") + return nil +} + +// Delete removes the process and any resources allocated returning the exit status +func (p *fakeExecProcess) Delete(context.Context, ...containerd.ProcessDeleteOpts) (*containerd.ExitStatus, error) { + p.actionEvents = append(p.actionEvents, "Delete") + return nil, nil +} + +// Kill sends the provided signal to the process +func (p *fakeExecProcess) Kill(context.Context, syscall.Signal, ...containerd.KillOpts) error { + p.actionEvents = append(p.actionEvents, "Kill") + return nil +} + +// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel +func (p *fakeExecProcess) Wait(context.Context) (<-chan containerd.ExitStatus, error) { + p.actionEvents = append(p.actionEvents, "Wait") + return nil, nil +} + +// CloseIO allows various pipes to be closed on the process +func (p *fakeExecProcess) CloseIO(context.Context, ...containerd.IOCloserOpts) error { + p.actionEvents = append(p.actionEvents, "CloseIO") + return nil +} + +// Resize changes the width and height of the process's terminal +func (p *fakeExecProcess) Resize(ctx context.Context, w, h uint32) error { + p.actionEvents = append(p.actionEvents, "Resize") + return nil +} + +// IO returns the io set for the process +func (p *fakeExecProcess) IO() cio.IO { + p.actionEvents = append(p.actionEvents, "IO") + return nil +} + +// Status returns the executing status of the process +func (p *fakeExecProcess) Status(context.Context) (containerd.Status, error) { + p.actionEvents = append(p.actionEvents, "Status") + return containerd.Status{}, nil +} diff --git a/script/test/utils.sh b/script/test/utils.sh index a470e15b68fb..572eb38cfc94 100755 --- a/script/test/utils.sh +++ b/script/test/utils.sh @@ -35,11 +35,23 @@ CONTAINERD_RUNTIME=${CONTAINERD_RUNTIME:-""} if [ -z "${CONTAINERD_CONFIG_FILE}" ]; then config_file="${CONTAINERD_CONFIG_DIR}/containerd-config-cri.toml" truncate --size 0 "${config_file}" - echo "version=2" >> ${config_file} + # TODO(fuweid): if the config.Imports supports patch update, it will be easy + # to write the integration test case with different configuration, like: + # + # 1. write configuration into importable containerd config path. + # 2. restart containerd + # 3. verify the behaviour + # 4. delete the configuration + # 5. restart containerd + cat >>${config_file} </dev/null 2>&1; then cat >>${config_file} <