Skip to content

[DRAFT]: feat: use pooled http transport for grafana client #1982

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
13 changes: 13 additions & 0 deletions controllers/client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -34,3 +35,15 @@ func GetValueFromSecretKey(ctx context.Context, ref *v1.SecretKeySelector, c cli

return nil, fmt.Errorf("credentials not found in secret: %v/%v", namespace, ref.Name)
}

type syncPool[T any] struct {
sync.Pool
}

func (s *syncPool[T]) Get() T {
return s.Pool.Get().(T) //nolint:errcheck
}

func (s *syncPool[T]) Put(t T) {
s.Pool.Put(t)
}
213 changes: 117 additions & 96 deletions controllers/client/grafana_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"net/url"
"sync"
"time"

genapi "github.com/grafana/grafana-openapi-client-go/client"
Expand All @@ -16,14 +17,119 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
httpTransportPool = syncPool[*http.Transport]{
Pool: sync.Pool{
New: func() interface{} {
return defaultPooledTransport()
},
},
}
genTransportCfgPool = syncPool[*genapi.TransportConfig]{
Pool: sync.Pool{
New: func() interface{} {
return &genapi.TransportConfig{}
},
},
}
)

type grafanaAdminCredentials struct {
username string
password string
apikey string
}

func NewGeneratedGrafanaClient(ctx context.Context, c client.Client, grafana *v1beta1.Grafana) (*genapi.GrafanaHTTPAPI, error) {
var timeout time.Duration
if grafana.Spec.Client != nil && grafana.Spec.Client.TimeoutSeconds != nil {
timeout = time.Duration(*grafana.Spec.Client.TimeoutSeconds)
if timeout < 0 {
timeout = 0
}
} else {
timeout = 10
}

tlsConfig, err := buildTLSConfiguration(ctx, c, grafana)
if err != nil {
return nil, err
}

gURL, err := ParseAdminURL(grafana.Status.AdminURL)
if err != nil {
return nil, err
}

tp := httpTransportPool.Get()
defer func() {
httpTransportPool.Put(tp)
}()
Comment on lines +64 to +67
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is coming from a limited understanding of sync.Pool, so take it with some salt.
But this defer looks "off".

Every example I see for using sync.Pool always finishes the work before returning the object to the pool.
But here the object is returned to the pool before the client and the containing transports are used.
Could another Goroutine Get() the httpTransport and cfg during a preemption outside of NewGeneratedGrafanaClient and overwrite both?

We have multiple controllers making more than one request during a single reconcile resulting in a lot of preemption opportunities.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point. Thanks for noticing that. I reckon, I didn't completely check the control flows and solely focused on my use case. Which was maxConcurrentReconcilation:1 for datasources that was simply serialized execution 🤦

I totally forgot about we can set maxConcurrentReconcilation to more or even different control flows from diff resources here.

I will go for a long weekend vacation and will work on this pooling(sync.Pool) more seriously(not the part for http transport that is pretty much necessary) when I back. Will ping u later on.

Thank u so much for spending ur time on the review, appreciated 🙏


transport := newInstrumentedRoundTripper(tp, grafana.IsExternal(), tlsConfig, metrics.GrafanaAPIRequests.MustCurryWith(prometheus.Labels{
"instance_namespace": grafana.Namespace,
"instance_name": grafana.Name,
}))
if grafana.Spec.Client != nil && grafana.Spec.Client.Headers != nil {
transport.(*instrumentedRoundTripper).addHeaders(grafana.Spec.Client.Headers) //nolint:errcheck
}

cred, err := getAdminCredentials(ctx, c, grafana)
if err != nil {
return nil, err
}

cfg := genTransportCfgPool.Get()
defer func() {
genTransportCfgPool.Put(cfg)
}()

cfg.Schemes = []string{gURL.Scheme}
cfg.BasePath = gURL.Path
cfg.Host = gURL.Host
cfg.APIKey = cred.apikey
cfg.TLSConfig = tlsConfig
cfg.Client = &http.Client{
Transport: transport,
Timeout: timeout * time.Second,
}
if cred.username != "" {
cfg.BasicAuth = url.UserPassword(cred.username, cred.password)
}

cl := genapi.NewHTTPClientWithConfig(nil, cfg)
return cl, nil
}

