Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor command/container/run.go #5693

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
181 changes: 97 additions & 84 deletions cli/command/container/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"os"
"strings"
"syscall"

Expand Down Expand Up @@ -116,12 +117,8 @@ func runRun(ctx context.Context, dockerCli command.Cli, flags *pflag.FlagSet, ro
return runContainer(ctx, dockerCli, ropts, copts, containerCfg)
}

//nolint:gocyclo
func runContainer(ctx context.Context, dockerCli command.Cli, runOpts *runOptions, copts *containerOptions, containerCfg *containerConfig) error {
config := containerCfg.Config
stdout, stderr := dockerCli.Out(), dockerCli.Err()
apiClient := dockerCli.Client()

config.ArgsEscaped = false

if !runOpts.detach {
Expand All @@ -143,123 +140,141 @@ func runContainer(ctx context.Context, dockerCli command.Cli, runOpts *runOption
if err != nil {
return toStatusError(err)
}
if runOpts.sigProxy {
sigc := notifyAllSignals()
// since we're explicitly setting up signal handling here, and the daemon will
// get notified independently of the clients ctx cancellation, we use this context
// but without cancellation to avoid ForwardAllSignals from returning
// before all signals are forwarded.
bgCtx := context.WithoutCancel(ctx)
go ForwardAllSignals(bgCtx, apiClient, containerID, sigc)
defer signal.StopCatch(sigc)
}

ctx, cancelFun := context.WithCancel(context.WithoutCancel(ctx))
defer cancelFun()
apiClient := dockerCli.Client()

var (
waitDisplayID chan struct{}
errCh chan error
)
// New context here because we don't want to cancel waiting on container exit/remove
// when we cancel attach, etc.
statusCtx, cancelStatusCtx := context.WithCancel(context.WithoutCancel(ctx))
defer cancelStatusCtx()
statusChan := waitExitOrRemoved(statusCtx, apiClient, containerID, copts.autoRemove)

var waitDisplayID chan struct{}
attach := config.AttachStdin || config.AttachStdout || config.AttachStderr
if !attach {
// Make this asynchronous to allow the client to write to stdin before having to read the ID
waitDisplayID = make(chan struct{})
go func() {
defer close(waitDisplayID)
_, _ = fmt.Fprintln(stdout, containerID)
_, _ = fmt.Fprintln(dockerCli.Out(), containerID)
}()
}
if attach {
detachKeys := dockerCli.ConfigFile().DetachKeys
if runOpts.detachKeys != "" {
detachKeys = runOpts.detachKeys
}

// ctx should not be cancellable here, as this would kill the stream to the container
// and we want to keep the stream open until the process in the container exits or until
// the user forcefully terminates the CLI.
closeFn, err := attachContainer(ctx, dockerCli, containerID, &errCh, config, container.AttachOptions{
Stream: true,
Stdin: config.AttachStdin,
Stdout: config.AttachStdout,
Stderr: config.AttachStderr,
DetachKeys: detachKeys,
})
var attachWait func(<-chan int, error) error
if attach {
attachWait, err = setupContainerAttach(ctx, dockerCli, containerID, runOpts, config)
if err != nil {
return err
}
defer closeFn()
}

// New context here because we don't want to cancel waiting on container exit/remove
// when we cancel attach, etc.
statusCtx, cancelStatusCtx := context.WithCancel(context.WithoutCancel(ctx))
defer cancelStatusCtx()
statusChan := waitExitOrRemoved(statusCtx, apiClient, containerID, copts.autoRemove)

// start the container
if err := apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
// If we have a hijackedIOStreamer, we should notify
// it that we are going to exit and wait for it,
// so that the terminal is properly restored.
err = apiClient.ContainerStart(ctx, containerID, container.StartOptions{})
if err != nil {
if attach {
cancelFun()
<-errCh
attachWait(statusChan, err)
}

if copts.autoRemove {
// wait container to be removed
<-statusChan
}
return toStatusError(err)
}

if attach {
return attachWait(statusChan, nil)
}

// Detached mode: wait for the id to be displayed and return.
if !attach {
// Detached mode
<-waitDisplayID
return nil
<-waitDisplayID
return nil
}

func setupContainerAttach(ctx context.Context, dockerCli command.Cli, containerID string, runOpts *runOptions, config *container.Config) (func(<-chan int, error) error, error) {
detachKeys := dockerCli.ConfigFile().DetachKeys
if runOpts.detachKeys != "" {
detachKeys = runOpts.detachKeys
}

if config.Tty && dockerCli.Out().IsTerminal() {
if err := MonitorTtySize(ctx, dockerCli, containerID, false); err != nil {
_, _ = fmt.Fprintln(stderr, "Error monitoring TTY size:", err)
}
// ctx should not be cancellable here, as this would kill the stream to the container
// and we want to keep the stream open until the process in the container exits or until
// the user forcefully terminates the CLI.
attachCtx, attachCancel := context.WithCancel(context.WithoutCancel(ctx))
errCh, closeFn, err := attachContainer(attachCtx, dockerCli, containerID, config, container.AttachOptions{
Stream: true,
Stdin: config.AttachStdin,
Stdout: config.AttachStdout,
Stderr: config.AttachStderr,
DetachKeys: detachKeys,
})
if err != nil {
attachCancel()
return nil, err
}

select {
case err := <-errCh:
if err != nil {
if _, ok := err.(term.EscapeError); ok {
// The user entered the detach escape sequence.
return nil
}
var sigc chan os.Signal
if runOpts.sigProxy {
sigc = notifyAllSignals()
// since we're explicitly setting up signal handling here, and the daemon will
// get notified independently of the clients ctx cancellation, we use this context
// but without cancellation to avoid ForwardAllSignals from returning
// before all signals are forwarded.
bgCtx := context.WithoutCancel(ctx)
go ForwardAllSignals(bgCtx, dockerCli.Client(), containerID, sigc)
}

logrus.Debugf("Error hijack: %s", err)
return err
return func(statusC <-chan int, err error) error {
defer closeFn()
if runOpts.sigProxy {
defer signal.StopCatch(sigc)
}
status := <-statusChan
if status != 0 {
return cli.StatusError{StatusCode: status}

// if the container failed to start, just cancel the streamer
// and wait for the terminal to be restored
if err != nil {
attachCancel()
<-errCh
return nil
}
case status := <-statusChan:
// notify hijackedIOStreamer that we're exiting and wait
// so that the terminal can be restored.
cancelFun()
<-errCh
if status != 0 {
return cli.StatusError{StatusCode: status}

if config.Tty && dockerCli.Out().IsTerminal() {
if err := MonitorTtySize(attachCtx, dockerCli, containerID, false); err != nil {
_, _ = fmt.Fprintln(dockerCli.Err(), "Error monitoring TTY size:", err)
}
}
}

return nil
select {
case err := <-errCh:
if err != nil {
if _, ok := err.(term.EscapeError); ok {
// The user entered the detach escape sequence.
return nil
}

logrus.Debugf("Error hijack: %s", err)
return err
}
status := <-statusC
if status != 0 {
return cli.StatusError{StatusCode: status}
}
case status := <-statusC:
// notify hijackedIOStreamer that we're exiting and wait
// so that the terminal can be restored.
attachCancel()
<-errCh
if status != 0 {
return cli.StatusError{StatusCode: status}
}
}
return nil
}, nil
}

func attachContainer(ctx context.Context, dockerCli command.Cli, containerID string, errCh *chan error, config *container.Config, options container.AttachOptions) (func(), error) {
func attachContainer(ctx context.Context, dockerCli command.Cli, containerID string, config *container.Config, options container.AttachOptions) (chan error, func(), error) {
resp, errAttach := dockerCli.Client().ContainerAttach(ctx, containerID, options)
if errAttach != nil {
return nil, errAttach
return nil, nil, errAttach
}

var (
Expand All @@ -281,8 +296,6 @@ func attachContainer(ctx context.Context, dockerCli command.Cli, containerID str
}

ch := make(chan error, 1)
*errCh = ch

go func() {
ch <- func() error {
streamer := hijackedIOStreamer{
Expand All @@ -301,7 +314,7 @@ func attachContainer(ctx context.Context, dockerCli command.Cli, containerID str
return errAttach
}()
}()
return resp.Close, nil
return ch, resp.Close, nil
}

// withHelp decorates the error with a suggestion to use "--help".
Expand Down
29 changes: 24 additions & 5 deletions cli/command/container/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func TestRunAttachTermination(t *testing.T) {
var conn net.Conn
killCh := make(chan struct{})
attachCh := make(chan struct{})
startCh := make(chan struct{})
containerExitC := make(chan struct{})
fakeCLI := test.NewFakeCli(&fakeClient{
createContainerFunc: func(_ *container.Config, _ *container.HostConfig, _ *network.NetworkingConfig, _ *specs.Platform, _ string) (container.CreateResponse, error) {
return container.CreateResponse{
Expand All @@ -158,6 +160,7 @@ func TestRunAttachTermination(t *testing.T) {
},
containerKillFunc: func(ctx context.Context, containerID, signal string) error {
killCh <- struct{}{}
containerExitC <- struct{}{}
return nil
},
containerAttachFunc: func(ctx context.Context, containerID string, options container.AttachOptions) (types.HijackedResponse, error) {
Expand All @@ -173,11 +176,19 @@ func TestRunAttachTermination(t *testing.T) {
responseChan := make(chan container.WaitResponse, 1)
errChan := make(chan error)

responseChan <- container.WaitResponse{
StatusCode: 130,
}
go func() {
<-containerExitC
responseChan <- container.WaitResponse{
StatusCode: 130,
}
}()
return responseChan, errChan
},
containerStartFunc: func(containerID string, options container.StartOptions) error {
startCh <- struct{}{}
return nil
},

// use new (non-legacy) wait API
// see: 38591f20d07795aaef45d400df89ca12f29c603b
Version: "1.30",
Expand All @@ -201,16 +212,24 @@ func TestRunAttachTermination(t *testing.T) {
case <-attachCh:
}

// run command should attempt to start the container
select {
case <-time.After(5 * time.Second):
t.Fatal("containerStartCh was not called before the timeout")
case <-startCh:
}

assert.NilError(t, syscall.Kill(syscall.Getpid(), syscall.SIGINT))
// end stream from "container" so that we'll detach
conn.Close()

select {
case <-killCh:
case <-time.After(5 * time.Second):
t.Fatal("containerKillFunc was not called before the timeout")
}

// end stream from "container" so that we'll detach
conn.Close()

select {
case cmdErr := <-cmdErrC:
assert.Equal(t, cmdErr, cli.StatusError{
Expand Down
Loading