Skip to content

Commit

Permalink
Add back option to set reading block duration to node options (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
nitinmohan87 authored Nov 14, 2024
1 parent be67ad4 commit f01bf50
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 20 deletions.
7 changes: 2 additions & 5 deletions pool/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ const (
evDispatchReturn string = "d"
)

// jobSinkBlockDuration is the max duration to block when reading from the job stream.
var jobSinkBlockDuration = 5 * time.Second

// pendingEventTTL is the TTL for pending events.
var pendingEventTTL = 2 * time.Minute

Expand Down Expand Up @@ -160,7 +157,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(poolName), err)
}
poolSink, err = poolStream.NewSink(ctx, "events",
soptions.WithSinkBlockDuration(jobSinkBlockDuration),
soptions.WithSinkBlockDuration(o.jobSinkBlockDuration),
soptions.WithSinkAckGracePeriod(o.ackGracePeriod))
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(poolName), err)
Expand All @@ -170,7 +167,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(poolName, nodeID), err)
}
nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(o.jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
if err != nil {
return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(poolName, nodeID), err)
}
Expand Down
32 changes: 21 additions & 11 deletions pool/node_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ type (
NodeOption func(*nodeOptions)

nodeOptions struct {
workerTTL time.Duration
workerShutdownTTL time.Duration
maxQueuedJobs int
clientOnly bool
ackGracePeriod time.Duration
logger pulse.Logger
workerTTL time.Duration
workerShutdownTTL time.Duration
maxQueuedJobs int
clientOnly bool
jobSinkBlockDuration time.Duration
ackGracePeriod time.Duration
logger pulse.Logger
}
)

Expand All @@ -37,6 +38,14 @@ func WithWorkerShutdownTTL(ttl time.Duration) NodeOption {
}
}

// WithJobSinkBlockDuration sets the duration to block when reading from the
// job stream. The default is 5s. This option is mostly useful for testing.
func WithJobSinkBlockDuration(d time.Duration) NodeOption {
return func(o *nodeOptions) {
o.jobSinkBlockDuration = d
}
}

// WithMaxQueuedJobs sets the maximum number of jobs that can be queued in the pool.
// The default is 1000.
func WithMaxQueuedJobs(max int) NodeOption {
Expand Down Expand Up @@ -82,10 +91,11 @@ func parseOptions(opts ...NodeOption) *nodeOptions {
// defaultPoolOptions returns the default options.
func defaultPoolOptions() *nodeOptions {
return &nodeOptions{
workerTTL: 30 * time.Second,
workerShutdownTTL: 2 * time.Minute,
maxQueuedJobs: 1000,
ackGracePeriod: 20 * time.Second,
logger: pulse.NoopLogger(),
workerTTL: 30 * time.Second,
workerShutdownTTL: 2 * time.Minute,
jobSinkBlockDuration: 5 * time.Second,
maxQueuedJobs: 1000,
ackGracePeriod: 20 * time.Second,
logger: pulse.NoopLogger(),
}
}
5 changes: 1 addition & 4 deletions pool/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ const (
testAckGracePeriod = 50 * time.Millisecond
)

func init() {
jobSinkBlockDuration = 100 * time.Millisecond
}

// newTestNode creates a new Node instance for testing purposes.
// It configures the node with specific TTL and block duration settings
// suitable for testing, and uses the provided Redis client and name.
Expand All @@ -42,6 +38,7 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri
node, err := AddNode(ctx, name, rdb,
WithLogger(pulse.ClueLogger(ctx)),
WithWorkerShutdownTTL(testWorkerShutdownTTL),
WithJobSinkBlockDuration(testJobSinkBlockDuration),
WithWorkerTTL(testWorkerTTL),
WithAckGracePeriod(testAckGracePeriod))
require.NoError(t, err)
Expand Down

0 comments on commit f01bf50

Please # to comment.