Skip to content

Commit

Permalink
fix: streaming span writer is not working in grpc based remote storag…
Browse files Browse the repository at this point in the history
…e plugin (#3887)

* plugin/storage: streaming span writer is not assigned for grpc remote plugin

Signed-off-by: Arunprasad Rajkumar <ar.arunprasad@gmail.com>

* storage/grpc: propagate grpc library errors

grpc_handler.go handles only io.EOF and skipping other errors causes crash.

Signed-off-by: Arunprasad Rajkumar <ar.arunprasad@gmail.com>

Signed-off-by: Arunprasad Rajkumar <ar.arunprasad@gmail.com>
  • Loading branch information
arajkumar authored Aug 26, 2022
1 parent 538d96c commit 514e27a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
5 changes: 3 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,
grpcClient := shared.NewGRPCClient(conn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/grpc/shared/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func (s *GRPCHandler) WriteSpanStream(stream storage_v1.StreamingSpanWriterPlugi
if err == io.EOF {
break
}
if err != nil {
return err
}
err = writer.WriteSpan(stream.Context(), in.Span)
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions plugin/storage/grpc/shared/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ func TestGRPCServerWriteSpanStream(t *testing.T) {
})
}

func TestGRPCServerWriteSpanStreamWithGRPCError(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
stream := new(grpcMocks.StreamingSpanWriterPlugin_WriteSpanStreamServer)
stream.On("Recv").Return(&storage_v1.WriteSpanRequest{Span: &mockTraceSpans[0]}, nil).Twice().
On("Recv").Return(nil, context.DeadlineExceeded).Once()
stream.On("SendAndClose", &storage_v1.WriteSpanResponse{}).Return(nil)
stream.On("Context").Return(context.Background())
r.impl.streamWriter.On("WriteSpan", context.Background(), &mockTraceSpans[0]).Return(nil)

err := r.server.WriteSpanStream(stream)
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
})
}

func TestGRPCServerGetDependencies(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
lookback := time.Duration(1 * time.Second)
Expand Down

0 comments on commit 514e27a

Please # to comment.