Skip to content

Commit

Permalink
Handle prefixes when listing blocks from S3 and GCS (#3466) (#3502)
Browse files Browse the repository at this point in the history
* Handle prefixes when listing blocks from S3

fixes #3465

* Handle prefixes when listing blocks from GCS

* Add test for prefixes when listing blocks from Azure

* Update unit tests to check for actual block IDs instead of just length of the slices

Cleanup unit tests

* Further refine S3/GCS backend for ListBlocks

Brings logic more in line with Azure object parsing.
Also has the benefit of handling prefixes without a trailing slash.

* Update poller integration test to exercise prefixes

* Update e2e test to exercise prefixes

* Fix format check error

* Fix failing e2e tests

* Remove unnecessary prefix permutations from e2e test

* Remove unnecessary test config file copy

* Ignore lint

---------

Co-authored-by: Zach Leslie <zach.leslie@grafana.com>
(cherry picked from commit 8e6e7fe)

Co-authored-by: Ben Foster <bpfoster@gmail.com>
  • Loading branch information
joe-elliott and bpfoster authored Mar 19, 2024
1 parent ca6818a commit f5ab738
Show file tree
Hide file tree
Showing 12 changed files with 646 additions and 194 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
## main / unreleased

* [BUGFIX] Fix some instances where spanmetrics histograms could be inconsistent [#3412](https://github.com/grafana/tempo/pull/3412) (@mdisibio)
* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)
* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
* [BUGFIX] Fix metrics query duration check, add per-tenant override for max metrics query duration [#3479](https://github.com/grafana/tempo/issues/3479) (@mdisibio)
* [BUGFIX] Return unfiltered results when a bad TraceQL query is provided in autocomplete. [#3426](https://github.com/grafana/tempo/pull/3426) (@mapno)
* [BUGFIX] Correctly handle 429s in GRPC search streaming. [#3469](https://github.com/grafana/tempo/pull/3469) (@joe-ellitot)
* [BUGFIX] Correctly cancel GRPC and HTTP contexts in the frontend to prevent having to rely on http write timeout. [#3443](https://github.com/grafana/tempo/pull/3443) (@joe-elliott)
* [BUGFIX] Fix compaction/retention in AWS S3 and GCS when a prefix is configured. [#3465](https://github.com/grafana/tempo/issues/3465) (@bpfoster)

## v2.4.0-rc.0

Expand Down
1 change: 1 addition & 0 deletions integration/e2e/config-all-in-one-azurite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ storage:
endpoint_suffix: tempo_e2e-azurite:10000
storage_account_name: "devstoreaccount1"
storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
prefix: {{ .Prefix }}
pool:
max_workers: 10
queue_depth: 100
Expand Down
1 change: 1 addition & 0 deletions integration/e2e/config-all-in-one-gcs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ storage:
bucket_name: tempo
endpoint: https://tempo_e2e-gcs:4443/storage/v1/
insecure: true
prefix: {{ .Prefix }}
pool:
max_workers: 10
queue_depth: 1000
Expand Down
1 change: 1 addition & 0 deletions integration/e2e/config-all-in-one-s3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ storage:
access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey
secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey
insecure: true
prefix: {{ .Prefix }}
pool:
max_workers: 10
queue_depth: 100
Expand Down
140 changes: 82 additions & 58 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,83 +58,103 @@ func TestAllInOne(t *testing.T) {
},
}

storageBackendTestPermutations := []struct {
name string
prefix string
}{
{
name: "no-prefix",
},
{
name: "prefix",
prefix: "a/b/c/",
},
}

for _, tc := range testBackends {
t.Run(tc.name, func(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()
for _, pc := range storageBackendTestPermutations {
t.Run(tc.name+"-"+pc.name, func(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

// set up the backend
cfg := app.Config{}
buff, err := os.ReadFile(tc.configFile)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)
// copy config template to shared directory and expand template variables
tmplConfig := map[string]any{"Prefix": pc.prefix}
configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig)
require.NoError(t, err)

require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))
// set up the backend
cfg := app.Config{}
buff, err := os.ReadFile(configFile)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)

// Get port for the Jaeger gRPC receiver endpoint
c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))
// Get port for the Jaeger gRPC receiver endpoint
c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)

expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)
info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))

// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))
expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

// test echo
assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo")
// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))

apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "")
// test echo
// nolint:goconst
assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo")

// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)
apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "")

// wait trace_idle_time and ensure trace is created in ingester
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics))
// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)

// flush trace to backend
callFlush(t, tempo)
// wait trace_idle_time and ensure trace is created in ingester
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics))

// search for trace in backend
util.SearchAndAssertTrace(t, apiClient, info)
util.SearchTraceQLAndAssertTrace(t, apiClient, info)
// flush trace to backend
callFlush(t, tempo)

// sleep
time.Sleep(10 * time.Second)
// search for trace in backend
util.SearchAndAssertTrace(t, apiClient, info)
util.SearchTraceQLAndAssertTrace(t, apiClient, info)

// force clear completed block
callFlush(t, tempo)
// sleep
time.Sleep(10 * time.Second)

// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total"))
// force clear completed block
callFlush(t, tempo)

// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, info)
// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total"))

// search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
now := time.Now()
util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, info)

util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix())
// search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
now := time.Now()
util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())

// find the trace with streaming. using the http server b/c that's what Grafana will do
grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200))
require.NoError(t, err)
util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix())

util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
})
// find the trace with streaming. using the http server b/c that's what Grafana will do
grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200))
require.NoError(t, err)

util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
})
}
}
}

Expand Down Expand Up @@ -317,16 +337,20 @@ func TestShutdownDelay(t *testing.T) {
require.NoError(t, err)
defer s.Close()

// copy config template to shared directory and expand template variables
tmplConfig := map[string]any{"Prefix": ""}
configFile, err := util.CopyTemplateToSharedDir(s, configAllInOneS3, "config.yaml", tmplConfig)
require.NoError(t, err)

// set up the backend
cfg := app.Config{}
buff, err := os.ReadFile(configAllInOneS3)
buff, err := os.ReadFile(configFile)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)

require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml"))
tempo := util.NewTempoAllInOne("-shutdown-delay=5s")

// this line tests confirms that the readiness flag is up
Expand Down
8 changes: 6 additions & 2 deletions integration/e2e/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,20 @@ func TestOverrides(t *testing.T) {
require.NoError(t, err)
defer s.Close()

// copy config template to shared directory and expand template variables
tmplConfig := map[string]any{"Prefix": ""}
configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig)
require.NoError(t, err)

// set up the backend
cfg := app.Config{}
buff, err := os.ReadFile(tc.configFile)
buff, err := os.ReadFile(configFile)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)

require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

Expand Down
Loading

0 comments on commit f5ab738

Please # to comment.