Skip to content

Commit

Permalink
fix(server): fail pending request when quorumAckTracker closed (#608)
Browse files Browse the repository at this point in the history
### Motivation

This is the follow-up PR for #607. We should fail the pending request
when `quorumAckTracker` closes.

### Modification

- Introduce a callback
- fail pending request when `quorumAckTracker` closes.
  • Loading branch information
mattisonchao authored Feb 21, 2025
1 parent d9a4781 commit 0acbcaa
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 77 deletions.
38 changes: 38 additions & 0 deletions common/callback/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package callback

// Callback defines an interface for handling the completion of an asynchronous operation.
// It allows the caller to notify the system when an operation has completed successfully
// or when it has failed with an error. The interface provides two methods for handling
// both success and error cases.
//
// The generic type T represents the result type of the operation, which can vary depending
// on the specific use case.
//
// Methods:
//
// - Complete(t T): This method is called when the asynchronous operation completes successfully.
// It accepts a result of type T, which is the outcome of the operation.
//
// - CompleteError(err error): This method is called when the asynchronous operation fails.
// It accepts an error, which indicates the reason for the failure.
type Callback[T any] interface {
// Complete is invoked when the operation completes successfully with the result 't' of type T.
Complete(t T)

// CompleteError is invoked when the operation fails, providing an error 'err' indicating the failure reason.
CompleteError(err error)
}
63 changes: 63 additions & 0 deletions common/callback/once.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package callback

import "sync/atomic"

// Once ensures that a specific callback is executed only once, either on completion or on error.
// It prevents further execution of the callbacks after the first call and ensures atomicity
// of the operation using the atomic package.
//
// The generic type T represents the result type of the operation, and the callbacks
// provide the behavior for handling success or failure of the operation.
//
// Fields:
// - OnComplete: A function that gets called with the result of type T when the operation completes successfully.
// - OnCompleteError: A function that gets called with an error if the operation fails.
// - completed: An atomic boolean used to track if the operation has already completed, ensuring only one callback is executed.

type Once[T any] struct {
OnComplete func(t T) // Callback function called on successful completion
OnCompleteError func(err error) // Callback function called when an error occurs
completed atomic.Bool // Atomic flag to track completion status
}

// Complete is called to notify that the operation has completed successfully with the result 't'.
// It ensures that the 'OnComplete' callback is only called once.
func (c *Once[T]) Complete(t T) {
if !c.completed.CompareAndSwap(false, true) {
return
}
c.OnComplete(t)
}

// CompleteError is called to notify that the operation has failed with an error 'err'.
// It ensures that the 'OnCompleteError' callback is only called once.
func (c *Once[T]) CompleteError(err error) {
if !c.completed.CompareAndSwap(false, true) {
return
}
c.OnCompleteError(err)
}

// NewOnce creates a new instance of Once with the provided success and error callbacks.
// It ensures that the callbacks are invoked only once, either for success or failure.
func NewOnce[T any](onComplete func(t T), onError func(err error)) Callback[T] {
return &Once[T]{
onComplete,
onError,
atomic.Bool{},
}
}
100 changes: 100 additions & 0 deletions common/callback/once_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package callback

import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
)

func Test_Once_Complete_Concurrent(t *testing.T) {
callbackCounter := atomic.Int32{}
onceCallback := NewOnce[any](
func(t any) {
callbackCounter.Add(1)
},
func(err error) {
callbackCounter.Add(1)
})

group := sync.WaitGroup{}
for i := 0; i < 5; i++ {
group.Add(1)
go func() {
if i%2 == 0 {
onceCallback.Complete(nil)
} else {
onceCallback.CompleteError(errors.New("error"))
}
group.Done()
}()
}

group.Wait()
assert.Equal(t, int32(1), callbackCounter.Load())
}

func Test_Once_Complete(t *testing.T) {
var callbackError error
var callbackValue int32

onceCallback := NewOnce[int32](
func(t int32) {
callbackValue = t
},
func(err error) {
callbackError = err
})

onceCallback.Complete(1)

assert.Nil(t, callbackError)
assert.Equal(t, int32(1), callbackValue)
}

