Skip to content
This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit

Permalink
Fixing more issues with closed pipes
Browse files Browse the repository at this point in the history
Added to code to ensure that all std* IO is done before Wait() is called
to ensure that no deadlocks happen.

Also closing the ReadPipe now with io.EOF and filtering that out.
Without the explicit io.EOF a pipe will return io.ErrClosedPipe. Thus we
can differentiate between the intended closing and unintended closing of
the ReadPipe.
  • Loading branch information
Kidswiss committed May 27, 2020
1 parent 5bf4194 commit 1256328
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 40 deletions.
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"go.testTimeout": "120s",
"go.testFlags": [
"-v",
// "--race",
"--race",
"--count=1",
],
"go.testEnvVars": {
"RESTIC_REPOSITORY":"s3:http://localhost:9000/baas1",
Expand All @@ -13,7 +14,7 @@
"BACKUP_DIR":"testdata",
"HOSTNAME":"test",
"STATS_URL":"http://localhost:8091",
"PROM_URL": "http://localhost:8090",
// "PROM_URL": "http://localhost:8090",
"RESTORE_ACCESSKEYID": "8U0UDNYPNUDTUS1LIAF3",
"RESTORE_SECRETACCESSKEY":"ip3cdrkXcHmH4S7if7erKPNoxDn27V0vrg6CHHem",
"RESTORE_S3ENDPOINT": "http://localhost:9000/restore",
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ RUN apk --no-cache add ca-certificates
COPY --from=build /usr/local/bin/restic /usr/local/bin/restic
COPY --from=build /go/bin/wrestic /app/

# USER 1001

ENTRYPOINT [ "./wrestic" ]
2 changes: 1 addition & 1 deletion kubernetes/podExec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type ExecData struct {
Reader io.ReadCloser
Reader *io.PipeReader
Done chan bool
}

Expand Down
54 changes: 40 additions & 14 deletions restic/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"io/ioutil"
"os"
"path"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/vshn/wrestic/kubernetes"
)

type backupEnvelope struct {
Expand Down Expand Up @@ -90,13 +92,10 @@ func (r *Restic) Backup(backupDir string, tags ArrayOpts) error {

}

// TODO: this shares quite some code with stdinBackup
func (r *Restic) folderBackup(folder string, backuplogger logr.Logger, tags ArrayOpts) error {

readPipe, writePipe := io.Pipe()
defer readPipe.Close()
defer writePipe.Close()

go r.parseBackupOutput(readPipe, backuplogger, folder)

backuplogger.Info("starting backup for folder", "foldername", path.Base(folder))

Expand All @@ -113,18 +112,10 @@ func (r *Restic) folderBackup(folder string, backuplogger logr.Logger, tags Arra
StdErr: writePipe,
}

if len(tags) > 0 {
opts.Args = append(opts.Args, tags.BuildArgs()...)
}

cmd := NewCommand(r.ctx, backuplogger, opts)
cmd.Run()

return cmd.FatalError
return r.triggerBackup(folder, backuplogger, tags, readPipe, opts, nil)
}

func (r *Restic) parseBackupOutput(reader io.ReadCloser, log logr.Logger, folder string) {
defer reader.Close()
func (r *Restic) parseBackupOutput(reader *io.PipeReader, log logr.Logger, folder string) {

decoder := json.NewDecoder(reader)

Expand Down Expand Up @@ -203,3 +194,38 @@ func (r *Restic) sendPostWebhook() {
}

}

func (r *Restic) triggerBackup(name string, logger logr.Logger, tags ArrayOpts, readPipe *io.PipeReader, opts CommandOptions, data *kubernetes.ExecData) error {
if len(tags) > 0 {
opts.Args = append(opts.Args, tags.BuildArgs()...)
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
r.parseBackupOutput(readPipe, logger, name)
wg.Done()
}()

cmd := NewCommand(r.ctx, logger, opts)
cmd.Configure()

cmd.Start()

// All std* io has to be finished before calling Wait() as it will block
// otherwise.
if data != nil {
// wait for data to finish writing, before waiting for the command
<-data.Done
}
// wait for the outputparsing to finish
wg.Wait()

err := readPipe.CloseWithError(io.EOF)
if err != nil {
return fmt.Errorf("error during pipe close: %v", err)
}

cmd.Wait()

return cmd.FatalError
}
7 changes: 5 additions & 2 deletions restic/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *Command) Start() {
}
err := c.cmd.Start()
if err != nil {
c.FatalError = err
c.FatalError = fmt.Errorf("cmd.Start() err: %v", err)
return
}
}
Expand All @@ -86,7 +86,10 @@ func (c *Command) Wait() {
}
err := c.cmd.Wait()
if err != nil {
c.FatalError = err
if err == io.EOF {
return
}
c.FatalError = fmt.Errorf("cmd.Wait() err: %v", err)
return
}
}
22 changes: 1 addition & 21 deletions restic/stdinbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ func (r *Restic) StdinBackup(data *kubernetes.ExecData, filename, fileExt string
stdinlogger.Info("starting stdin backup", "filename", filename, "extension", fileExt)

readPipe, writePipe := io.Pipe()
defer readPipe.Close()
defer writePipe.Close()

go r.parseBackupOutput(readPipe, stdinlogger, filename+fileExt)

opts := CommandOptions{
Path: r.resticPath,
Expand All @@ -37,21 +33,5 @@ func (r *Restic) StdinBackup(data *kubernetes.ExecData, filename, fileExt string
StdIn: data.Reader,
}

if len(tags) > 0 {
opts.Args = append(opts.Args, tags.BuildArgs()...)
}

cmd := NewCommand(r.ctx, stdinlogger, opts)
cmd.Configure()

cmd.Start()

// wait for data to finish writing, before waiting for the command
<-data.Done

cmd.Wait()

data.Reader.Close()

return cmd.FatalError
return r.triggerBackup(filename+fileExt, stdinlogger, tags, readPipe, opts, data)
}

0 comments on commit 1256328

Please # to comment.