Skip to content

Commit

Permalink
De-flake setup of mem WQ restart test
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Dec 18, 2024
1 parent d323e23 commit f70721d
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10729,27 +10729,42 @@ func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *t
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST:%d", n),
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
Replicas: 3,
}, nats.MaxWait(30*time.Second))
require_NoError(t, err)
checkFor(t, 5*time.Second, time.Second, func() error {
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST:%d", n),
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
Replicas: 3,
}, nats.MaxWait(time.Second))
return err
})

subj := fmt.Sprintf("foo.%d.bar", n)
for i := 0; i < 22; i++ {
js.Publish(subj, nil)
checkFor(t, 5*time.Second, time.Second, func() error {
_, err := js.Publish(subj, nil)
return err
})
}
// Now consumer them all as well.
sub, err := js.PullSubscribe(subj, "wq")
require_NoError(t, err)
msgs, err := sub.Fetch(22, nats.MaxWait(20*time.Second))
require_NoError(t, err)
// Now consume them all as well.
var err error
var sub *nats.Subscription
checkFor(t, 5*time.Second, time.Second, func() error {
sub, err = js.PullSubscribe(subj, "wq")
return err
})

var msgs []*nats.Msg
checkFor(t, 5*time.Second, time.Second, func() error {
msgs, err = sub.Fetch(22, nats.MaxWait(time.Second))
return err
})
require_Equal(t, len(msgs), 22)
for _, m := range msgs {
err := m.AckSync()
require_NoError(t, err)
checkFor(t, 5*time.Second, time.Second, func() error {
return m.AckSync()
})
}
}(i)
}
Expand Down

0 comments on commit f70721d

Please # to comment.