func Test_Once_Complete_Error(t *testing.T) {
var callbackError error
var callbackValue *int32

onceCallback := NewOnce[int32](
func(t int32) {
callbackValue = &t
},
func(err error) {
callbackError = err
})

e1 := errors.New("error")
onceCallback.CompleteError(e1)

assert.Equal(t, e1, callbackError)
assert.Nil(t, callbackValue)
}
2 changes: 1 addition & 1 deletion common/error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
ErrorInvalidTerm = status.Error(CodeInvalidTerm, "oxia: invalid term")
ErrorInvalidStatus = status.Error(CodeInvalidStatus, "oxia: invalid status")
ErrorLeaderAlreadyConnected = status.Error(CodeLeaderAlreadyConnected, "oxia: leader is already connected")
ErrorAlreadyClosed = status.Error(CodeAlreadyClosed, "oxia: node is shutting down")
ErrorAlreadyClosed = status.Error(CodeAlreadyClosed, "oxia: resource is already closed")
ErrorNodeIsNotLeader = status.Error(CodeNodeIsNotLeader, "oxia: node is not leader for shard")
ErrorNodeIsNotFollower = status.Error(CodeNodeIsNotFollower, "oxia: node is not follower for shard")
ErrorInvalidSession = status.Error(CodeInvalidSession, "oxia: session not found")
Expand Down
54 changes: 29 additions & 25 deletions server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"github.com/streamnative/oxia/common/callback"
"io"
"log/slog"
"sync"
Expand Down Expand Up @@ -349,7 +350,7 @@ func (lc *leaderController) BecomeLeader(ctx context.Context, req *proto.BecomeL
// committed in the quorum, to avoid missing any entries in the DB
// by the moment we make the leader controller accepting new write/read
// requests
if _, err = lc.quorumAckTracker.WaitForCommitOffset(ctx, lc.leaderElectionHeadEntryId.Offset, nil); err != nil {
if err = lc.quorumAckTracker.WaitForCommitOffset(ctx, lc.leaderElectionHeadEntryId.Offset); err != nil {
return nil, err
}

Expand Down Expand Up @@ -796,10 +797,11 @@ func (lc *leaderController) write(ctx context.Context, request func(int64) *prot
return wal.InvalidOffset, nil, err
}

resp, err := lc.quorumAckTracker.WaitForCommitOffset(ctx, newOffset, func() (*proto.WriteResponse, error) {
return lc.db.ProcessWrite(actualRequest, newOffset, timestamp, WrapperUpdateOperationCallback)
})
return newOffset, resp, err
if err := lc.quorumAckTracker.WaitForCommitOffset(ctx, newOffset); err != nil {
return wal.InvalidOffset, nil, err
}
writeResponse, err := lc.db.ProcessWrite(actualRequest, newOffset, timestamp, WrapperUpdateOperationCallback)
return newOffset, writeResponse, err
}

func (lc *leaderController) appendToWal(ctx context.Context, request func(int64) *proto.WriteRequest) (actualRequest *proto.WriteRequest, offset int64, timestamp uint64, err error) {
Expand Down Expand Up @@ -914,31 +916,33 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS
return
}

lc.quorumAckTracker.WaitForCommitOffsetAsync(offset, func() (*proto.WriteResponse, error) {
return lc.db.ProcessWrite(req, offset, timestamp, WrapperUpdateOperationCallback)
}, func(response *proto.WriteResponse, err error) {
if err != nil {
timer.Done()
sendNonBlocking(closeCh, err)
return
}

if err = stream.Send(response); err != nil {
timer.Done()
lc.quorumAckTracker.WaitForCommitOffsetAsync(context.Background(), offset, callback.NewOnce[any](
func(_ any) {
defer timer.Done()
localResponse, err := lc.db.ProcessWrite(req, offset, timestamp, WrapperUpdateOperationCallback)
if err != nil {
sendNonBlocking(closeCh, err)
return
}
if err = stream.Send(localResponse); err != nil {
sendNonBlocking(closeCh, err)
return
}
},
func(err error) {
defer timer.Done()
sendNonBlocking(closeCh, err)
return
}
timer.Done()
})
},
))
}

func (lc *leaderController) appendToWalStreamRequest(request *proto.WriteRequest,
callback func(offset int64, timestamp uint64, err error)) {
cb func(offset int64, timestamp uint64, err error)) {
lc.Lock()

if err := checkStatusIsLeader(lc.status); err != nil {
lc.Unlock()
callback(wal.InvalidOffset, 0, err)
cb(wal.InvalidOffset, 0, err)
return
}

Expand All @@ -961,7 +965,7 @@ func (lc *leaderController) appendToWalStreamRequest(request *proto.WriteRequest
value, err := logEntryValue.MarshalVT()
if err != nil {
lc.Unlock()
callback(wal.InvalidOffset, timestamp, err)
cb(wal.InvalidOffset, timestamp, err)
return
}
logEntry := &proto.LogEntry{
Expand All @@ -973,10 +977,10 @@ func (lc *leaderController) appendToWalStreamRequest(request *proto.WriteRequest

lc.wal.AppendAndSync(logEntry, func(err error) {
if err != nil {
callback(wal.InvalidOffset, timestamp, errors.Wrap(err, "oxia: failed to append to wal"))
cb(wal.InvalidOffset, timestamp, errors.Wrap(err, "oxia: failed to append to wal"))
} else {
lc.quorumAckTracker.AdvanceHeadOffset(newOffset)
callback(newOffset, timestamp, nil)
cb(newOffset, timestamp, nil)
}
})
lc.Unlock()
Expand Down
Loading

0 comments on commit 0acbcaa

Please # to comment.