Skip to content

Commit debed67

Browse files
committed
Add context to logger
This commit adds a `LoggerWithContext` interface that extends the `Logger` interface by a method `PrintfWithContext` that, when implemented, is called instead of the `Printf` method of the `Logger` interface. The purpose of `PrintfWithContext` is to receive the current context under which the logging happens. Notice that this doesn't always have to be request-scoped, i.e. an actual API call from a user. It may also be from an internal state or process, e.g. Bulk processor or node health. Close #1541
1 parent 9e03315 commit debed67

File tree

3 files changed

+52
-29
lines changed

3 files changed

+52
-29
lines changed

bulk_processor.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ func (w *bulkWorker) work(ctx context.Context) {
503503
w.flushAckC <- struct{}{}
504504
}
505505
if err != nil {
506-
w.p.c.errorf("elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
506+
w.p.c.errorf(ctx, "elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
507507
if !stop {
508508
waitForActive := func() {
509509
// Add back pressure to prevent Add calls from filling up the request queue
@@ -556,7 +556,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
556556
}
557557
// notifyFunc will be called if retry fails
558558
notifyFunc := func(err error) {
559-
w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
559+
w.p.c.errorf(ctx, "elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
560560
}
561561

562562
id := atomic.AddInt64(&w.p.executionId, 1)
@@ -580,7 +580,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
580580
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
581581
w.updateStats(res)
582582
if err != nil {
583-
w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
583+
w.p.c.errorf(ctx, "elastic: bulk processor %q failed: %v", w.p.name, err)
584584
}
585585

586586
// Invoke after callback
@@ -599,14 +599,14 @@ func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
599599

600600
client := w.p.c
601601
stopReconnC := w.p.stopReconnC
602-
w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
602+
w.p.c.errorf(context.Background(), "elastic: bulk processor %q is waiting for an active connection", w.p.name)
603603

604604
// loop until a health check finds at least 1 active connection or the reconnection channel is closed
605605
for {
606606
select {
607607
case _, ok := <-stopReconnC:
608608
if !ok {
609-
w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
609+
w.p.c.errorf(context.Background(), "elastic: bulk processor %q active connection check interrupted", w.p.name)
610610
return
611611
}
612612
case <-t.C:

client.go

+39-24
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626

2727
const (
2828
// Version is the current version of Elastic.
29-
Version = "7.0.29"
29+
Version = "7.0.30"
3030

3131
// DefaultURL is the default endpoint of Elasticsearch on the local machine.
3232
// It is used e.g. when initializing a new Client without a specific URL.
@@ -84,6 +84,9 @@ const (
8484
)
8585

8686
var (
87+
// nilByte is used in JSON marshal/unmarshal
88+
nilByte = []byte("null")
89+
8790
// ErrNoClient is raised when no Elasticsearch node is available.
8891
ErrNoClient = errors.New("no Elasticsearch node available")
8992

@@ -798,7 +801,7 @@ func (c *Client) Start() {
798801
c.running = true
799802
c.mu.Unlock()
800803

801-
c.infof("elastic: client started")
804+
c.infof(context.Background(), "elastic: client started")
802805
}
803806

804807
// Stop stops the background processes that the client is running,
@@ -828,27 +831,39 @@ func (c *Client) Stop() {
828831
c.running = false
829832
c.mu.Unlock()
830833

831-
c.infof("elastic: client stopped")
834+
c.infof(context.Background(), "elastic: client stopped")
832835
}
833836

834837
// errorf logs to the error log.
835-
func (c *Client) errorf(format string, args ...interface{}) {
838+
func (c *Client) errorf(ctx context.Context, format string, args ...interface{}) {
836839
if c.errorlog != nil {
837-
c.errorlog.Printf(format, args...)
840+
if logger, ok := c.errorlog.(LoggerWithContext); ok {
841+
logger.PrintfWithContext(ctx, format, args...)
842+
} else {
843+
c.errorlog.Printf(format, args...)
844+
}
838845
}
839846
}
840847

841848
// infof logs informational messages.
842-
func (c *Client) infof(format string, args ...interface{}) {
849+
func (c *Client) infof(ctx context.Context, format string, args ...interface{}) {
843850
if c.infolog != nil {
844-
c.infolog.Printf(format, args...)
851+
if logger, ok := c.infolog.(LoggerWithContext); ok {
852+
logger.PrintfWithContext(ctx, format, args...)
853+
} else {
854+
c.infolog.Printf(format, args...)
855+
}
845856
}
846857
}
847858

848859
// tracef logs to the trace log.
849-
func (c *Client) tracef(format string, args ...interface{}) {
860+
func (c *Client) tracef(ctx context.Context, format string, args ...interface{}) {
850861
if c.tracelog != nil {
851-
c.tracelog.Printf(format, args...)
862+
if logger, ok := c.tracelog.(LoggerWithContext); ok {
863+
logger.PrintfWithContext(ctx, format, args...)
864+
} else {
865+
c.tracelog.Printf(format, args...)
866+
}
852867
}
853868
}
854869

@@ -857,7 +872,7 @@ func (c *Client) dumpRequest(r *http.Request) {
857872
if c.tracelog != nil {
858873
out, err := httputil.DumpRequestOut(r, true)
859874
if err == nil {
860-
c.tracef("%s\n", string(out))
875+
c.tracef(r.Context(), "%s\n", string(out))
861876
}
862877
}
863878
}
@@ -867,7 +882,7 @@ func (c *Client) dumpResponse(resp *http.Response) {
867882
if c.tracelog != nil {
868883
out, err := httputil.DumpResponse(resp, true)
869884
if err == nil {
870-
c.tracef("%s\n", string(out))
885+
c.tracef(context.Background(), "%s\n", string(out))
871886
}
872887
}
873888
}
@@ -1055,7 +1070,7 @@ func (c *Client) updateConns(conns []*conn) {
10551070
}
10561071
if !found {
10571072
// New connection didn't exist, so add it to our list of new conns.
1058-
c.infof("elastic: %s joined the cluster", conn.URL())
1073+
c.infof(context.Background(), "elastic: %s joined the cluster", conn.URL())
10591074
newConns = append(newConns, conn)
10601075
}
10611076
}
@@ -1147,19 +1162,19 @@ func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, f
11471162
// Wait for the Goroutine (or its timeout)
11481163
select {
11491164
case <-ctx.Done(): // timeout
1150-
c.errorf("elastic: %s is dead", conn.URL())
1165+
c.errorf(ctx, "elastic: %s is dead", conn.URL())
11511166
conn.MarkAsDead()
11521167
case err := <-errc:
11531168
if err != nil {
1154-
c.errorf("elastic: %s is dead", conn.URL())
1169+
c.errorf(ctx, "elastic: %s is dead", conn.URL())
11551170
conn.MarkAsDead()
11561171
break
11571172
}
11581173
if status >= 200 && status < 300 {
11591174
conn.MarkAsAlive()
11601175
} else {
11611176
conn.MarkAsDead()
1162-
c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
1177+
c.errorf(ctx, "elastic: %s is dead [status=%d]", conn.URL(), status)
11631178
}
11641179
}
11651180
}
@@ -1256,7 +1271,7 @@ func (c *Client) next() (*conn, error) {
12561271
// So we are marking them as alive--if sniffing is disabled.
12571272
// They'll then be picked up in the next call to PerformRequest.
12581273
if !c.snifferEnabled {
1259-
c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
1274+
c.errorf(context.Background(), "elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
12601275
for _, conn := range c.conns {
12611276
conn.MarkAsAlive()
12621277
}
@@ -1380,13 +1395,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
13801395
continue // try again
13811396
}
13821397
if err != nil {
1383-
c.errorf("elastic: cannot get connection from pool")
1398+
c.errorf(ctx, "elastic: cannot get connection from pool")
13841399
return nil, err
13851400
}
13861401

13871402
req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
13881403
if err != nil {
1389-
c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
1404+
c.errorf(ctx, "elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
13901405
return nil, err
13911406
}
13921407
if basicAuth {
@@ -1415,7 +1430,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14151430
if opt.Body != nil {
14161431
err = req.SetBody(opt.Body, gzipEnabled)
14171432
if err != nil {
1418-
c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
1433+
c.errorf(ctx, "elastic: couldn't set body %+v for request: %v", opt.Body, err)
14191434
return nil, err
14201435
}
14211436
}
@@ -1433,12 +1448,12 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14331448
n++
14341449
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
14351450
if rerr != nil {
1436-
c.errorf("elastic: %s is dead", conn.URL())
1451+
c.errorf(ctx, "elastic: %s is dead", conn.URL())
14371452
conn.MarkAsDead()
14381453
return nil, rerr
14391454
}
14401455
if !ok {
1441-
c.errorf("elastic: %s is dead", conn.URL())
1456+
c.errorf(ctx, "elastic: %s is dead", conn.URL())
14421457
conn.MarkAsDead()
14431458
return nil, err
14441459
}
@@ -1450,7 +1465,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14501465
n++
14511466
wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
14521467
if rerr != nil {
1453-
c.errorf("elastic: %s is dead", conn.URL())
1468+
c.errorf(ctx, "elastic: %s is dead", conn.URL())
14541469
conn.MarkAsDead()
14551470
return nil, rerr
14561471
}
@@ -1473,7 +1488,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14731488
if len(res.Header["Warning"]) > 0 {
14741489
c.deprecationlog((*http.Request)(req), res)
14751490
for _, warning := range res.Header["Warning"] {
1476-
c.errorf("Deprecation warning: %s", warning)
1491+
c.errorf(ctx, "Deprecation warning: %s", warning)
14771492
}
14781493
}
14791494

@@ -1497,7 +1512,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
14971512
}
14981513

14991514
duration := time.Now().UTC().Sub(start)
1500-
c.infof("%s %s [status:%d, request:%.3fs]",
1515+
c.infof(ctx, "%s %s [status:%d, request:%.3fs]",
15011516
strings.ToUpper(opt.Method),
15021517
req.URL,
15031518
resp.StatusCode,

logger.go

+8
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,15 @@
44

55
package elastic
66

7+
import "context"
8+
79
// Logger specifies the interface for all log operations.
810
type Logger interface {
911
Printf(format string, v ...interface{})
1012
}
13+
14+
// LoggerWithContext extends the Logger interface by a context.
15+
type LoggerWithContext interface {
16+
Logger
17+
PrintfWithContext(ctx context.Context, format string, v ...interface{})
18+
}

0 commit comments

Comments
 (0)