broker: add optional ReadRequest.BeginModTime constraint #346

Merged · 2 commits · Aug 16, 2023
13 changes: 12 additions & 1 deletion broker/fragment/index.go
@@ -71,7 +71,18 @@ func (fi *Index) Query(ctx context.Context, req *pb.ReadRequest) (*pb.ReadRespon
found = true
}

if found {
if !found {
// Pass.
} else if f := fi.set[ind].Fragment; f.ModTime != 0 && f.ModTime < req.BeginModTime {
// This fragment was modified before the requested lower bound.
// Skip the read offset over its content.
addTrace(ctx, "Index.Query(%s) => skip offsets [%d, %d) because ModTime %d < BeginModTime %d",
req, f.Begin, f.End, f.ModTime, req.BeginModTime)

resp.Offset = fi.set[ind].End
continue
} else {
// We found a covering fragment.
resp.Status = pb.Status_OK
resp.WriteHead = fi.set.EndOffset()
resp.Fragment = new(pb.Fragment)
40 changes: 39 additions & 1 deletion broker/fragment/index_test.go
@@ -95,6 +95,7 @@ func (s *IndexSuite) TestQueryAtHead(c *gc.C) {
WriteHead: 250,
Fragment: &pb.Fragment{Begin: 200, End: 250},
})
c.Check(err, gc.IsNil)
}

func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
@@ -159,6 +160,43 @@ func (s *IndexSuite) TestQueryAtMissingMiddle(c *gc.C) {
c.Check(resp.Status, gc.Equals, pb.Status_OK)
}

func (s *IndexSuite) TestQueryWithBeginModTimeConstraint(c *gc.C) {
const beginTime int64 = 1500000000

var set = buildSet(c, 100, 200, 200, 300, 300, 400, 400, 500, 500, 600)
set[0].ModTime = beginTime - 10 // 100-200 not matched.
set[1].ModTime = beginTime // 200-300 matched
set[2].ModTime = beginTime - 1 // 300-400 not matched.
set[3].ModTime = beginTime + 100 // 400-500 matched.

var ind = NewIndex(context.Background())
ind.ReplaceRemote(set[:4])
ind.SpoolCommit(set[4]) // 500-600 has no ModTime.

for _, tc := range []struct {
offset, ind int64
}{
{50, 1},
{150, 1},
{200, 1},
{350, 3},
{400, 3},
{500, 4},
} {
var resp, _, _ = ind.Query(context.Background(),
&pb.ReadRequest{Offset: tc.offset, BeginModTime: beginTime})
c.Check(resp, gc.DeepEquals, &pb.ReadResponse{
Offset: set[tc.ind].Begin,
WriteHead: 600,
Fragment: &pb.Fragment{
Begin: set[tc.ind].Begin,
End: set[tc.ind].End,
ModTime: set[tc.ind].ModTime,
},
})
}
}

func (s *IndexSuite) TestBlockedContextCancelled(c *gc.C) {
var indCtx, indCancel = context.WithCancel(context.Background())
var reqCtx, reqCancel = context.WithCancel(context.Background())
@@ -174,7 +212,7 @@ func (s *IndexSuite) TestBlockedContextCancelled(c *gc.C) {
c.Check(err, gc.Equals, context.Canceled)

// Cancel the Index's context. Same deal.
reqCtx, reqCancel = context.WithCancel(context.Background())
reqCtx = context.Background()
go indCancel()

resp, _, err = ind.Query(reqCtx, &pb.ReadRequest{Offset: -1, Block: true})
374 changes: 205 additions & 169 deletions broker/protocol/protocol.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions broker/protocol/protocol.proto
@@ -380,6 +380,11 @@ message ReadRequest {
bool metadata_only = 6;
// Offset to read through. If zero, then the read end offset is unconstrained.
int64 end_offset = 7 [ (gogoproto.casttype) = "Offset" ];
// BeginModTime is an optional inclusive lower bound on the modification
// timestamps of fragments read from the backing store, represented as
// seconds since the epoch. The request Offset will be advanced as needed
// to skip persisted Fragments having a modification time before the bound.
int64 begin_mod_time = 8;
}

// ReadResponse is the streamed response message of the broker Read RPC.
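As a minimal sketch of the new field in use — the journal name and one-hour cutoff below are illustrative, not part of this change, and the import path assumes the go.gazette.dev/core module:

package example

import (
	"time"

	pb "go.gazette.dev/core/broker/protocol"
)

// lastHourRead builds a ReadRequest which skips fragments persisted more
// than an hour ago. BeginModTime is an inclusive lower bound in Unix
// seconds; leaving it zero preserves the existing, unconstrained behavior.
func lastHourRead() pb.ReadRequest {
	return pb.ReadRequest{
		Journal:      "examples/a-journal", // Hypothetical journal name.
		Block:        true,
		BeginModTime: time.Now().Add(-time.Hour).Unix(),
	}
}
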
10 changes: 6 additions & 4 deletions cmd/gazctl/gazctlcmd/journals_read.go
@@ -23,6 +23,7 @@ type cmdJournalRead struct {
OffsetsPath string `long:"offsets" description:"Path from which initial journal offsets are read at startup"`
OffsetsOutPath string `long:"offsets-out" description:"Path to which final journal offsets are written at exit"`
FileRoot string `long:"file-root" description:"Filesystem path which roots file:// fragment store"`
FromUnix int64 `long:"from" description:"Skip over fragments persisted before this time, in unix seconds since epoch"`

pumpCh chan pumpResult // Chan into which completed read pumps are sent.
beginOffsets map[pb.Journal]int64 // Contents of initial --offsets.
@@ -218,10 +219,11 @@ func (cmd *cmdJournalRead) listRefreshed(ctx context.Context, rjc pb.RoutedJourn
var subCtx, fn = context.WithCancel(ctx)

go pumpReader(client.NewRetryReader(subCtx, rjc, pb.ReadRequest{
Journal: j.Spec.Name,
Offset: offset,
Block: cmd.Block,
DoNotProxy: !rjc.IsNoopRouter(),
Journal: j.Spec.Name,
Offset: offset,
Block: cmd.Block,
DoNotProxy: !rjc.IsNoopRouter(),
BeginModTime: cmd.FromUnix,
}), cmd.pumpCh)
nextFns[j.Spec.Name] = fn
}
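A hedged example of the new flag in use — the selector value is made up, and this assumes gazctl's existing -l/--selector flag and GNU date:

	# Read matching journals, skipping fragments persisted before one hour ago.
	gazctl journals read -l name=examples/a-journal --from $(date -d '1 hour ago' +%s)
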
22 changes: 16 additions & 6 deletions message/uuid.go
@@ -22,12 +22,12 @@ import (
// Instead, Gazette uses clock sequence bits of UUIDs it generates in the
// following way:
//
// * The first 2 bits are reserved to represent the variant, as per RFC 4122.
// * The next 4 bits extend the 60 bit timestamp with a counter, which allows
// for a per-producer UUID generation rate of 160M UUIDs / second before
// running ahead of wall-clock time. The timestamp and counter are monotonic,
// and together provide a total ordering of UUIDs from each ProducerID.
// * The remaining 10 bits are flags, eg for representing transaction semantics.
// - The first 2 bits are reserved to represent the variant, as per RFC 4122.
// - The next 4 bits extend the 60 bit timestamp with a counter, which allows
// for a per-producer UUID generation rate of 160M UUIDs / second before
// running ahead of wall-clock time. The timestamp and counter are monotonic,
// and together provide a total ordering of UUIDs from each ProducerID.
// - The remaining 10 bits are flags, eg for representing transaction semantics.
type UUID = uuid.UUID

// ProducerID is the unique node identifier portion of a v1 UUID.
@@ -89,6 +89,16 @@ func (c *Clock) Tick() Clock {
return Clock(atomic.AddUint64((*uint64)(c), 1))
}

// AsTime maps the Clock into an equivalent time.Time.
func (c Clock) AsTime() time.Time {
var (
ticks = int64((c >> 4) - g1582ns100) // Each tick is 100ns relative to unix epoch.
seconds = ticks / 10_000_000
nanos = (ticks % 10_000_000) * 100
)
return time.Unix(seconds, nanos)
}

// GetClock returns the clock timestamp and sequence as a Clock.
func GetClock(uuid UUID) Clock {
var t = uint64(binary.BigEndian.Uint32(uuid[0:4])) << 4 // Clock low bits.
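One plausible way to combine the new AsTime with the broker change (not shown in this PR; the journal name is illustrative) is deriving a BeginModTime from the Clock of a previously-seen message UUID:

package example

import (
	pb "go.gazette.dev/core/broker/protocol"
	"go.gazette.dev/core/message"
)

// resumeNear builds a ReadRequest whose BeginModTime is derived from the
// Clock of a message UUID, so a re-read can skip fragments persisted
// before (roughly) the time that message was produced.
func resumeNear(msgUUID message.UUID) pb.ReadRequest {
	var clock = message.GetClock(msgUUID)
	return pb.ReadRequest{
		Journal:      "examples/a-journal", // Hypothetical journal name.
		BeginModTime: clock.AsTime().Unix(),
	}
}
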
3 changes: 3 additions & 0 deletions message/uuid_test.go
@@ -28,6 +28,7 @@ func TestUUIDClock(t *testing.T) {

// Clock representation is precise to 100ns.
require.Equal(t, clock, NewClock(time.Unix(12, 399)))
require.Equal(t, clock.AsTime(), time.Unix(12, 300))
require.True(t, clock < NewClock(time.Unix(12, 400)))
require.True(t, clock > NewClock(time.Unix(12, 299)))

@@ -45,6 +46,7 @@
}
clock.Tick() // 16th tick.
require.Equal(t, clock, NewClock(time.Unix(12, 400)))
require.Equal(t, clock.AsTime(), time.Unix(12, 400))

// Update must never decrease the clock value.
clock.Update(time.Unix(11, 100))
@@ -58,6 +60,7 @@
// Sequence bits are reset if the clock timestamp is updated.
clock.Update(time.Unix(12, 500))
require.Equal(t, clock, NewClock(time.Unix(12, 500)))
require.Equal(t, clock.AsTime(), time.Unix(12, 500))
}

func TestUUIDBuilding(t *testing.T) {