Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add in-memory storage support for adaptive sampling #3335

Merged
merged 10 commits into from
Oct 25, 2021
12 changes: 12 additions & 0 deletions plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -79,6 +81,16 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return NewSamplingStore(), nil
}

// CreateLock implements storage.SamplingStoreFactory
func (f *Factory) CreateLock() (distributedlock.Lock, error) {
return &lock{}, nil
}

func (f *Factory) publishOpts() {
internalFactory := f.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"})
internalFactory.Gauge(metrics.Options{Name: limit}).
Expand Down
30 changes: 30 additions & 0 deletions plugin/storage/memory/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import "time"

type lock struct{}

// Acquire always returns true for memory storage
func (l *lock) Acquire(resource string, ttl time.Duration) (bool, error) {
return true, nil
}

// Forfeit always returns true for memory storage
func (l *lock) Forfeit(resource string) (bool, error) {
return true, nil
}
37 changes: 37 additions & 0 deletions plugin/storage/memory/lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"testing"
"time"

"github.com/crossdock/crossdock-go/assert"
)

func TestAcquire(t *testing.T) {
l := &lock{}
acuired, err := l.Acquire("resource", time.Duration(1))
assert.True(t, acuired)
assert.NoError(t, err)
}

func TestForfeit(t *testing.T) {
l := &lock{}
acuired, err := l.Forfeit("resource")
assert.True(t, acuired)
assert.NoError(t, err)
}
119 changes: 119 additions & 0 deletions plugin/storage/memory/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"sync"
"time"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

// SamplingStroe is an in-memory store for sampling data
type SamplingStore struct {
sync.RWMutex
throughputs []*memoryThroughput
probabilitiesAndQPS []*memoryServiceOperationProbabilitiesAndQPS
}

type memoryThroughput struct {
throughput *model.Throughput
time time.Time
}

type memoryServiceOperationProbabilitiesAndQPS struct {
hostname string
probabilities model.ServiceOperationProbabilities
qps model.ServiceOperationQPS
time time.Time
}

// NewSamplingStore creates an in-memory sampling store.
func NewSamplingStore() *SamplingStore {
return &SamplingStore{throughputs: make([]*memoryThroughput, 0), probabilitiesAndQPS: make([]*memoryServiceOperationProbabilitiesAndQPS, 0)}
}

// InsertThroughput implements samplingstore.Store#InsertThroughput.
func (ss *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
ss.Lock()
defer ss.Unlock()
now := time.Now()
for _, t := range throughput {
ss.throughputs = append(ss.throughputs, &memoryThroughput{t, now})
}
return nil
}

// GetThroughput implements samplingstore.Store#GetThroughput.
func (ss *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
ss.Lock()
defer ss.Unlock()
ret := make([]*model.Throughput, 0)
for _, t := range ss.throughputs {
if t.time.After(start) && (t.time.Before(end) || t.time.Equal(end)) {
ret = append(ret, t.throughput)
}
}
return ret, nil
}

// InsertProbabilitiesAndQPS implements samplingstore.Store#InsertProbabilitiesAndQPS.
func (ss *SamplingStore) InsertProbabilitiesAndQPS(
hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
ss.Lock()
defer ss.Unlock()
ss.probabilitiesAndQPS = append(ss.probabilitiesAndQPS, &memoryServiceOperationProbabilitiesAndQPS{hostname, probabilities, qps, time.Now()})
return nil
}

// GetLatestProbabilities implements samplingstore.Store#GetLatestProbabilities.
func (ss *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
ss.Lock()
defer ss.Unlock()
if size := len(ss.probabilitiesAndQPS); size != 0 {
return ss.probabilitiesAndQPS[size-1].probabilities, nil
}
return model.ServiceOperationProbabilities{}, nil
}

// GetProbabilitiesAndQPS implements samplingstore.Store#GetProbabilitiesAndQPS.
func (ss *SamplingStore) GetProbabilitiesAndQPS(start, end time.Time) (map[string][]model.ServiceOperationData, error) {
ss.Lock()
defer ss.Unlock()
ret := make(map[string][]model.ServiceOperationData)
for _, i := range ss.probabilitiesAndQPS {
if i.time.After(start) && (i.time.Before(end) || i.time.Equal(end)) {
probabilitiesAndQPS := make(model.ServiceOperationData)
for svc, opProbabilities := range i.probabilities {
if _, ok := probabilitiesAndQPS[svc]; !ok {
probabilitiesAndQPS[svc] = make(map[string]*model.ProbabilityAndQPS)
}
for op, probability := range opProbabilities {
opQPS := 0.0
if _, ok := i.qps[svc]; ok {
opQPS = i.qps[svc][op]
}
probabilitiesAndQPS[svc][op] = &model.ProbabilityAndQPS{Probability: probability, QPS: opQPS}
}
}
ret[i.hostname] = append(ret[i.hostname], probabilitiesAndQPS)
}
}
return ret, nil
}
114 changes: 114 additions & 0 deletions plugin/storage/memory/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"testing"
"time"

