Skip to content

AppendService should retry on JOURNAL_NOT_FOUND #359

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions broker/client/append_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
// pipelined and batched to amortize the cost of broker Append RPCs. It may
// also simplify implementations for clients who would prefer to simply have
// writes block until successfully committed, as opposed to handling errors
// and retries themselves. AppendService will retry all errors except for
// context cancellation and ErrJournalNotFound.
// and retries themselves.
//
// For each journal, AppendService manages an ordered list of AsyncAppends,
// each having buffered content to be appended. The list is dispatched in
Expand Down Expand Up @@ -279,8 +278,7 @@ func (p *AsyncAppend) Writer() *bufio.Writer { return p.fb.buf }
// also roll back any writes queued by the caller, aborting the append
// transaction. Require is valid for use only until Release is called.
// Require returns itself, allowing uses like:
//
// Require(maybeErrors()).Release()
// Require(maybeErrors()).Release()
func (p *AsyncAppend) Require(err error) *AsyncAppend {
if err != nil && p.op.err == nil {
p.op.err = err
Expand Down Expand Up @@ -394,7 +392,7 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) {
err2 = aa.app.Close()
}

if err2 == context.Canceled || err2 == context.DeadlineExceeded || err2 == ErrJournalNotFound {
if err2 == context.Canceled || err2 == context.DeadlineExceeded {
err = err2
return nil // Break retry loop.
} else if err2 != nil {
Expand Down
18 changes: 0 additions & 18 deletions broker/client/append_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,6 @@ func (s *AppendServiceSuite) TestBasicAppendWithRetry(c *gc.C) {
aaNext.mu.Unlock()

c.Check(as.PendingExcept(""), gc.HasLen, 0)

// Case: broker responds with a terminal JOURNAL_NOT_FOUND error.
aa = as.StartAppend(pb.AppendRequest{Journal: "a/journal"}, nil)
_, _ = aa.Writer().WriteString("hello, world")
c.Assert(aa.Release(), gc.IsNil)

readHelloWorldAppendRequest(c, broker) // RPC is dispatched to broker.
broker.AppendRespCh <- buildNotFoundFixture(broker)

c.Check(aa.Err(), gc.Equals, ErrJournalNotFound)
c.Check(aa.Response().Status, gc.DeepEquals, pb.Status_JOURNAL_NOT_FOUND)
}

func (s *AppendServiceSuite) TestAppendPipelineWithAborts(c *gc.C) {
Expand Down Expand Up @@ -622,11 +611,4 @@ func buildAppendResponseFixture(ep interface{ Endpoint() pb.Endpoint }) pb.Appen
}
}

func buildNotFoundFixture(ep interface{ Endpoint() pb.Endpoint }) pb.AppendResponse {
return pb.AppendResponse{
Status: pb.Status_JOURNAL_NOT_FOUND,
Header: *buildHeaderFixture(ep),
}
}

var _ = gc.Suite(&AppendServiceSuite{})