Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
}

// convert prompb to cortexpb TimeSeries
tsList := []cortexpb.PreallocTimeseries(nil)
tsList := make([]cortexpb.PreallocTimeseries, 0, len(promTsList))
for _, v := range promTsList {
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
Labels: makeLabels(v.Labels),
Expand Down
168 changes: 167 additions & 1 deletion pkg/util/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func getOTLPHttpRequest(otlpRequest *pmetricotlp.ExportRequest, contentType, enc
return req, nil
}

func BenchmarkOTLPWriteHandler(b *testing.B) {
func BenchmarkOTLPWriteHandlerCompression(b *testing.B) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
Expand Down Expand Up @@ -695,6 +695,91 @@ func BenchmarkOTLPWriteHandler(b *testing.B) {
})
}

func BenchmarkOTLPWriteHandlerPush(b *testing.B) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
DisableTargetInfo: false,
}
overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)

mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}
handler := OTLPHandler(1000000, overrides, cfg, nil, mockPushFunc)

tests := []struct {
description string
numSeries int
samplesPerSeries int
numHistograms int
}{
{
numSeries: 1,
samplesPerSeries: 10,
numHistograms: 1,
},
{
numSeries: 1,
samplesPerSeries: 100,
numHistograms: 1,
},
{
numSeries: 1,
samplesPerSeries: 1000,
numHistograms: 1,
},
{
numSeries: 1,
samplesPerSeries: 1,
numHistograms: 10,
},
{
numSeries: 1,
samplesPerSeries: 1,
numHistograms: 100,
},
{
numSeries: 1,
samplesPerSeries: 1,
numHistograms: 1000,
},
{
numSeries: 10,
samplesPerSeries: 1,
numHistograms: 1,
},
{
numSeries: 100,
samplesPerSeries: 1,
numHistograms: 1,
},
{
numSeries: 1000,
samplesPerSeries: 1,
numHistograms: 1,
},
}

for _, test := range tests {
b.Run(fmt.Sprintf("numSeries:%d, samplesPerSeries:%d, numHistograms:%d", test.numSeries, test.samplesPerSeries, test.numHistograms), func(b *testing.B) {
exportRequest := generateOTLPWriteRequestWithSeries(test.numSeries, test.samplesPerSeries, test.numHistograms)
req, err := getOTLPHttpRequest(&exportRequest, pbContentType, "gzip")
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

resp := recorder.Result()
require.Equal(b, http.StatusOK, resp.StatusCode)
req.Body.(*resetReader).Reset()
}
})
}
}

func TestOTLPWriteHandler(t *testing.T) {
cfg := distributor.OTLPConfig{
ConvertAllAttributes: false,
Expand Down Expand Up @@ -800,6 +885,87 @@ func TestOTLPWriteHandler(t *testing.T) {
}
}

func generateOTLPWriteRequestWithSeries(numSeries, samplesPerSeries, numHistogram int) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

attributes := pcommon.NewMap()
attributes.PutStr("label1", "value1")
attributes.PutStr("label2", "value2")
attributes.PutStr("label3", "value3")

for i := 0; i < numSeries; i++ {
metricName := fmt.Sprintf("series_%d", i)
metricUnit := fmt.Sprintf("unit_%d", i)
metricDescription := fmt.Sprintf("description_%d", i)

resourceMetric := d.ResourceMetrics().AppendEmpty()
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")

scopeMetric := resourceMetric.ScopeMetrics()
metric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()

// set metadata
metric.SetName(metricName)
metric.SetDescription(metricDescription)
metric.SetUnit(metricUnit)
metric.SetEmptyGauge()

for j := 0; j < samplesPerSeries; j++ {
v := float64(j + i)
ts := time.Now().Add(time.Second * 30 * time.Duration(samplesPerSeries-j+1))
dataPoint := metric.Gauge().DataPoints().AppendEmpty()
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dataPoint.SetDoubleValue(v)
attributes.CopyTo(dataPoint.Attributes())

// exemplar
exemplar := dataPoint.Exemplars().AppendEmpty()
exemplar.SetTimestamp(pcommon.NewTimestampFromTime(ts))
exemplar.SetDoubleValue(v)
exemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})
exemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
}

for j := 0; j < numHistogram; j++ {
ts := time.Now().Add(time.Second * 30 * time.Duration(numHistogram-j+1))
// Generate One Histogram
histogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
histogramMetric.SetName(fmt.Sprintf("test-histogram_%d", j))
histogramMetric.SetDescription(fmt.Sprintf("test-histogram-description_%d", j))
histogramMetric.SetEmptyHistogram()
histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty()
histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0})
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2})
histogramDataPoint.SetCount(10)
histogramDataPoint.SetSum(30.0)
attributes.CopyTo(histogramDataPoint.Attributes())

// Generate One Exponential-Histogram
exponentialHistogramMetric := scopeMetric.AppendEmpty().Metrics().AppendEmpty()
exponentialHistogramMetric.SetName(fmt.Sprintf("test-exponential-histogram_%d", j))
exponentialHistogramMetric.SetDescription(fmt.Sprintf("test-exponential-histogram-description_%d", j))
exponentialHistogramMetric.SetEmptyExponentialHistogram()
exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty()
exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
exponentialHistogramDataPoint.SetScale(2.0)
exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2})
exponentialHistogramDataPoint.SetZeroCount(2)
exponentialHistogramDataPoint.SetCount(10)
exponentialHistogramDataPoint.SetSum(30.0)
attributes.CopyTo(exponentialHistogramDataPoint.Attributes())
}
}

return pmetricotlp.NewExportRequestFromMetrics(d)
}

func generateOTLPWriteRequest() pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

Expand Down
Loading