Skip to content

Commit

Permalink
Add a notion of Request to otlp to decouple data from protocol specif…
Browse files Browse the repository at this point in the history
…ic metadata (open-telemetry#4050)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Sep 16, 2021
1 parent bcbaeaf commit f3cd422
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 67 deletions.
7 changes: 3 additions & 4 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

func TestDefaultGrpcClientSettings(t *testing.T) {
Expand Down Expand Up @@ -481,7 +480,7 @@ func TestHttpReception(t *testing.T) {
assert.NoError(t, errDial)
client := otlpgrpc.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, pdata.NewTraces(), grpc.WaitForReady(true))
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
if tt.hasError {
assert.Error(t, errResp)
} else {
Expand Down Expand Up @@ -528,7 +527,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
assert.NoError(t, errDial)
client := otlpgrpc.NewTracesClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, pdata.NewTraces(), grpc.WaitForReady(true))
resp, errResp := client.Export(ctx, otlpgrpc.NewTracesRequest(), grpc.WaitForReady(true))
assert.NoError(t, errResp)
assert.NotNil(t, resp)
cancelFunc()
Expand All @@ -537,7 +536,7 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {

type grpcTraceServer struct{}

func (gts *grpcTraceServer) Export(context.Context, pdata.Traces) (otlpgrpc.TracesResponse, error) {
func (gts *grpcTraceServer) Export(context.Context, otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
return otlpgrpc.NewTracesResponse(), nil
}

Expand Down
12 changes: 9 additions & 3 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,23 @@ func (gs *grpcSender) stop() error {
}

func (gs *grpcSender) exportTrace(ctx context.Context, td pdata.Traces) error {
_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), td, gs.callOptions...)
req := otlpgrpc.NewTracesRequest()
req.SetTraces(td)
_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), req, gs.callOptions...)
return processError(err)
}

func (gs *grpcSender) exportMetrics(ctx context.Context, md pdata.Metrics) error {
_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), md, gs.callOptions...)
req := otlpgrpc.NewMetricsRequest()
req.SetMetrics(md)
_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), req, gs.callOptions...)
return processError(err)
}

func (gs *grpcSender) exportLogs(ctx context.Context, ld pdata.Logs) error {
_, err := gs.logExporter.Export(gs.enhanceContext(ctx), ld, gs.callOptions...)
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)
_, err := gs.logExporter.Export(gs.enhanceContext(ctx), req, gs.callOptions...)
return processError(err)
}

Expand Down
9 changes: 6 additions & 3 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ type mockTracesReceiver struct {
lastRequest pdata.Traces
}

func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (otlpgrpc.TracesResponse, error) {
func (r *mockTracesReceiver) Export(ctx context.Context, req otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
td := req.Traces()
atomic.AddInt32(&r.totalItems, int32(td.SpanCount()))
r.mux.Lock()
defer r.mux.Unlock()
Expand Down Expand Up @@ -110,8 +111,9 @@ type mockLogsReceiver struct {
lastRequest pdata.Logs
}

func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
func (r *mockLogsReceiver) Export(ctx context.Context, req otlpgrpc.LogsRequest) (otlpgrpc.LogsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
ld := req.Logs()
atomic.AddInt32(&r.totalItems, int32(ld.LogRecordCount()))
r.mux.Lock()
defer r.mux.Unlock()
Expand Down Expand Up @@ -147,7 +149,8 @@ type mockMetricsReceiver struct {
lastRequest pdata.Metrics
}

func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
func (r *mockMetricsReceiver) Export(ctx context.Context, req otlpgrpc.MetricsRequest) (otlpgrpc.MetricsResponse, error) {
md := req.Metrics()
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(md.DataPointCount()))
r.mux.Lock()
Expand Down
28 changes: 23 additions & 5 deletions model/otlpgrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ func NewLogsResponse() LogsResponse {
return LogsResponse{orig: &otlpcollectorlog.ExportLogsServiceResponse{}}
}

// LogsRequest represents the response for gRPC client/server.
type LogsRequest struct {
orig *otlpcollectorlog.ExportLogsServiceRequest
}

// NewLogsRequest returns an empty LogsRequest.
func NewLogsRequest() LogsRequest {
return LogsRequest{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
}

func (lr LogsRequest) SetLogs(ld pdata.Logs) {
lr.orig.ResourceLogs = internal.LogsToOtlp(ld.InternalRep()).ResourceLogs
}

func (lr LogsRequest) Logs() pdata.Logs {
return pdata.LogsFromInternalRep(internal.LogsFromOtlp(lr.orig))
}

// LogsClient is the client API for OTLP-GRPC Logs service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
Expand All @@ -44,7 +62,7 @@ type LogsClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error)
Export(ctx context.Context, request LogsRequest, opts ...grpc.CallOption) (LogsResponse, error)
}

type logsClient struct {
Expand All @@ -56,8 +74,8 @@ func NewLogsClient(cc *grpc.ClientConn) LogsClient {
return &logsClient{rawClient: otlpcollectorlog.NewLogsServiceClient(cc)}
}

func (c *logsClient) Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.LogsToOtlp(in.InternalRep()), opts...)
func (c *logsClient) Export(ctx context.Context, request LogsRequest, opts ...grpc.CallOption) (LogsResponse, error) {
rsp, err := c.rawClient.Export(ctx, request.orig, opts...)
return LogsResponse{orig: rsp}, err
}

Expand All @@ -67,7 +85,7 @@ type LogsServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Logs) (LogsResponse, error)
Export(context.Context, LogsRequest) (LogsResponse, error)
}

