diff --git a/pool/node.go b/pool/node.go index c133b61..200f1f5 100644 --- a/pool/node.go +++ b/pool/node.go @@ -84,9 +84,6 @@ const ( evDispatchReturn string = "d" ) -// jobSinkBlockDuration is the max duration to block when reading from the job stream. -var jobSinkBlockDuration = 5 * time.Second - // pendingEventTTL is the TTL for pending events. var pendingEventTTL = 2 * time.Minute @@ -160,7 +157,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(poolName), err) } poolSink, err = poolStream.NewSink(ctx, "events", - soptions.WithSinkBlockDuration(jobSinkBlockDuration), + soptions.WithSinkBlockDuration(o.jobSinkBlockDuration), soptions.WithSinkAckGracePeriod(o.ackGracePeriod)) if err != nil { return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(poolName), err) @@ -170,7 +167,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No if err != nil { return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(poolName, nodeID), err) } - nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(jobSinkBlockDuration), soptions.WithReaderStartAtOldest()) + nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(o.jobSinkBlockDuration), soptions.WithReaderStartAtOldest()) if err != nil { return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(poolName, nodeID), err) } diff --git a/pool/node_options.go b/pool/node_options.go index d49b3ce..77ba37d 100644 --- a/pool/node_options.go +++ b/pool/node_options.go @@ -11,12 +11,13 @@ type ( NodeOption func(*nodeOptions) nodeOptions struct { - workerTTL time.Duration - workerShutdownTTL time.Duration - maxQueuedJobs int - clientOnly bool - ackGracePeriod time.Duration - logger pulse.Logger + workerTTL time.Duration + workerShutdownTTL time.Duration + maxQueuedJobs int + clientOnly bool + jobSinkBlockDuration time.Duration + ackGracePeriod time.Duration + logger pulse.Logger } ) @@ -37,6 +38,14 @@ func WithWorkerShutdownTTL(ttl time.Duration) NodeOption { } } +// WithJobSinkBlockDuration sets the duration to block when reading from the +// job stream. The default is 5s. This option is mostly useful for testing. +func WithJobSinkBlockDuration(d time.Duration) NodeOption { + return func(o *nodeOptions) { + o.jobSinkBlockDuration = d + } +} + // WithMaxQueuedJobs sets the maximum number of jobs that can be queued in the pool. // The default is 1000. func WithMaxQueuedJobs(max int) NodeOption { @@ -82,10 +91,11 @@ func parseOptions(opts ...NodeOption) *nodeOptions { // defaultPoolOptions returns the default options. func defaultPoolOptions() *nodeOptions { return &nodeOptions{ - workerTTL: 30 * time.Second, - workerShutdownTTL: 2 * time.Minute, - maxQueuedJobs: 1000, - ackGracePeriod: 20 * time.Second, - logger: pulse.NoopLogger(), + workerTTL: 30 * time.Second, + workerShutdownTTL: 2 * time.Minute, + jobSinkBlockDuration: 5 * time.Second, + maxQueuedJobs: 1000, + ackGracePeriod: 20 * time.Second, + logger: pulse.NoopLogger(), } } diff --git a/pool/testing.go b/pool/testing.go index 571210d..75476d7 100644 --- a/pool/testing.go +++ b/pool/testing.go @@ -30,10 +30,6 @@ const ( testAckGracePeriod = 50 * time.Millisecond ) -func init() { - jobSinkBlockDuration = 100 * time.Millisecond -} - // newTestNode creates a new Node instance for testing purposes. // It configures the node with specific TTL and block duration settings // suitable for testing, and uses the provided Redis client and name. @@ -42,6 +38,7 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri node, err := AddNode(ctx, name, rdb, WithLogger(pulse.ClueLogger(ctx)), WithWorkerShutdownTTL(testWorkerShutdownTTL), + WithJobSinkBlockDuration(testJobSinkBlockDuration), WithWorkerTTL(testWorkerTTL), WithAckGracePeriod(testAckGracePeriod)) require.NoError(t, err)