Skip to content

fix(client): HTTP sender retries send empty request body #38

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"math"
"math/big"
"strconv"
Expand Down Expand Up @@ -122,20 +121,6 @@ func (b *buffer) writeBigInt(i *big.Int) {
b.Write(s)
}

// WriteTo wraps the built-in bytes.Buffer.WriteTo method
// and writes the contents of the buffer to the provided
// io.Writer
func (b *buffer) WriteTo(w io.Writer) (int64, error) {
n, err := b.Buffer.WriteTo(w)
if err != nil {
b.lastMsgPos -= int(n)
return n, err
}
b.lastMsgPos = 0
b.msgCount = 0
return n, nil
}

func (b *buffer) writeTableName(str string) error {
if str == "" {
return fmt.Errorf("table name cannot be empty: %w", errInvalidMsg)
Expand Down Expand Up @@ -386,6 +371,17 @@ func (b *buffer) prepareForField() bool {
return true
}

func (b *buffer) Bytes() []byte {
return b.Buffer.Bytes()
}

func (b *buffer) Reset() {
b.Buffer.Reset()
b.lastMsgPos = 0
b.msgCount = 0
b.resetMsgFlags()
}

func (b *buffer) DiscardPendingMsg() {
b.Truncate(b.lastMsgPos)
b.resetMsgFlags()
Expand Down
6 changes: 5 additions & 1 deletion http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package questdb

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
Expand Down Expand Up @@ -194,17 +195,20 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
if s.buf.msgCount == 0 {
return nil
}
// Always reset the buffer at the end of flush.
defer s.buf.Reset()

// We rely on the following HTTP client implicit behavior:
// s.buf implements WriteTo method which is used by the client.
req, err = http.NewRequest(
http.MethodPost,
s.uri,
&s.buf,
bytes.NewReader(s.buf.Bytes()),
)
if err != nil {
return err
}
req.ContentLength = int64(s.BufLen())

if s.user != "" && s.pass != "" {
req.SetBasicAuth(s.user, s.pass)
Expand Down
21 changes: 21 additions & 0 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,27 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {
assert.Empty(t, qdb.Messages(sender))
}

func TestSuccessAfterRetries(t *testing.T) {
ctx := context.Background()

srv, err := newTestHttpServer(failFirstThenSendToBackChannel)
assert.NoError(t, err)
defer srv.Close()

sender, err := qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(srv.Addr()), qdb.WithRetryTimeout(time.Minute))
assert.NoError(t, err)
defer sender.Close(ctx)

err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx)
assert.NoError(t, err)

err = sender.Flush(ctx)
assert.NoError(t, err)

expectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)})
assert.Zero(t, qdb.BufLen(sender))
}

func TestBufferClearAfterFlush(t *testing.T) {
ctx := context.Background()

Expand Down
2 changes: 1 addition & 1 deletion integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func setupQuestDB0(ctx context.Context, auth ilpAuthType, setupProxy bool) (*que
return nil, err
}
req := testcontainers.ContainerRequest{
Image: "questdb/questdb:7.3.10",
Image: "questdb/questdb:7.4.2",
ExposedPorts: []string{"9000/tcp", "9009/tcp"},
WaitingFor: wait.ForHTTP("/").WithPort("9000"),
Networks: []string{networkName},
Expand Down
3 changes: 3 additions & 0 deletions tcp_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func (s *tcpLineSender) Flush(ctx context.Context) error {
s.conn.SetWriteDeadline(time.Time{})
}

// Always reset the buffer at the end of flush.
defer s.buf.Reset()

if _, err := s.buf.WriteTo(s.conn); err != nil {
return err
}
Expand Down
43 changes: 30 additions & 13 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"net/http"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -43,11 +44,12 @@ import (
type serverType int64

const (
sendToBackChannel serverType = 0
readAndDiscard serverType = 1
returning500 serverType = 2
returning403 serverType = 3
returning404 serverType = 4
sendToBackChannel serverType = 0
readAndDiscard serverType = 1
returning500 serverType = 2
returning403 serverType = 3
returning404 serverType = 4
failFirstThenSendToBackChannel serverType = 5
)

type testServer struct {
Expand Down Expand Up @@ -186,21 +188,21 @@ func (s *testServer) serveHttp() {
}
}()

var reqs int64
http.Serve(s.tcpListener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var (
err error
)

switch s.serverType {
case sendToBackChannel:
r := bufio.NewReader(r.Body)
var l string
for err == nil {
l, err = r.ReadString('\n')
if err == nil && len(l) > 0 {
lineFeed <- l[0 : len(l)-1]
}
case failFirstThenSendToBackChannel:
if atomic.AddInt64(&reqs, 1) == 1 {
w.WriteHeader(http.StatusInternalServerError)
} else {
err = readAndSendToBackChannel(r, lineFeed)
}
case sendToBackChannel:
err = readAndSendToBackChannel(r, lineFeed)
case readAndDiscard:
_, err = io.Copy(io.Discard, r.Body)
case returning500:
Expand Down Expand Up @@ -232,6 +234,21 @@ func (s *testServer) serveHttp() {
}))
}

func readAndSendToBackChannel(r *http.Request, lineFeed chan string) error {
read := bufio.NewReader(r.Body)
var (
l string
err error
)
for err == nil {
l, err = read.ReadString('\n')
if err == nil && len(l) > 0 {
lineFeed <- l[0 : len(l)-1]
}
}
return err
}

func (s *testServer) Close() {
close(s.closeCh)
s.tcpListener.Close()
Expand Down
Loading