"github.com/crossdock/crossdock-go/assert"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

func withPopulatedSamplingStore(f func(samplingStore *SamplingStore)) {
now := time.Now()
millisAfter := now.Add(time.Millisecond * time.Duration(100))
secondsAfter := now.Add(time.Second * time.Duration(2))
throughputs := []*memoryThroughput{
{&model.Throughput{Service: "svc-1", Operation: "op-1", Count: 1}, now},
{&model.Throughput{Service: "svc-1", Operation: "op-2", Count: 1}, millisAfter},
{&model.Throughput{Service: "svc-2", Operation: "op-3", Count: 1}, secondsAfter},
}
pQPS := []*memoryServiceOperationProbabilitiesAndQPS{
{hostname: "guntur38ab8928", probabilities: model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}}, qps: model.ServiceOperationQPS{"svc-1": {"op-1": 10.0}}, time: now},
{hostname: "peta0242ac130003", probabilities: model.ServiceOperationProbabilities{"svc-1": {"op-2": 0.008}}, qps: model.ServiceOperationQPS{"svc-1": {"op-2": 4.0}}, time: millisAfter},
{hostname: "tenali11ec8d3d", probabilities: model.ServiceOperationProbabilities{"svc-2": {"op-3": 0.003}}, qps: model.ServiceOperationQPS{"svc-1": {"op-1": 7.0}}, time: secondsAfter},
}
samplingStore := &SamplingStore{throughputs: throughputs, probabilitiesAndQPS: pQPS}
f(samplingStore)
}

func withMemorySamplingStore(f func(samplingStore *SamplingStore)) {
f(NewSamplingStore())
}

func TestInsertThroughtput(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
throughputs := []*model.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
assert.NoError(t, samplingStore.InsertThroughput(throughputs))
assert.Equal(t, 2, len(samplingStore.throughputs))
})
}

func TestGetThroughput(t *testing.T) {
withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
start := time.Now()
ret, err := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1)))
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
ret1, _ := samplingStore.GetThroughput(start, start)
assert.Equal(t, 0, len(ret1))
ret2, _ := samplingStore.GetThroughput(start, start.Add(time.Hour*time.Duration(1)))
assert.Equal(t, 2, len(ret2))
})
}

func TestInsertProbabilitiesAndQPS(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("dell11eg843d", model.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, model.ServiceOperationQPS{"new-srv": {"op": 4}}))
assert.Equal(t, 1, len(samplingStore.probabilitiesAndQPS))
// Insert one more
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("lncol73", model.ServiceOperationProbabilities{"my-app": {"hello": 0.3}}, model.ServiceOperationQPS{"new-srv": {"op": 7}}))
assert.Equal(t, 2, len(samplingStore.probabilitiesAndQPS))
})
}

func TestGetLatestProbability(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
// No priod data
ret, err := samplingStore.GetLatestProbabilities()
assert.NoError(t, err)
assert.Empty(t, ret)
})

withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
// With some pregenerated data
ret, err := samplingStore.GetLatestProbabilities()
assert.NoError(t, err)
assert.Equal(t, ret, model.ServiceOperationProbabilities{"svc-2": {"op-3": 0.003}})
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("utfhyolf", model.ServiceOperationProbabilities{"another-service": {"hello": 0.009}}, model.ServiceOperationQPS{"new-srv": {"op": 5}}))
ret, _ = samplingStore.GetLatestProbabilities()
assert.NotEqual(t, ret, model.ServiceOperationProbabilities{"svc-2": {"op-3": 0.003}})
})
}

func TestGetProbabilitiesAndQPS(t *testing.T) {
withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
start := time.Now()
ret, err := samplingStore.GetProbabilitiesAndQPS(start, start.Add(time.Second*time.Duration(1)))
assert.NoError(t, err)
assert.NotEmpty(t, ret)
assert.Len(t, ret, 1)
assert.Equal(t, &model.ProbabilityAndQPS{Probability: 0.008, QPS: 4.0}, ret["peta0242ac130003"][0]["svc-1"]["op-2"])
ret, _ = samplingStore.GetProbabilitiesAndQPS(start, start)
assert.Len(t, ret, 0)
ret, _ = samplingStore.GetProbabilitiesAndQPS(start.Add(time.Second*time.Duration(-1)), start.Add(time.Second*time.Duration(10)))
assert.Len(t, ret, 3)
})
}