// RegisterLogsServer registers the LogsServer to the grpc.Server.
Expand All @@ -80,6 +98,6 @@ type rawLogsServer struct {
}

func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.ExportLogsServiceRequest) (*otlpcollectorlog.ExportLogsServiceResponse, error) {
rsp, err := s.srv.Export(ctx, pdata.LogsFromInternalRep(internal.LogsFromOtlp(request)))
rsp, err := s.srv.Export(ctx, LogsRequest{orig: request})
return rsp.orig, err
}
28 changes: 23 additions & 5 deletions model/otlpgrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ func NewMetricsResponse() MetricsResponse {
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
}

// MetricsRequest represents the response for gRPC client/server.
type MetricsRequest struct {
orig *otlpcollectormetrics.ExportMetricsServiceRequest
}

// NewMetricsRequest returns an empty MetricsRequest.
func NewMetricsRequest() MetricsRequest {
return MetricsRequest{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
}

func (lr MetricsRequest) SetMetrics(ld pdata.Metrics) {
lr.orig.ResourceMetrics = internal.MetricsToOtlp(ld.InternalRep()).ResourceMetrics
}

func (lr MetricsRequest) Metrics() pdata.Metrics {
return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(lr.orig))
}

// MetricsClient is the client API for OTLP-GRPC Metrics service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
Expand All @@ -44,7 +62,7 @@ type MetricsClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error)
Export(ctx context.Context, request MetricsRequest, opts ...grpc.CallOption) (MetricsResponse, error)
}

type metricsClient struct {
Expand All @@ -56,8 +74,8 @@ func NewMetricsClient(cc *grpc.ClientConn) MetricsClient {
return &metricsClient{rawClient: otlpcollectormetrics.NewMetricsServiceClient(cc)}
}

func (c *metricsClient) Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.MetricsToOtlp(in.InternalRep()), opts...)
func (c *metricsClient) Export(ctx context.Context, request MetricsRequest, opts ...grpc.CallOption) (MetricsResponse, error) {
rsp, err := c.rawClient.Export(ctx, request.orig, opts...)
return MetricsResponse{orig: rsp}, err
}

Expand All @@ -67,7 +85,7 @@ type MetricsServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Metrics) (MetricsResponse, error)
Export(context.Context, MetricsRequest) (MetricsResponse, error)
}

// RegisterMetricsServer registers the MetricsServer to the grpc.Server.
Expand All @@ -80,6 +98,6 @@ type rawMetricsServer struct {
}

func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
rsp, err := s.srv.Export(ctx, pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(request)))
rsp, err := s.srv.Export(ctx, MetricsRequest{orig: request})
return rsp.orig, err
}
28 changes: 23 additions & 5 deletions model/otlpgrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ func NewTracesResponse() TracesResponse {
return TracesResponse{orig: &otlpcollectortrace.ExportTraceServiceResponse{}}
}

// TracesRequest represents the response for gRPC client/server.
type TracesRequest struct {
orig *otlpcollectortrace.ExportTraceServiceRequest
}