func InjectAuthHeaders(ctx context.Context, c client.Client, grafana *v1beta1.Grafana, req *http.Request) error {
creds, err := getAdminCredentials(ctx, c, grafana)
if err != nil {
return fmt.Errorf("fetching admin credentials: %w", err)
}
if creds.apikey != "" {
req.Header.Add("Authorization", "Bearer "+creds.apikey)
} else {
req.SetBasicAuth(creds.username, creds.password)
}
return nil
}

func ParseAdminURL(adminURL string) (*url.URL, error) {
gURL, err := url.Parse(adminURL)
if err != nil {
return nil, fmt.Errorf("parsing url for client: %w", err)
}

if gURL.Host == "" {
return nil, fmt.Errorf("invalid Grafana adminURL, url must contain protocol and host")
}

gURL = gURL.JoinPath("/api")
return gURL, nil
}

func getAdminCredentials(ctx context.Context, c client.Client, grafana *v1beta1.Grafana) (*grafanaAdminCredentials, error) {
credentials := &grafanaAdminCredentials{}
cred := &grafanaAdminCredentials{}

if grafana.IsExternal() {
// prefer api key if present
Expand All @@ -32,8 +138,8 @@ func getAdminCredentials(ctx context.Context, c client.Client, grafana *v1beta1.
if err != nil {
return nil, err
}
credentials.apikey = string(apikey)
return credentials, nil
cred.apikey = string(apikey)
return cred, nil
}

// rely on username and password otherwise
Expand All @@ -47,9 +153,9 @@ func getAdminCredentials(ctx context.Context, c client.Client, grafana *v1beta1.
return nil, err
}

credentials.username = string(username)
credentials.password = string(password)
return credentials, nil
cred.username = string(username)
cred.password = string(password)
return cred, nil
}

deployment := model.GetGrafanaDeployment(grafana, nil)
Expand All @@ -67,7 +173,7 @@ func getAdminCredentials(ctx context.Context, c client.Client, grafana *v1beta1.
for _, env := range container.Env {
if env.Name == config.GrafanaAdminUserEnvVar {
if env.Value != "" {
credentials.username = env.Value
cred.username = env.Value
continue
}

Expand All @@ -77,13 +183,13 @@ func getAdminCredentials(ctx context.Context, c client.Client, grafana *v1beta1.
if err != nil {
return nil, err
}
credentials.username = string(usernameFromSecret)
cred.username = string(usernameFromSecret)
}
}
}
if env.Name == config.GrafanaAdminPasswordEnvVar {
if env.Value != "" {
credentials.password = env.Value
cred.password = env.Value
continue
}

Expand All @@ -93,96 +199,11 @@ func getAdminCredentials(ctx context.Context, c client.Client, grafana *v1beta1.
if err != nil {
return nil, err
}
credentials.password = string(passwordFromSecret)
cred.password = string(passwordFromSecret)
}
}
}
}
}
return credentials, nil
}

func InjectAuthHeaders(ctx context.Context, c client.Client, grafana *v1beta1.Grafana, req *http.Request) error {
creds, err := getAdminCredentials(ctx, c, grafana)
if err != nil {
return fmt.Errorf("fetching admin credentials: %w", err)
}
if creds.apikey != "" {
req.Header.Add("Authorization", "Bearer "+creds.apikey)
} else {
req.SetBasicAuth(creds.username, creds.password)
}
return nil
}

func ParseAdminURL(adminURL string) (*url.URL, error) {
gURL, err := url.Parse(adminURL)
if err != nil {
return nil, fmt.Errorf("parsing url for client: %w", err)
}

if gURL.Host == "" {
return nil, fmt.Errorf("invalid Grafana adminURL, url must contain protocol and host")
}

gURL = gURL.JoinPath("/api")
return gURL, nil
}

