Skip to content

Commit

Permalink
Return bundle errors through execPipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 31, 2023
1 parent e9e42d2 commit ceb39d8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
10 changes: 4 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,10 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c

slog.Debug("Execute: processing", "bundle", rb)
defer b.Cleanup(wk)
b.Fail = func(errMsg string) {
slog.Debug("job failed", "bundle", rb, "job", j)
err := fmt.Errorf("stage exec %v", errMsg)
j.Failed(err)
}
dataReady = b.ProcessOn(ctx, wk)
default:
err := fmt.Errorf("unknown environment[%v]", s.envID)
slog.Error("Execute", err)
slog.Error("Execute", "error", err)
panic(err)
}

Expand Down Expand Up @@ -195,6 +190,9 @@ progress:
var resp *fnpb.ProcessBundleResponse
select {
case resp = <-b.Resp:
if b.BundleErr != nil {
return b.BundleErr
}
case <-ctx.Done():
return context.Cause(ctx)
}
Expand Down
11 changes: 5 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ type B struct {
dataSema atomic.Int32
OutputData engine.TentativeData

Resp chan *fnpb.ProcessBundleResponse
Resp chan *fnpb.ProcessBundleResponse
BundleErr error
responded bool

SinkToPCollection map[string]string

Fail func(err string) // Called if bundle returns an error.
responded bool
}

// Init initializes the bundle's internal state for waiting on all
Expand Down Expand Up @@ -89,12 +88,12 @@ func (b *B) LogValue() slog.Value {

func (b *B) Respond(resp *fnpb.InstructionResponse) {
if b.responded {
slog.Warn("second bundle response", "bundle", b, "resp", resp)
slog.Warn("additional bundle response", "bundle", b, "resp", resp)
return
}
b.responded = true
if resp.GetError() != "" {
b.Fail(resp.GetError())
b.BundleErr = fmt.Errorf("bundle %v failed:%v", resp.GetInstructionId(), resp.GetError())
close(b.Resp)
return
}
Expand Down

0 comments on commit ceb39d8

Please # to comment.