Skip to content

Commit 4776744

Browse files
authored
fix(client): broken HTTP sender retries (#39)
1 parent a562658 commit 4776744

File tree

4 files changed

+47
-35
lines changed

4 files changed

+47
-35
lines changed

http_sender.go

+20-24
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,7 @@ func (s *httpLineSender) Flush(ctx context.Context) error {
171171

172172
func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
173173
var (
174-
req *http.Request
175-
retryInterval time.Duration
176-
174+
retryInterval time.Duration
177175
maxRetryInterval = time.Second
178176
)
179177

@@ -198,25 +196,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
198196
// Always reset the buffer at the end of flush.
199197
defer s.buf.Reset()
200198

201-
// We rely on the following HTTP client implicit behavior:
202-
// s.buf implements WriteTo method which is used by the client.
203-
req, err = http.NewRequest(
204-
http.MethodPost,
205-
s.uri,
206-
bytes.NewReader(s.buf.Bytes()),
207-
)
208-
if err != nil {
209-
return err
210-
}
211-
req.ContentLength = int64(s.BufLen())
212-
213-
if s.user != "" && s.pass != "" {
214-
req.SetBasicAuth(s.user, s.pass)
215-
} else if s.token != "" {
216-
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token))
217-
}
218-
219-
retry, err := s.makeRequest(ctx, req)
199+
retry, err := s.makeRequest(ctx)
220200
if !retry {
221201
s.refreshFlushDeadline(err)
222202
return err
@@ -234,7 +214,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
234214
jitter := time.Duration(rand.Intn(10)) * time.Millisecond
235215
time.Sleep(retryInterval + jitter)
236216

237-
retry, err = s.makeRequest(ctx, req)
217+
retry, err = s.makeRequest(ctx)
238218
if !retry {
239219
s.refreshFlushDeadline(err)
240220
return err
@@ -358,7 +338,23 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
358338
}
359339

360340
// makeRequest returns a boolean if we need to retry the request
361-
func (s *httpLineSender) makeRequest(ctx context.Context, req *http.Request) (bool, error) {
341+
func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) {
342+
req, err := http.NewRequest(
343+
http.MethodPost,
344+
s.uri,
345+
bytes.NewReader(s.buf.Bytes()),
346+
)
347+
if err != nil {
348+
return false, err
349+
}
350+
req.ContentLength = int64(s.BufLen())
351+
352+
if s.user != "" && s.pass != "" {
353+
req.SetBasicAuth(s.user, s.pass)
354+
} else if s.token != "" {
355+
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token))
356+
}
357+
362358
// reqTimeout = ( request.len() / min_throughput ) + request_timeout
363359
// nb: conversion from int to time.Duration is in milliseconds
364360
reqTimeout := time.Duration(s.buf.Len()/s.minThroughputBytesPerSecond)*time.Second + s.requestTimeout

http_sender_test.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -580,17 +580,28 @@ func TestSuccessAfterRetries(t *testing.T) {
580580
assert.NoError(t, err)
581581
defer srv.Close()
582582

583-
sender, err := qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(srv.Addr()), qdb.WithRetryTimeout(time.Minute))
583+
sender, err := qdb.NewLineSender(
584+
ctx, qdb.WithHttp(),
585+
qdb.WithAddress(srv.Addr()),
586+
qdb.WithAutoFlushDisabled(),
587+
qdb.WithRetryTimeout(time.Minute),
588+
)
584589
assert.NoError(t, err)
585590
defer sender.Close(ctx)
586591

587-
err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx)
588-
assert.NoError(t, err)
592+
for i := 0; i < 10; i++ {
593+
err = sender.Table(testTable).Int64Column("foobar", int64(i)).AtNow(ctx)
594+
assert.NoError(t, err)
595+
}
589596

590597
err = sender.Flush(ctx)
591598
assert.NoError(t, err)
592599

593-
expectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)})
600+
expected := make([]string, 0)
601+
for i := 0; i < 10; i++ {
602+
expected = append(expected, fmt.Sprintf("%s foobar=%di", testTable, i))
603+
}
604+
expectLines(t, srv.BackCh, expected)
594605
assert.Zero(t, qdb.BufLen(sender))
595606
}
596607

sender.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ type LineSender interface {
8383
// TimestampColumn adds a timestamp column value to the ILP
8484
// message.
8585
//
86+
// Should be used only for non-designated timestamp column.
87+
// Designated timestamp column values should be passed to At/AtNow.
88+
//
8689
// Column name cannot contain any of the following characters:
8790
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
8891
// '-', '*' '%%', '~', or a non-printable char.
@@ -110,8 +113,8 @@ type LineSender interface {
110113
// '-', '*' '%%', '~', or a non-printable char.
111114
BoolColumn(name string, val bool) LineSender
112115

113-
// At sets the timestamp in Epoch nanoseconds and finalizes
114-
// the ILP message.
116+
// At sets the designated timestamp value and finalizes the ILP
117+
// message.
115118
//
116119
// If the underlying buffer reaches configured capacity or the
117120
// number of buffered messages exceeds the auto-flush trigger, this
@@ -120,9 +123,9 @@ type LineSender interface {
120123
// If ts.IsZero(), no timestamp is sent to the server.
121124
At(ctx context.Context, ts time.Time) error
122125

123-
// AtNow omits the timestamp and finalizes the ILP message.
124-
// The server will insert each message using the system clock
125-
// as the row timestamp.
126+
// AtNow omits designated timestamp value and finalizes the ILP
127+
// message. The server will insert each message using the system
128+
// clock as the row timestamp.
126129
//
127130
// If the underlying buffer reaches configured capacity or the
128131
// number of buffered messages exceeds the auto-flush trigger, this

utils_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func newTestServerWithProtocol(serverType serverType, protocol string) (*testSer
8282
addr: tcp.Addr().String(),
8383
tcpListener: tcp,
8484
serverType: serverType,
85-
BackCh: make(chan string, 5),
85+
BackCh: make(chan string, 1000),
8686
closeCh: make(chan struct{}),
8787
}
8888

@@ -197,6 +197,8 @@ func (s *testServer) serveHttp() {
197197
switch s.serverType {
198198
case failFirstThenSendToBackChannel:
199199
if atomic.AddInt64(&reqs, 1) == 1 {
200+
// Consume request body.
201+
_, err = io.Copy(io.Discard, r.Body)
200202
w.WriteHeader(http.StatusInternalServerError)
201203
} else {
202204
err = readAndSendToBackChannel(r, lineFeed)
@@ -265,5 +267,5 @@ func expectLines(t *testing.T, linesCh chan string, expected []string) {
265267
return false
266268
}
267269
return reflect.DeepEqual(expected, actual)
268-
}, 3*time.Second, 100*time.Millisecond)
270+
}, 10*time.Second, 100*time.Millisecond)
269271
}

0 commit comments

Comments
 (0)