diff --git a/examples/pool/scheduler/main.go b/examples/pool/scheduler/main.go
index 70e9e77..be7fe5d 100644
--- a/examples/pool/scheduler/main.go
+++ b/examples/pool/scheduler/main.go
@@ -28,10 +28,7 @@ func main() {
 	}
 
 	// Create node for pool "example".
-	node, err := pool.AddNode(ctx, "example", rdb,
-		pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster
-		pool.WithLogger(logger),
-	)
+	node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
 	if err != nil {
 		panic(err)
 	}
diff --git a/examples/pool/worker/main.go b/examples/pool/worker/main.go
index 3611fa9..f8722f7 100644
--- a/examples/pool/worker/main.go
+++ b/examples/pool/worker/main.go
@@ -51,10 +51,7 @@ func main() {
 	}
 
 	// Create node for pool "example".
-	node, err := pool.AddNode(ctx, "example", rdb,
-		pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster
-		pool.WithLogger(logger),
-	)
+	node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
 	if err != nil {
 		panic(err)
 	}
diff --git a/pool/README.md b/pool/README.md
index 3716bfb..bcb2236 100644
--- a/pool/README.md
+++ b/pool/README.md
@@ -48,22 +48,48 @@ flowchart LR
 
 ## Usage
 
-Pulse dedicated worker pools are generally valuable when workers require
-state which depends on the jobs they perform.
-
-To illustrate, let's consider the scenario of a multitenant system that requires
-managing a collection of background tasks for each tenant. In this case,
-utilizing a Pulse worker pool proves to be highly beneficial. The system can
-create a dedicated worker pool and create one job per tenant, utilizing the
-unique tenant identifier as the job key. This approach ensures that only one
-worker handles the background task for a specific tenant at any given time. As
-new tenants are added or old ones are removed, jobs can be started or stopped
-accordingly. Similarly, workers can be added or removed based on performance
-requirements.
-
-Pulse dedicated worker pools are not needed when workers are stateless and can
-be scaled horizontally. In such cases, any standard load balancing solution can
-be used.
+Job producer:
+```go
+	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
+	node, err := pool.AddNode(ctx, "example", rdb, pool.WithClientOnly())
+	if err != nil {
+		panic(err)
+	}
+	if err := node.DispatchJob(ctx, "key", []byte("payload")); err != nil {
+		panic(err)
+	}
+```
+
+Worker:
+```go
+	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
+	node, err := pool.AddNode(ctx, "example", rdb)
+	if err != nil {
+		panic(err)
+	}
+	handler := &JobHandler{}
+	_, err = node.AddWorker(context.Background(), handler)
+	if err != nil {
+		panic(err)
+	}
+```
+
+Job handler:
+```go
+type JobHandler struct {
+	// ...
+}
+
+// Pulse calls this method to start a job that was assigned to this worker.
+func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error {
+	// ...
+}
+
+// Pulse calls this method to stop a job that was assigned to this worker.
+func (h *JobHandler) Stop(key string) error {
+	// ...
+}
+```
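> Editor's note: the handler bodies above are elided. A filled-in version might look like the following. This is illustrative only and not part of the diff; the in-memory map and the mutex are stand-ins for whatever per-key work a real handler starts (`sync` and `context` imports assumed). Note that `Stop` takes only a key, matching the `w.handler.Stop(key)` call visible in the `pool/worker.go` hunks below.

```go
type JobHandler struct {
	mu   sync.Mutex
	jobs map[string][]byte // running jobs indexed by job key
}

// Start begins work for the given key; here it only records the job.
func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.jobs == nil {
		h.jobs = make(map[string][]byte)
	}
	h.jobs[key] = payload // e.g. spawn a per-tenant goroutine instead
	return nil
}

// Stop ends work for the given key.
func (h *JobHandler) Stop(key string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.jobs, key)
	return nil
}
```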
 
 ### Creating A Pool
 
@@ -78,24 +104,28 @@ should be closed when it is no longer needed (see below).
 
 The options are used to configure the pool node. The following options are
 available:
 
+* `WithClientOnly` - specifies that this node will only be used to dispatch jobs to
+  workers in other nodes, and will not run any workers itself.
 * `WithLogger` - sets the logger to be used by the pool node.
-* `WithWorkerTTL` - sets the worker time-to-live (TTL) in seconds. The TTL
-  defines the maximum delay between two health-checks before a worker is removed
-  from the pool. The default value is 10 seconds.
-* `WithPendingJobTTL` - sets the pending job time-to-live (TTL) in seconds. The
-  TTL defines the maximum delay between a worker picking up the job and
-  successfully starting it. The default value is 20 seconds.
+* `WithWorkerTTL` - sets the worker time-to-live (TTL). This is the maximum
+  duration a worker can go without sending a health check before it is
+  considered inactive: once the TTL expires the worker is removed from the
+  pool and its jobs are reassigned to other active workers. The default
+  value is 30 seconds.
 * `WithWorkerShutdownTTL` - specifies the maximum time to wait for a worker to
-  shutdown gracefully. The default value is 2 minutes.
+  shut down gracefully. This is the duration the pool waits for a worker to
+  finish its current job and perform any cleanup operations; a worker that
+  does not shut down within this time is forcefully stopped. The default
+  value is 2 minutes.
 * `WithMaxQueuedJobs` - sets the maximum number of jobs that can be queued
-  before the pool starts rejecting new jobs. The default value is 1000.
-* `WithClientOnly` - specifies that the pool node should not starts
-  background goroutines to manage the pool and thus not allow creating workers.
-  This option is useful when the pool is used only to dispatch jobs to workers
-  that are created in other nodes.
-* `WithJobSinkBlockDuration` - sets the max poll duration for new jobs. This
-  value is mostly used by tests to accelerate the pool shutdown process. The
-  default value is 5 seconds.
+  before the pool starts rejecting new jobs. This limit applies to the entire
+  pool across all nodes. When this limit is reached, any attempt to dispatch
+  new jobs will result in an error. The default value is 1000 jobs.
+* `WithAckGracePeriod` - sets the grace period for job acknowledgment. If a
+  worker doesn't acknowledge starting a job within this duration, the job
+  becomes available for other workers to claim. This prevents jobs from being
+  stuck if a worker fails to start processing them. The default value is 20
+  seconds.
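> Editor's note: putting the surviving options together, node construction now reads as below. The values are arbitrary illustrations (they happen to match the new defaults), and `ctx`, `rdb` and `logger` are assumed to be in scope as in the Usage examples above.

```go
node, err := pool.AddNode(ctx, "example", rdb,
	pool.WithLogger(logger),
	pool.WithWorkerTTL(30*time.Second),
	pool.WithWorkerShutdownTTL(2*time.Minute),
	pool.WithMaxQueuedJobs(1000),
	pool.WithAckGracePeriod(20*time.Second),
)
if err != nil {
	panic(err)
}
```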
 
 ### Closing A Node
@@ -168,165 +198,3 @@ a list of jobs to be started and stopped.
 
 `Schedule` makes it possible to maintain a pool of jobs for example in a
 multi-tenant system. See the [examples](../examples/pool) for more details.
-
-## Data Flows
-
-The following sections provide additional details on the internal data flows
-involved in creating and using a Pulse worker pool. They are provided for
-informational purposes only and are not required reading for simply using the
-package.
-
-### Adding A New Job
-
-The following diagram illustrates the data flow involved in adding a new job to
-a Pulse worker pool:
-
-* The producer calls `DispatchJob` which adds an event to the pool job stream.
-* The pool job stream is read by the pool sink running in one of the pool nodes.
-  The routing node records the event so it can ack it later and routes the event
-  to the proper worker stream using a consistent hashing algorithm.
-* The dedicated worker stream is read by the worker which starts the job by
-  calling the `Start` method on the worker job handler. Once `Start` returns
-  successfully the worker sends an event back to the original pool node.
-* Upon getting the event, the pool node acks the job with the
-  pool job stream and removes it from its pending jobs map.
-
-
-```mermaid
-%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
-%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
-flowchart TD
-    subgraph w[Worker Node]
-        r[Reader]
-        u[User code]
-    end
-    subgraph rdb[Redis]
-        js(["Pool Job Stream (shared)"])
-        ws(["Worker Stream (dedicated)"])
-        rs(["Routing Node Stream (dedicated)"])
-    end
-    subgraph p[Producer Node]
-        pr[User code]
-        no[Client Node]
-    end
-    subgraph ro[Routing Node]
-        ps[Pool Sink]
-        nr[Routing Node Reader]
-    end
-    pr --1. DispatchJob--> no
-    no --2. Add Job--> js
-    js --3. Job--> ps
-    ps --4. Add Job--> ws
-    ws --5. Job--> r
-    r --6. Start Job--> u
-    r --7. Add Ack--> rs
-    rs --7. Ack--> nr
-    nr --8. Ack Add Job Event--> js
-
-    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
-    classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
-    classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
-    classDef background fill:#7A7A7A, color:#F2F2F2;
-
-    class pr,u userCode;
-    class pj,js,ws,rs redis;
-    class no,ps,r,c,nr producer;
-    class p,w,rdb,ro background;
-
-    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-```
-
-The worker pool uses a job stream so that jobs that do not get acknowledged in time
-are automatically re-queued. This is useful in case of worker failure or
-network partitioning. The pool sink applies the consistent hashing algorithm
-to the job key to determine which worker stream the job should be added to. This
-ensures that unhealthy workers are properly ignored when requeuing jobs.
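> Editor's note: the removed walkthrough references the consistent hashing step, and the `pool/node.go` hunks below show the node building `jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))}`. For context, here is a self-contained sketch of the jump consistent hash (Lamping & Veach, 2014) keyed the same way. It illustrates the technique only and is not the package's actual code.

```go
package main

import (
	"fmt"
	"hash/crc64"
)

// jump maps a 64-bit key to one of n buckets while minimizing the number
// of keys that move when n changes (Lamping & Veach, 2014).
func jump(key uint64, n int64) int64 {
	var b, j int64 = -1, 0
	for j < n {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return b
}

func main() {
	sum := crc64.Checksum([]byte("tenant-42"), crc64.MakeTable(crc64.ECMA))
	fmt.Println("worker stream index:", jump(sum, 5)) // stable for a fixed worker count
}
```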
-
-### Shutdown and Cleanup
-
-The following diagram illustrates the data flow involved in shutting down a
-Pulse worker pool:
-
-* The producer calls `Shutdown` which adds a shutdown event to the pool stream.
-* Upon receving the shutdown event the pool node closes the pool stream to avoid
-  accepting new jobs and sets a flag in the pool shutdown replicated map.
-* The pool nodes get notified and stop accepting new jobs (`DispatchJob`
-  returns an error if called).
-* The pool nodes add a stop event to the worker streams for all the workers
-  they own.
-* Upon receiving the event, the workers remove themselves from the pool
-  workers replicated map, destroy their stream and exit. Note that any job that
-  was enqueued before the shutdown event still gets processed.
-* Once the workers have stopped, the producer that initiated the
-  shutdown cleans up the pool resources (jobs sink, jobs stream, replicated maps)
-  and the pool nodes exit.
-
-```mermaid
-%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
-%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
-
-flowchart TD
-    subgraph pn1[Pool Node 1]
-        u[User code]
-        po1[Pool 1]
-        w1[Worker 1]
-    end
-    subgraph pn2[Pool Node 2]
-        po2[Pool 2]
-        w2[Worker 2]
-    end
-    subgraph rdb[Redis]
-        sr[(Shutdown<br/>Replicated Map)]
-        wr[(Worker<br/>Replicated Map)]
-        ws1(["Worker 1 Stream"])
-        ws2(["Worker 2 Stream"])
-    end
-    u[User code] --1. Shutdown--> po1[Pool 1]
-    po1 --2. Set Shutdown Flag--> sr[(Shutdown<br/>Replicated Map)]
-    sr --3. Shutdown Flag--> po1
-    sr --3. Shutdown Flag--> po2
-    po1 --4. Add Stop--> ws1
-    po2 --4. Add Stop--> ws2
-    ws1 --5. Stop--> w1
-    ws2 --5. Stop--> w2
-    w1 --6. Remove Worker--> wr
-    w2 --6. Remove Worker--> wr
-    w1 --7. Delete--> ws1
-    w2 --7. Delete--> ws2
-    wr --8. Workers Empty--> po1
-    po1 --9. Delete --> sr
-    po1 --10. Delete --> wr
-
-    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
-    classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
-    classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
-    classDef background fill:#7A7A7A, color:#F2F2F2;
-
-    class u userCode;
-    class wr,sr,ws1,ws2 redis;
-    class po1,po2,w1,w2 producer;
-    class rdb,pn1,pn2 background;
-
-    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 8 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 9 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 10 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-    linkStyle 11 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-    linkStyle 12 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 13 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-    linkStyle 14 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-```
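> Editor's note: although the walkthrough above is dropped from the README, the shutdown entry point itself is unchanged. A minimal caller-side sketch, assuming `node` was created as in the Usage section (signal choice and timeout are illustrative, not recommendations):

```go
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
<-sigc // wait for a termination signal

// Give workers time to finish and clean up before the process exits.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
if err := node.Shutdown(ctx); err != nil {
	log.Println("shutdown:", err)
}
```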
diff --git a/pool/node.go b/pool/node.go
index 5719a69..c133b61 100644
--- a/pool/node.go
+++ b/pool/node.go
@@ -41,7 +41,6 @@ type (
 		tickerMap          *rmap.Map     // ticker next tick time indexed by name
 		workerTTL          time.Duration // Worker considered dead if keep-alive not updated after this duration
 		workerShutdownTTL  time.Duration // Worker considered dead if not shutdown after this duration
-		pendingJobTTL      time.Duration // Job lease expires if not acked after this duration
 		ackGracePeriod     time.Duration // Wait for return status up to this duration
 		clientOnly         bool
 		logger             pulse.Logger
@@ -85,6 +84,12 @@ const (
 	evDispatchReturn string = "d"
 )
 
+// jobSinkBlockDuration is the max duration to block when reading from the job stream.
+var jobSinkBlockDuration = 5 * time.Second
+
+// pendingEventTTL is the TTL for pending events.
+var pendingEventTTL = 2 * time.Minute
+
 // AddNode adds a new node to the pool with the given name and returns it. The
 // node can be used to dispatch jobs and add new workers. A node also routes
 // dispatched jobs to the proper worker and acks the corresponding events once
@@ -107,8 +112,6 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
 		"max_queued_jobs", o.maxQueuedJobs,
 		"worker_ttl", o.workerTTL,
 		"worker_shutdown_ttl", o.workerShutdownTTL,
-		"pending_job_ttl", o.pendingJobTTL,
-		"job_sink_block_duration", o.jobSinkBlockDuration,
 		"ack_grace_period", o.ackGracePeriod)
 	wsm, err := rmap.Join(ctx, shutdownMapName(poolName), rdb, rmap.WithLogger(logger))
 	if err != nil {
@@ -157,7 +160,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
 		return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(poolName), err)
 	}
 	poolSink, err = poolStream.NewSink(ctx, "events",
-		soptions.WithSinkBlockDuration(o.jobSinkBlockDuration),
+		soptions.WithSinkBlockDuration(jobSinkBlockDuration),
 		soptions.WithSinkAckGracePeriod(o.ackGracePeriod))
 	if err != nil {
 		return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(poolName), err)
@@ -167,7 +170,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
 	if err != nil {
 		return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(poolName, nodeID), err)
 	}
-	nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(o.jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
+	nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
 	if err != nil {
 		return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(poolName, nodeID), err)
 	}
@@ -191,7 +194,6 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
 		clientOnly:         o.clientOnly,
 		workerTTL:          o.workerTTL,
 		workerShutdownTTL:  o.workerShutdownTTL,
-		pendingJobTTL:      o.pendingJobTTL,
 		ackGracePeriod:     o.ackGracePeriod,
 		h:                  jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))},
 		stop:               make(chan struct{}),
@@ -398,6 +400,12 @@ func (node *Node) Shutdown(ctx context.Context) error {
 		node.logger.Error(fmt.Errorf("Shutdown: failed to destroy pool stream: %w", err))
 	}
 
+	// Clean up the job payloads map as ongoing requeues could prevent it
+	// from being cleaned up by the workers.
+	if err := node.jobPayloadsMap.Reset(ctx); err != nil {
+		node.logger.Error(fmt.Errorf("Shutdown: failed to reset job payloads map: %w", err))
+	}
+
 	// Now clean up the shutdown replicated map.
 	wsm, err := rmap.Join(ctx, shutdownMapName(node.PoolName), node.rdb, rmap.WithLogger(node.logger))
 	if err != nil {
@@ -446,38 +454,68 @@ func (node *Node) close(ctx context.Context, requeue bool) error {
 
 	node.logger.Info("closing")
 
+	// If we're not requeuing then stop all the jobs.
+	if !requeue {
+		node.logger.Info("stopping all jobs")
+		var wg sync.WaitGroup
+		var total atomic.Int32
+		node.localWorkers.Range(func(key, value any) bool {
+			wg.Add(1)
+			worker := value.(*Worker)
+			pulse.Go(ctx, func() {
+				defer wg.Done()
+				for _, job := range worker.Jobs() {
+					if err := worker.stopJob(ctx, job.Key, false); err != nil {
+						node.logger.Error(fmt.Errorf("Close: failed to stop job %q for worker %q: %w", job.Key, worker.ID, err))
+					}
+					total.Add(1)
+				}
+			})
+			return true
+		})
+		wg.Wait()
+		node.logger.Info("stopped all jobs", "total", total.Load())
+	}
+
 	// Need to stop workers before requeueing jobs to prevent
 	// requeued jobs from being handled by this node.
 	var wg sync.WaitGroup
 	node.localWorkers.Range(func(key, value any) bool {
 		wg.Add(1)
-		go func(w *Worker) {
+		worker := value.(*Worker)
+		pulse.Go(ctx, func() {
 			defer wg.Done()
-			w.stopAndWait(ctx)
-		}(value.(*Worker))
+			worker.stopAndWait(ctx)
+		})
 		return true
 	})
 	wg.Wait()
 	node.logger.Debug("workers stopped")
 
 	// Requeue jobs.
-	node.localWorkers.Range(func(key, value any) bool {
-		wg.Add(1)
-		go func(w *Worker) {
-			defer wg.Done()
-			if requeue {
-				if err := w.requeueJobs(ctx); err != nil {
-					node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", w.ID, err))
+	if requeue {
+		var wg sync.WaitGroup
+		node.localWorkers.Range(func(key, value any) bool {
+			wg.Add(1)
+			worker := value.(*Worker)
+			pulse.Go(ctx, func() {
+				defer wg.Done()
+				if err := worker.requeueJobs(ctx); err != nil {
+					node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", worker.ID, err))
 					return
 				}
-			}
-			w.cleanup(ctx)
-		}(value.(*Worker))
-		return true
-	})
-	wg.Wait()
+			})
+			return true
+		})
+		wg.Wait()
+	}
 
 	// Cleanup
+	node.localWorkers.Range(func(key, value any) bool {
+		worker := value.(*Worker)
+		worker.cleanup(ctx)
+		return true
+	})
 	if !node.clientOnly {
 		node.poolSink.Close()
 		node.tickerMap.Close()
@@ -508,7 +546,7 @@ func (node *Node) handlePoolEvents(c <-chan *streaming.Event) {
 		}
 		node.logger.Debug("routing", "event", ev.EventName, "id", ev.ID)
 		if err := node.routeWorkerEvent(ctx, ev); err != nil {
-			node.logger.Error(fmt.Errorf("handlePoolEvents: failed to route event: %w, will retry after %v", err, node.pendingJobTTL), "event", ev.EventName, "id", ev.ID)
+			node.logger.Error(fmt.Errorf("handlePoolEvents: failed to route event: %w, will retry after %v", err, node.ackGracePeriod), "event", ev.EventName, "id", ev.ID)
 		}
 	}
 }
@@ -609,13 +647,13 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
 	var staleKeys []string
 	node.pendingEvents.Range(func(key, value any) bool {
 		ev := value.(*streaming.Event)
-		if time.Since(ev.CreatedAt()) > 2*node.pendingJobTTL {
+		if time.Since(ev.CreatedAt()) > pendingEventTTL {
 			staleKeys = append(staleKeys, key.(string))
 		}
 		return true
 	})
 	for _, key := range staleKeys {
-		node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", 2*node.pendingJobTTL)
+		node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL)
 		node.pendingEvents.Delete(key)
 	}
 }
@@ -673,6 +711,7 @@ func (node *Node) handleWorkerMapUpdate(ctx context.Context) {
 			// If it's not in the worker map, then it's not active and its jobs
 			// have already been requeued.
 			node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID)
+			node.deleteWorker(ctx, worker.ID)
 			worker.stopAndWait(ctx)
 			node.localWorkers.Delete(key)
 			return true
@@ -918,7 +957,7 @@ func (node *Node) activeWorkers() []string {
 	return activeIDs
 }
 
-// deleteWorker removes a worker from the pool deleting the worker stream.
+// deleteWorker removes a remote worker from the pool, deleting the worker stream.
 func (node *Node) deleteWorker(ctx context.Context, id string) error {
 	node.logger.Debug("deleteWorker: deleting worker", "worker", id)
 	if _, err := node.keepAliveMap.Delete(ctx, id); err != nil {
@@ -927,9 +966,12 @@ func (node *Node) deleteWorker(ctx context.Context, id string) error {
 	if _, err := node.workerMap.Delete(ctx, id); err != nil {
 		node.logger.Error(fmt.Errorf("deleteWorker: failed to delete worker %q from workers map: %w", id, err))
 	}
+	if _, err := node.jobsMap.Delete(ctx, id); err != nil {
+		node.logger.Error(fmt.Errorf("deleteWorker: failed to delete worker %q from jobs map: %w", id, err))
+	}
 	stream, err := node.workerStream(ctx, id)
 	if err != nil {
-		return err
+		return fmt.Errorf("deleteWorker: failed to retrieve worker stream for %q: %w", id, err)
 	}
 	if err := stream.Destroy(ctx); err != nil {
 		node.logger.Error(fmt.Errorf("deleteWorker: failed to delete worker stream: %w", err))
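> Editor's note: the `close` rewrite above replaces raw `go` statements with `pulse.Go(ctx, func() { ... })` and binds `worker := value.(*Worker)` before each closure, keeping the type assertion out of the goroutine. The diff does not show `pulse.Go` itself; a plausible shape for such a helper, offered purely as an assumption for illustration and not the actual implementation, is a panic-safe `go`:

```go
// Go runs f on a new goroutine and turns panics into logged errors, so a
// single failing job cannot take down the whole node. Hypothetical sketch;
// see the pulse package for the real helper.
func Go(ctx context.Context, f func()) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("goroutine panic: %v", r)
			}
		}()
		f()
	}()
}
```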
diff --git a/pool/node_options.go b/pool/node_options.go
index d2a1637..d49b3ce 100644
--- a/pool/node_options.go
+++ b/pool/node_options.go
@@ -11,14 +11,12 @@ type (
 	NodeOption func(*nodeOptions)
 
 	nodeOptions struct {
-		workerTTL            time.Duration
-		pendingJobTTL        time.Duration
-		workerShutdownTTL    time.Duration
-		maxQueuedJobs        int
-		clientOnly           bool
-		jobSinkBlockDuration time.Duration
-		ackGracePeriod       time.Duration
-		logger               pulse.Logger
+		workerTTL         time.Duration
+		workerShutdownTTL time.Duration
+		maxQueuedJobs     int
+		clientOnly        bool
+		ackGracePeriod    time.Duration
+		logger            pulse.Logger
 	}
 )
 
@@ -31,14 +29,6 @@ func WithWorkerTTL(ttl time.Duration) NodeOption {
 	}
 }
 
-// WithPendingJobTTL sets the duration after which a job is made available to
-// other workers if it wasn't started. The default is 20s.
-func WithPendingJobTTL(ttl time.Duration) NodeOption {
-	return func(o *nodeOptions) {
-		o.pendingJobTTL = ttl
-	}
-}
-
 // WithWorkerShutdownTTL sets the maximum time to wait for workers to
 // shutdown. The default is 2 minutes.
 func WithWorkerShutdownTTL(ttl time.Duration) NodeOption {
@@ -64,14 +54,6 @@ func WithClientOnly() NodeOption {
 	}
 }
 
-// WithJobSinkBlockDuration sets the duration to block when reading from the
-// job stream. The default is 5s. This option is mostly useful for testing.
-func WithJobSinkBlockDuration(d time.Duration) NodeOption {
-	return func(o *nodeOptions) {
-		o.jobSinkBlockDuration = d
-	}
-}
-
// WithAckGracePeriod sets the duration after which a job is made available to
// other workers if it wasn't started. The default is 20s.
 func WithAckGracePeriod(ttl time.Duration) NodeOption {
@@ -100,12 +82,10 @@ func parseOptions(opts ...NodeOption) *nodeOptions {
 // defaultPoolOptions returns the default options.
 func defaultPoolOptions() *nodeOptions {
 	return &nodeOptions{
-		workerTTL:            10 * time.Second,
-		pendingJobTTL:        20 * time.Second,
-		workerShutdownTTL:    2 * time.Minute,
-		maxQueuedJobs:        1000,
-		jobSinkBlockDuration: 5 * time.Second,
-		ackGracePeriod:       20 * time.Second,
-		logger:               pulse.NoopLogger(),
+		workerTTL:         30 * time.Second,
+		workerShutdownTTL: 2 * time.Minute,
+		maxQueuedJobs:     1000,
+		ackGracePeriod:    20 * time.Second,
+		logger:            pulse.NoopLogger(),
 	}
 }
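> Editor's note: with `WithPendingJobTTL` and `WithJobSinkBlockDuration` gone, callers that passed them must drop the options, exactly as the example programs at the top of this diff do. A before/after sketch of the migration (the two snippets are alternatives, not sequential code):

```go
// Before (no longer compiles; both options were removed):
node, err := pool.AddNode(ctx, "example", rdb,
	pool.WithJobSinkBlockDuration(100*time.Millisecond),
	pool.WithLogger(logger),
)

// After:
node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
```

There is no direct replacement: the sink block duration is now the package-level `jobSinkBlockDuration` variable (shortened for tests in `pool/testing.go` below), and the pending-job timeout is effectively subsumed by `WithAckGracePeriod`.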
diff --git a/pool/node_test.go b/pool/node_test.go
index dee594b..c7dd201 100644
--- a/pool/node_test.go
+++ b/pool/node_test.go
@@ -510,7 +510,7 @@ func TestStaleEventsAreRemoved(t *testing.T) {
 	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
 
 	// Add a stale event manually
-	staleEventID := fmt.Sprintf("%d-0", time.Now().Add(-2*node.pendingJobTTL).UnixNano()/int64(time.Millisecond))
+	staleEventID := fmt.Sprintf("%d-0", time.Now().Add(-2*pendingEventTTL).UnixNano()/int64(time.Millisecond))
 	staleEvent := &streaming.Event{
 		ID:        staleEventID,
 		EventName: "test-event",
diff --git a/pool/testing.go b/pool/testing.go
index 75476d7..571210d 100644
--- a/pool/testing.go
+++ b/pool/testing.go
@@ -30,6 +30,10 @@ const (
 	testAckGracePeriod = 50 * time.Millisecond
 )
 
+func init() {
+	jobSinkBlockDuration = 100 * time.Millisecond
+}
+
 // newTestNode creates a new Node instance for testing purposes.
 // It configures the node with specific TTL and block duration settings
 // suitable for testing, and uses the provided Redis client and name.
@@ -38,7 +42,6 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri
 	node, err := AddNode(ctx, name, rdb,
 		WithLogger(pulse.ClueLogger(ctx)),
 		WithWorkerShutdownTTL(testWorkerShutdownTTL),
-		WithJobSinkBlockDuration(testJobSinkBlockDuration),
 		WithWorkerTTL(testWorkerTTL),
 		WithAckGracePeriod(testAckGracePeriod))
 	require.NoError(t, err)
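> Editor's note: the `init` above is what replaces the deleted test-only option: instead of widening the public API, the knob is a package-level `var` that package-internal test code shortens. A generic sketch of the pattern, using a hypothetical `retry` package with illustrative names:

```go
package retry

import "time"

// backoff is a var, not a const, so test code in the same package can
// shrink it; production code never writes to it.
var backoff = 5 * time.Second

// Do calls f up to attempts times, sleeping backoff between tries.
func Do(attempts int, f func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		time.Sleep(backoff)
	}
	return err
}
```

One caveat worth noting: unless `pool/testing.go` carries a build tag (not visible in these hunks), it is a regular file rather than a `_test.go` file, so its `init` runs for every importer of the package, not just the test binary.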
diff --git a/pool/ticker_test.go b/pool/ticker_test.go
index 9088518..14b887e 100644
--- a/pool/ticker_test.go
+++ b/pool/ticker_test.go
@@ -77,7 +77,9 @@ func TestReplaceTickerTimer(t *testing.T) {
 	require.NotNil(t, ticker2)
 
 	// Verify second ticker properties
+	ticker2.lock.Lock()
 	nextTick, tickDuration = deserialize(ticker2.next)
+	ticker2.lock.Unlock()
 	assert.WithinDuration(t, now.Add(longDuration), nextTick, time.Second, "Second ticker: invalid next tick time")
 	assert.Equal(t, longDuration, tickDuration, "Second ticker: invalid duration")
 
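> Editor's note: this is the standard fix for a `go test -race` report: `ticker2.next` is presumably written by a background goroutine while holding `ticker2.lock`, so readers must take the same lock. In generic form (illustrative types, not the package's ticker):

```go
type ticker struct {
	lock sync.Mutex
	next string // serialized next-tick state, updated in the background
}

// snapshotNext reads next under the same lock the writer holds; in Go,
// an unsynchronized read of a lock-protected field is still a data race.
func (t *ticker) snapshotNext() string {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.next
}
```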
diff --git a/pool/worker.go b/pool/worker.go
index e3b50d4..6d93c65 100644
--- a/pool/worker.go
+++ b/pool/worker.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"sort"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
@@ -185,7 +184,7 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) {
 			err = w.startJob(ctx, unmarshalJob(payload))
 		case evStopJob:
 			w.logger.Debug("handleEvents: received stop job", "event", ev.EventName, "id", ev.ID)
-			err = w.stopJob(ctx, unmarshalJobKey(payload))
+			err = w.stopJob(ctx, unmarshalJobKey(payload), false)
 		case evNotify:
 			w.logger.Debug("handleEvents: received notify", "event", ev.EventName, "id", ev.ID)
 			key, payload := unmarshalNotification(payload)
@@ -271,21 +270,24 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error {
 }
 
-// stopJob stops a job.
-func (w *Worker) stopJob(ctx context.Context, key string) error {
+// stopJob stops a job. If forRequeue is true the job payload is kept in the
+// job payloads map so the job can be started again on another worker.
+func (w *Worker) stopJob(ctx context.Context, key string, forRequeue bool) error {
 	if _, ok := w.jobs.Load(key); !ok {
-		return fmt.Errorf("job %s not found", key)
+		return fmt.Errorf("job %s not found in local worker", key)
 	}
 	if err := w.handler.Stop(key); err != nil {
 		return fmt.Errorf("failed to stop job %q: %w", key, err)
 	}
+	w.jobs.Delete(key)
 	if _, _, err := w.jobsMap.RemoveValues(ctx, w.ID, key); err != nil {
 		w.logger.Error(fmt.Errorf("stop job: failed to remove job %q from jobs map: %w", key, err))
 	}
-	if _, err := w.jobPayloadsMap.Delete(ctx, key); err != nil {
-		w.logger.Error(fmt.Errorf("stop job: failed to remove job payload %q from job payloads map: %w", key, err))
+	if !forRequeue {
+		if _, err := w.jobPayloadsMap.Delete(ctx, key); err != nil {
+			w.logger.Error(fmt.Errorf("stop job: failed to remove job payload %q from job payloads map: %w", key, err))
+		}
 	}
-	w.logger.Info("stopped job", "job", key)
-	w.jobs.Delete(key)
+	w.logger.Info("stopped job", "job", key, "for_requeue", forRequeue)
 	return nil
 }
 
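> Editor's note: the new `forRequeue` flag distinguishes the two reasons a job stops. Both call sites, side by side (a recap of this diff, not new behavior):

```go
// Explicit stop (handleEvents): the job is gone for good, so its entry in
// the replicated job payloads map is deleted as well.
err = w.stopJob(ctx, unmarshalJobKey(payload), false)

// Requeue (requeueJob, below): the job will restart on another worker,
// which still needs the payload, so the map entry is kept.
err = w.stopJob(ctx, job.Key, true)
```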
@@ -482,16 +484,14 @@ func (w *Worker) attemptRequeue(ctx context.Context, jobsToRequeue map[string]*J
 
 // requeueJob requeues a job.
 func (w *Worker) requeueJob(ctx context.Context, job *Job) error {
-	err := w.stopJob(ctx, job.Key)
-	if err != nil {
-		return fmt.Errorf("failed to stop job: %w", err)
-	}
-
 	eventID, err := w.Node.poolStream.Add(ctx, evStartJob, marshalJob(job))
 	if err != nil {
 		return fmt.Errorf("requeueJob: failed to add job to pool stream: %w", err)
 	}
 	w.Node.pendingJobs.Store(eventID, nil)
+	if err := w.stopJob(ctx, job.Key, true); err != nil {
+		return fmt.Errorf("failed to stop job: %w", err)
+	}
 	return nil
 }
 
@@ -503,17 +503,10 @@ func (w *Worker) cleanup(ctx context.Context) {
 	if _, err := w.keepAliveMap.Delete(ctx, w.ID); err != nil {
 		w.logger.Error(fmt.Errorf("failed to remove worker from keep alive map: %w", err))
 	}
-	keys, err := w.jobsMap.Delete(ctx, w.ID)
+	_, err := w.jobsMap.Delete(ctx, w.ID)
 	if err != nil {
 		w.logger.Error(fmt.Errorf("failed to remove worker from jobs map: %w", err))
 	}
-	if keys != "" {
-		for _, key := range strings.Split(keys, ",") {
-			if _, err := w.jobPayloadsMap.Delete(ctx, key); err != nil {
-				w.logger.Error(fmt.Errorf("worker stop: failed to remove job payload %q from job payloads map: %w", key, err))
-			}
-		}
-	}
 }
 
 // workerStreamName returns the name of the stream used to communicate with the
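> Editor's note: the reordering in `requeueJob` looks like the subtle heart of this change: the job is now re-added to the pool stream before the local copy is stopped, and `forRequeue` keeps the payload entry alive across the handoff. Presumably the motivation is that a crash between the two steps now yields, at worst, a short-lived duplicate rather than a lost job. A hypothetical, simplified shape of the ordering, with function values standing in for the stream add and `stopJob`:

```go
// requeue is an illustrative sketch; enqueue and stopLocal are stand-ins
// for w.Node.poolStream.Add and w.stopJob(..., true).
func requeue(ctx context.Context, job *Job,
	enqueue func(context.Context, *Job) error,
	stopLocal func(context.Context, string) error,
) error {
	// Durably hand the job off first: if this fails, the local copy is
	// still running and nothing is lost.
	if err := enqueue(ctx, job); err != nil {
		return err
	}
	// Then stop the local copy. A crash between the two steps produces a
	// brief duplicate instead of a dropped job.
	return stopLocal(ctx, job.Key)
}
```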