Skip to content

Commit

Permalink
broker: add optional ReadRequest.BeginModTime constraint
Browse files Browse the repository at this point in the history
If provided, the broker read API will skip the read offset over
persisted fragments having a modification time that falls before the
bound. This can be used to efficiently skip over undesired historical
prefixes of journal content given a wall-time lower bound.
  • Loading branch information
jgraettinger committed Aug 16, 2023
1 parent 8560660 commit 6fa41ac
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 175 deletions.
13 changes: 12 additions & 1 deletion broker/fragment/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,18 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
found = true
}

if found {
if !found {
// Pass.
} else if f := fi.set[ind].Fragment; f.ModTime != 0 && f.ModTime < req.BeginModTime {
// This fragment was modified before the requested lower bound.
// Skip the read offset over its content.
addTrace(ctx, "Index.Query(%s) => skip offsets [%d, %d) because ModTime %d < BeginModTime %d",
req, f.Begin, f.End, f.ModTime, req.BeginModTime)

resp.Offset = fi.set[ind].End
continue
} else {
// We found a covering fragment.
resp.Status = pb.Status_OK
resp.WriteHead = fi.set.EndOffset()
resp.Fragment = new(pb.Fragment)
Expand Down
40 changes: 39 additions & 1 deletion broker/fragment/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) {
WriteHead: 250,
Fragment: &pb.Fragment{Begin: 200, End: 250},
})
c.Check(err, gc.IsNil)
}

func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
Expand Down Expand Up @@ -159,6 +160,43 @@ func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
c.Check(resp.Status, gc.Equals, pb.Status_OK)
}

func (s *IndexSuite) TestQueryWithBeginModTimeConstraint(c *gc.C) {
const beginTime int64 = 1500000000

var set = buildSet(c, 100, 200, 200, 300, 300, 400, 400, 500, 500, 600)
set[0].ModTime = beginTime - 10 // 100-200 not matched.
set[1].ModTime = beginTime // 200-300 matched
set[2].ModTime = beginTime - 1 // 300-400 not matched.
set[3].ModTime = beginTime + 100 // 400-500 matched.

var ind = NewIndex(context.Background())
ind.ReplaceRemote(set[:4])
ind.SpoolCommit(set[4]) // 500-600 has no ModTime.

for _, tc := range []struct {
offset, ind int64
}{
{50, 1},
{150, 1},
{200, 1},
{350, 3},
{400, 3},
{500, 4},
} {
var resp, _, _ = ind.Query(context.Background(),
&pb.ReadRequest{Offset: tc.offset, BeginModTime: beginTime})
c.Check(resp, gc.DeepEquals, &pb.ReadResponse{
Offset: set[tc.ind].Begin,
WriteHead: 600,
Fragment: &pb.Fragment{
Begin: set[tc.ind].Begin,
End: set[tc.ind].End,
ModTime: set[tc.ind].ModTime,
},
})
}
}

func (s *IndexSuite) TestBlockedContextCancelled(c *gc.C) {
var indCtx, indCancel = context.WithCancel(context.Background())
var reqCtx, reqCancel = context.WithCancel(context.Background())
Expand All @@ -174,7 +212,7 @@ func (s *IndexSuite) TestBlockedContextCancelled(c *gc.C) {
c.Check(err, gc.Equals, context.Canceled)

// Cancel the Index's context. Same deal.
reqCtx, reqCancel = context.WithCancel(context.Background())
reqCtx = context.Background()
go indCancel()

resp, _, err = ind.Query(reqCtx, &pb.ReadRequest{Offset: -1, Block: true})
Expand Down
Loading

0 comments on commit 6fa41ac

Please # to comment.