diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index c363eb63a725..4d8d4621168d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -117,15 +117,10 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c slog.Debug("Execute: processing", "bundle", rb) defer b.Cleanup(wk) - b.Fail = func(errMsg string) { - slog.Debug("job failed", "bundle", rb, "job", j) - err := fmt.Errorf("stage exec %v", errMsg) - j.Failed(err) - } dataReady = b.ProcessOn(ctx, wk) default: err := fmt.Errorf("unknown environment[%v]", s.envID) - slog.Error("Execute", err) + slog.Error("Execute", "error", err) panic(err) } @@ -195,6 +190,9 @@ progress: var resp *fnpb.ProcessBundleResponse select { case resp = <-b.Resp: + if b.BundleErr != nil { + return b.BundleErr + } case <-ctx.Done(): return context.Cause(ctx) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 86a7a53d01ae..98479e3db071 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -52,12 +52,11 @@ type B struct { dataSema atomic.Int32 OutputData engine.TentativeData - Resp chan *fnpb.ProcessBundleResponse + Resp chan *fnpb.ProcessBundleResponse + BundleErr error + responded bool SinkToPCollection map[string]string - - Fail func(err string) // Called if bundle returns an error. - responded bool } // Init initializes the bundle's internal state for waiting on all @@ -89,12 +88,12 @@ func (b *B) LogValue() slog.Value { func (b *B) Respond(resp *fnpb.InstructionResponse) { if b.responded { - slog.Warn("second bundle response", "bundle", b, "resp", resp) + slog.Warn("additional bundle response", "bundle", b, "resp", resp) return } b.responded = true if resp.GetError() != "" { - b.Fail(resp.GetError()) + b.BundleErr = fmt.Errorf("bundle %v failed:%v", resp.GetInstructionId(), resp.GetError()) close(b.Resp) return }