Skip to content

Commit

Permalink
fix: not reporting BizStatusError to tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
felix.fengmin committed Mar 14, 2024
1 parent 92f64b0 commit 86284dd
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
18 changes: 17 additions & 1 deletion client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"sync/atomic"

"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/pkg/serviceinfo"

Expand Down Expand Up @@ -164,6 +165,8 @@ func (s *stream) Context() context.Context {
func (s *stream) RecvMsg(m interface{}) (err error) {
err = s.recvEndpoint(s.stream, m)
if err == nil {
// BizStatusErr is returned by the server handle, meaning the stream is ended;
// And it should be returned to the calling business code for error handling
err = s.ri.Invocation().BizStatusErr()
}
if err != nil || s.streamingMode == serviceinfo.StreamingClient {
Expand Down Expand Up @@ -193,10 +196,23 @@ func (s *stream) DoFinish(err error) {
// already called
return
}
if err == io.EOF {
if !isRPCError(err) {
// only rpc errors are reported
err = nil
}
ctx := s.Context()
ri := rpcinfo.GetRPCInfo(ctx)
s.kc.opt.TracerCtl.DoFinish(ctx, ri, err)
}

func isRPCError(err error) bool {
if err == nil {
return false
}
if err == io.EOF {
return false
}
_, isBizStatusError := err.(kerrors.BizStatusErrorIface)
// if a tracer needs to get the BizStatusError, it should read from rpcinfo.invocation.bizStatusErr
return !isBizStatusError
}
43 changes: 43 additions & 0 deletions client/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
mocksnet "github.com/cloudwego/kitex/internal/mocks/net"
mock_remote "github.com/cloudwego/kitex/internal/mocks/remote"
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/remotecli"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
Expand Down Expand Up @@ -481,6 +482,33 @@ func Test_stream_DoFinish(t *testing.T) {
test.Assert(t, err == nil)
})

t.Run("biz-status-error", func(t *testing.T) {
ri := rpcinfo.NewRPCInfo(nil, nil, nil, nil, rpcinfo.NewRPCStats())
st := &mockStream{
ctx: rpcinfo.NewCtxWithRPCInfo(context.Background(), ri),
}
tracer := &mockTracer{}
ctl := &rpcinfo.TraceController{}
ctl.Append(tracer)
kc := &kClient{
opt: &client.Options{
TracerCtl: ctl,
},
}
s := newStream(st, kc, ri, serviceinfo.StreamingBidirectional, nil, nil)

finishCalled := false
var err error
tracer.finish = func(ctx context.Context) {
ri := rpcinfo.GetRPCInfo(ctx)
err = ri.Stats().Error()
finishCalled = true
}
s.DoFinish(kerrors.NewBizStatusError(100, "biz status error"))
test.Assert(t, finishCalled)
test.Assert(t, err == nil) // biz status error is not an rpc error
})

t.Run("error", func(t *testing.T) {
ri := rpcinfo.NewRPCInfo(nil, nil, nil, nil, rpcinfo.NewRPCStats())
st := &mockStream{
Expand Down Expand Up @@ -541,3 +569,18 @@ func Test_kClient_getStreamingMode(t *testing.T) {
test.Assert(t, mode == serviceinfo.StreamingBidirectional)
})
}

func Test_isRPCError(t *testing.T) {
t.Run("nil", func(t *testing.T) {
test.Assert(t, !isRPCError(nil))
})
t.Run("EOF", func(t *testing.T) {
test.Assert(t, !isRPCError(io.EOF))
})
t.Run("biz status error", func(t *testing.T) {
test.Assert(t, !isRPCError(kerrors.NewBizStatusError(100, "biz status error")))
})
t.Run("error", func(t *testing.T) {
test.Assert(t, isRPCError(errors.New("error")))
})
}

0 comments on commit 86284dd

Please # to comment.