func NewGeneratedGrafanaClient(ctx context.Context, c client.Client, grafana *v1beta1.Grafana) (*genapi.GrafanaHTTPAPI, error) {
var timeout time.Duration
if grafana.Spec.Client != nil && grafana.Spec.Client.TimeoutSeconds != nil {
timeout = time.Duration(*grafana.Spec.Client.TimeoutSeconds)
if timeout < 0 {
timeout = 0
}
} else {
timeout = 10
}

tlsConfig, err := buildTLSConfiguration(ctx, c, grafana)
if err != nil {
return nil, err
}

gURL, err := ParseAdminURL(grafana.Status.AdminURL)
if err != nil {
return nil, err
}

transport := NewInstrumentedRoundTripper(grafana.IsExternal(), tlsConfig, metrics.GrafanaAPIRequests.MustCurryWith(prometheus.Labels{
"instance_namespace": grafana.Namespace,
"instance_name": grafana.Name,
}))
if grafana.Spec.Client != nil && grafana.Spec.Client.Headers != nil {
transport.(*instrumentedRoundTripper).addHeaders(grafana.Spec.Client.Headers) //nolint:errcheck
}

// Secrets and ConfigMaps are not cached by default, get credentials as the last step.
credentials, err := getAdminCredentials(ctx, c, grafana)
if err != nil {
return nil, err
}

cfg := &genapi.TransportConfig{
Schemes: []string{gURL.Scheme},
BasePath: gURL.Path,
Host: gURL.Host,
// APIKey is an optional API key or service account token.
APIKey: credentials.apikey,
// NumRetries contains the optional number of attempted retries
NumRetries: 0,
TLSConfig: tlsConfig,
Client: &http.Client{
Transport: transport,
Timeout: timeout * time.Second,
},
}
if credentials.username != "" {
cfg.BasicAuth = url.UserPassword(credentials.username, credentials.password)
}

cl := genapi.NewHTTPClientWithConfig(nil, cfg)

return cl, nil
return cred, nil
}
60 changes: 60 additions & 0 deletions controllers/client/grafana_client_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package client

import (
"context"
"testing"

"github.com/grafana/grafana-operator/v5/api/v1beta1"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type fakeClient struct {
client.Client
}

func (c fakeClient) Get(_ context.Context, _ client.ObjectKey, ref client.Object, _ ...client.GetOption) error {
s := ref.(*v1.Secret) //nolint:errcheck
s.Data = map[string][]byte{
"fake": []byte("something"),
}
return nil
}

func Benchmark_GenGrafanaClient(b *testing.B) {
ctx := context.TODO()
var (
fc fakeClient
grf = &v1beta1.Grafana{
Spec: v1beta1.GrafanaSpec{
External: &v1beta1.External{
APIKey: &v1.SecretKeySelector{
LocalObjectReference: v1.LocalObjectReference{
Name: "secret",
},
Key: "fake",
},
TLS: &v1beta1.TLSConfig{
InsecureSkipVerify: true,
},
},
},
Status: v1beta1.GrafanaStatus{
AdminURL: "https://grafana.example.com",
},
}
)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
cl, err := NewGeneratedGrafanaClient(ctx, fc, grf)
if err != nil {
b.Fatal(err)
}
if cl == nil {
b.Fatal("client is nil")
}
}
}
21 changes: 21 additions & 0 deletions controllers/client/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"net/http"
"runtime"
"time"

"github.com/grafana/grafana-operator/v5/api/v1beta1"
Expand Down Expand Up @@ -40,3 +41,23 @@ func NewHTTPClient(ctx context.Context, c client.Client, grafana *v1beta1.Grafan
Timeout: time.Second * timeout,
}, nil
}

// defaultTransport returns a new http.Transport with similar default values to
// http.DefaultTransport, but with idle connections and keepalives disabled.
func defaultTransport() *http.Transport {
tp := defaultPooledTransport()
tp.DisableKeepAlives = true
tp.MaxIdleConnsPerHost = -1
return tp
}

// defaultPooledTransport returns a new http.Transport with similar default
// values to http.DefaultTransport. Do not use this for transient transports as
// it can leak file descriptors over time. Only use this for transports that
// will be re-used for the same host(s).
func defaultPooledTransport() *http.Transport {
tp := http.DefaultTransport.(*http.Transport).Clone() //nolint:errcheck
tp.DisableKeepAlives = false
tp.MaxIdleConnsPerHost = runtime.GOMAXPROCS(0) + 1
return tp
}
Loading
Loading