// NewTracesRequest returns an empty TracesRequest.
func NewTracesRequest() TracesRequest {
return TracesRequest{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

func (lr TracesRequest) SetTraces(ld pdata.Traces) {
lr.orig.ResourceSpans = internal.TracesToOtlp(ld.InternalRep()).ResourceSpans
}

func (lr TracesRequest) Traces() pdata.Traces {
return pdata.TracesFromInternalRep(internal.TracesFromOtlp(lr.orig))
}

// TracesClient is the client API for OTLP-GRPC Traces service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
Expand All @@ -44,7 +62,7 @@ type TracesClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error)
Export(ctx context.Context, request TracesRequest, opts ...grpc.CallOption) (TracesResponse, error)
}

type tracesClient struct {
Expand All @@ -57,8 +75,8 @@ func NewTracesClient(cc *grpc.ClientConn) TracesClient {
}

// Export implements the TracesClient interface.
func (c *tracesClient) Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.TracesToOtlp(in.InternalRep()), opts...)
func (c *tracesClient) Export(ctx context.Context, request TracesRequest, opts ...grpc.CallOption) (TracesResponse, error) {
rsp, err := c.rawClient.Export(ctx, request.orig, opts...)
return TracesResponse{orig: rsp}, err
}

Expand All @@ -68,7 +86,7 @@ type TracesServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Traces) (TracesResponse, error)
Export(context.Context, TracesRequest) (TracesResponse, error)
}

// RegisterTracesServer registers the TracesServer to the grpc.Server.
Expand All @@ -81,6 +99,6 @@ type rawTracesServer struct {
}

func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortrace.ExportTraceServiceRequest) (*otlpcollectortrace.ExportTraceServiceResponse, error) {
rsp, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
rsp, err := s.srv.Export(ctx, TracesRequest{orig: request})
return rsp.orig, err
}
4 changes: 2 additions & 2 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
)

Expand All @@ -45,7 +44,8 @@ func New(id config.ComponentID, nextConsumer consumer.Logs) *Receiver {
}

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
func (r *Receiver) Export(ctx context.Context, req otlpgrpc.LogsRequest) (otlpgrpc.LogsResponse, error) {
ld := req.Logs()
numSpans := ld.LogRecordCount()
if numSpans == 0 {
return otlpgrpc.NewLogsResponse(), nil
Expand Down
15 changes: 8 additions & 7 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)

func TestExport(t *testing.T) {
// given

logSink := new(consumertest.LogsSink)

addr, doneFn := otlpReceiverOnGRPCServer(t, logSink)
Expand All @@ -44,10 +41,12 @@ func TestExport(t *testing.T) {
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
defer traceClientDoneFn()

req := testdata.GenerateLogsOneLogRecord()
ld := testdata.GenerateLogsOneLogRecord()
// Keep log data to compare the test result against it
// Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream
logData := req.Clone()
logData := ld.Clone()
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)

resp, err := traceClient.Export(context.Background(), req)
require.NoError(t, err, "Failed to export trace: %v", err)
Expand All @@ -68,7 +67,7 @@ func TestExport_EmptyRequest(t *testing.T) {
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
defer logClientDoneFn()

resp, err := logClient.Export(context.Background(), pdata.NewLogs())
resp, err := logClient.Export(context.Background(), otlpgrpc.NewLogsRequest())
assert.NoError(t, err, "Failed to export trace: %v", err)
assert.NotNil(t, resp, "The response is missing")
}
Expand All @@ -81,7 +80,9 @@ func TestExport_ErrorConsumer(t *testing.T) {
require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err)
defer logClientDoneFn()

req := testdata.GenerateLogsOneLogRecord()
ld := testdata.GenerateLogsOneLogRecord()
req := otlpgrpc.NewLogsRequest()
req.SetLogs(ld)

resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
Expand Down
4 changes: 2 additions & 2 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
)

Expand All @@ -45,7 +44,8 @@ func New(id config.ComponentID, nextConsumer consumer.Metrics) *Receiver {
}

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
func (r *Receiver) Export(ctx context.Context, req otlpgrpc.MetricsRequest) (otlpgrpc.MetricsResponse, error) {
md := req.Metrics()
dataPointCount := md.DataPointCount()
if dataPointCount == 0 {
return otlpgrpc.NewMetricsResponse(), nil
Expand Down
Loading

0 comments on commit f3cd422

Please # to comment.