mirror of
https://github.com/opentofu/opentofu.git
synced 2024-12-28 18:01:01 -06:00
Fix cancellation when spawning a subprocess
If the shell spawns a subprocess which doesn't close the output file descriptors, the exec.Cmd will block on Wait() (see golang.org/issue/18874). Use an os.Pipe to provide the command with a real file descriptor so the exec package doesn't need to do the copy manually. This in turn may block our own reading goroutine, but we can select on that and leave it for cleanup later.
This commit is contained in:
parent
e0325d9b8f
commit
ff2936bb3f
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
|
||||
@ -52,18 +53,28 @@ func applyFn(ctx context.Context) error {
|
||||
flag = "-c"
|
||||
}
|
||||
|
||||
// Setup the reader that will read the lines from the command
|
||||
pr, pw := io.Pipe()
|
||||
copyDoneCh := make(chan struct{})
|
||||
go copyOutput(o, pr, copyDoneCh)
|
||||
// Setup the reader that will read the output from the command.
|
||||
// We use an os.Pipe so that the *os.File can be passed directly to the
|
||||
// process, and not rely on goroutines copying the data which may block.
|
||||
// See golang.org/issue/18874
|
||||
pr, pw, err := os.Pipe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize pipe for output: %s", err)
|
||||
}
|
||||
|
||||
// Setup the command
|
||||
cmd := exec.Command(shell, flag, command)
|
||||
// TODO: use exec.CommandContext when cancelation is fixed in Go
|
||||
cmd := exec.CommandContext(ctx, shell, flag, command)
|
||||
cmd.Stderr = pw
|
||||
cmd.Stdout = pw
|
||||
|
||||
output, _ := circbuf.NewBuffer(maxBufSize)
|
||||
cmd.Stderr = io.MultiWriter(output, pw)
|
||||
cmd.Stdout = io.MultiWriter(output, pw)
|
||||
|
||||
// Write everything we read from the pipe to the output buffer too
|
||||
tee := io.TeeReader(pr, output)
|
||||
|
||||
// copy the teed output to the UI output
|
||||
copyDoneCh := make(chan struct{})
|
||||
go copyOutput(o, tee, copyDoneCh)
|
||||
|
||||
// Output what we're about to run
|
||||
o.Output(fmt.Sprintf(
|
||||
@ -71,27 +82,22 @@ func applyFn(ctx context.Context) error {
|
||||
shell, flag, command))
|
||||
|
||||
// Start the command
|
||||
err := cmd.Start()
|
||||
err = cmd.Start()
|
||||
if err == nil {
|
||||
// Wait for the command to complete in a goroutine
|
||||
doneCh := make(chan error, 1)
|
||||
go func() {
|
||||
doneCh <- cmd.Wait()
|
||||
}()
|
||||
|
||||
// Wait for the command to finish or for us to be interrupted
|
||||
select {
|
||||
case err = <-doneCh:
|
||||
case <-ctx.Done():
|
||||
cmd.Process.Kill()
|
||||
err = <-doneCh
|
||||
}
|
||||
err = cmd.Wait()
|
||||
}
|
||||
|
||||
// Close the write-end of the pipe so that the goroutine mirroring output
|
||||
// ends properly.
|
||||
pw.Close()
|
||||
<-copyDoneCh
|
||||
|
||||
// Cancelling the command may block the pipe reader if the file descriptor
|
||||
// was passed to a child process which hasn't closed it. In this case the
|
||||
// copyOutput goroutine will just hang out until exit.
|
||||
select {
|
||||
case <-copyDoneCh:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error running command '%s': %v. Output: %s",
|
||||
|
@ -2,6 +2,7 @@ package localexec
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -38,7 +39,9 @@ func TestResourceProvider_Apply(t *testing.T) {
|
||||
|
||||
func TestResourceProvider_stop(t *testing.T) {
|
||||
c := testConfig(t, map[string]interface{}{
|
||||
"command": "sleep 60",
|
||||
// bash/zsh/ksh will exec a single command in the same process. This
|
||||
// makes certain there's a subprocess in the shell.
|
||||
"command": "sleep 30; sleep 30",
|
||||
})
|
||||
|
||||
output := new(terraform.MockUIOutput)
|
||||
@ -54,7 +57,7 @@ func TestResourceProvider_stop(t *testing.T) {
|
||||
select {
|
||||
case <-doneCh:
|
||||
t.Fatal("should not finish quickly")
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
|
||||
// Stop it
|
||||
@ -62,8 +65,8 @@ func TestResourceProvider_stop(t *testing.T) {
|
||||
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("should finish")
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
log.Fatal("should finish")
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user