Fix/lost job on requeue (#45)
* Do not remove requeued jobs from Redis

* Simplify usage of pool package

Remove options that should not be changed.
Fix a potential race condition on requeue where a job payload may get deleted too early.

* Fix race condition in test
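
The race described in the commit message is an ordering problem: the job payload was removed from Redis as part of requeueing, so the worker that picked the job back up could find it gone. Below is a minimal sketch of the ordering the fix enforces, using go-redis directly; the stream layout, field names, and helper are invented for illustration and are not the package's actual internals.

```go
package requeue

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// requeue hands a job to the newly selected worker's stream and deliberately
// deletes nothing. Illustrative only: whether Pulse lays out jobs this way is
// an assumption; the point is the ordering described in the commit message.
func requeue(ctx context.Context, rdb *redis.Client, workerStream, jobKey string, payload []byte) error {
	if err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: workerStream,
		Values: map[string]any{"key": jobKey, "payload": payload},
	}).Err(); err != nil {
		return err
	}
	// Do NOT remove the requeued job from Redis here: the new worker still
	// needs the payload. Cleanup happens only once the job is acked as started.
	return nil
}
```
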
raphael authored Oct 23, 2024
1 parent 869949d commit be67ad4
Showing 9 changed files with 163 additions and 282 deletions.
5 changes: 1 addition & 4 deletions examples/pool/scheduler/main.go
@@ -28,10 +28,7 @@ func main() {
}

// Create node for pool "example".
- node, err := pool.AddNode(ctx, "example", rdb,
- pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster
- pool.WithLogger(logger),
- )
+ node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
if err != nil {
panic(err)
}
5 changes: 1 addition & 4 deletions examples/pool/worker/main.go
@@ -51,10 +51,7 @@ func main() {
}

// Create node for pool "example".
- node, err := pool.AddNode(ctx, "example", rdb,
- pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster
- pool.WithLogger(logger),
- )
+ node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
if err != nil {
panic(err)
}
254 changes: 61 additions & 193 deletions pool/README.md
@@ -48,22 +48,48 @@ flowchart LR

## Usage

- Pulse dedicated worker pools are generally valuable when workers require
- state which depends on the jobs they perform.
-
- To illustrate, let's consider the scenario of a multitenant system that requires
- managing a collection of background tasks for each tenant. In this case,
- utilizing a Pulse worker pool proves to be highly beneficial. The system can
- create a dedicated worker pool and create one job per tenant, utilizing the
- unique tenant identifier as the job key. This approach ensures that only one
- worker handles the background task for a specific tenant at any given time. As
- new tenants are added or old ones are removed, jobs can be started or stopped
- accordingly. Similarly, workers can be added or removed based on performance
- requirements.
-
- Pulse dedicated worker pools are not needed when workers are stateless and can
- be scaled horizontally. In such cases, any standard load balancing solution can
- be used.
+ Job producer:
+ ```go
+ rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
+ node, err := pool.AddNode(ctx, "example", rdb, pool.WithClientOnly())
+ if err != nil {
+     panic(err)
+ }
+ if err := node.DispatchJob(ctx, "key", []byte("payload")); err != nil {
+     panic(err)
+ }
+ ```
+
+ Worker:
+ ```go
+ rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
+ node, err := pool.AddNode(ctx, "example", rdb)
+ if err != nil {
+     panic(err)
+ }
+ handler := &JobHandler{}
+ _, err = node.AddWorker(context.Background(), handler)
+ if err != nil {
+     panic(err)
+ }
+ ```
+
+ Job handler:
+ ```go
+ type JobHandler struct {
+     // ...
+ }
+
+ // Pulse calls this method to start a job that was assigned to this worker.
+ func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error {
+     // ...
+ }
+
+ // Pulse calls this method to stop a job that was assigned to this worker.
+ func (h *JobHandler) Stop(ctx context.Context, key string) error {
+     // ...
+ }
+ ```
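
For concreteness, one possible handler implementation follows. This is a sketch that assumes nothing beyond the `Start`/`Stop` signatures shown above; the per-key goroutine and cancellation bookkeeping are illustrative choices, not part of the package.

```go
package example

import (
	"context"
	"sync"
)

// JobHandler runs one goroutine per job key and cancels it on Stop.
type JobHandler struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

// Start launches the background work for the job key.
func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error {
	jobCtx, cancel := context.WithCancel(context.Background())
	h.mu.Lock()
	if h.cancels == nil {
		h.cancels = make(map[string]context.CancelFunc)
	}
	h.cancels[key] = cancel
	h.mu.Unlock()
	go h.run(jobCtx, key, payload)
	return nil
}

// Stop cancels the background work for the job key.
func (h *JobHandler) Stop(ctx context.Context, key string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if cancel, ok := h.cancels[key]; ok {
		cancel()
		delete(h.cancels, key)
	}
	return nil
}

func (h *JobHandler) run(ctx context.Context, key string, payload []byte) {
	<-ctx.Done() // placeholder for the actual per-job work
}
```

Because the map is initialized lazily, the zero value `&JobHandler{}` used in the worker snippet above works as-is.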

### Creating A Pool

@@ -78,24 +104,28 @@ should be closed when it is no longer needed (see below).
The options are used to configure the pool node. The following options are
available:

+ * `WithClientOnly` - specifies that this node will only be used to dispatch jobs to
+ workers in other nodes, and will not run any workers itself.
* `WithLogger` - sets the logger to be used by the pool node.
- * `WithWorkerTTL` - sets the worker time-to-live (TTL) in seconds. The TTL
- defines the maximum delay between two health-checks before a worker is removed
- from the pool. The default value is 10 seconds.
- * `WithPendingJobTTL` - sets the pending job time-to-live (TTL) in seconds. The
- TTL defines the maximum delay between a worker picking up the job and
- successfully starting it. The default value is 20 seconds.
+ * `WithWorkerTTL` - sets the worker time-to-live (TTL). This is the maximum duration
+ a worker can go without sending a health check before it's considered inactive
+ and removed from the pool. If a worker doesn't report its status within this
+ time frame, it will be removed, allowing the pool to reassign its jobs to other
+ active workers. The default value is 30 seconds.
* `WithWorkerShutdownTTL` - specifies the maximum time to wait for a worker to
- shutdown gracefully. The default value is 2 minutes.
+ shutdown gracefully. This is the duration the pool will wait for a worker to
+ finish its current job and perform any cleanup operations before forcefully
+ terminating it. If the worker doesn't shut down within this time, it will be
+ forcefully stopped. The default value is 2 minutes.
* `WithMaxQueuedJobs` - sets the maximum number of jobs that can be queued
- before the pool starts rejecting new jobs. The default value is 1000.
- * `WithClientOnly` - specifies that the pool node should not starts
- background goroutines to manage the pool and thus not allow creating workers.
- This option is useful when the pool is used only to dispatch jobs to workers
- that are created in other nodes.
- * `WithJobSinkBlockDuration` - sets the max poll duration for new jobs. This
- value is mostly used by tests to accelerate the pool shutdown process. The
- default value is 5 seconds.
+ before the pool starts rejecting new jobs. This limit applies to the entire
+ pool across all nodes. When this limit is reached, any attempt to dispatch
+ new jobs will result in an error. The default value is 1000 jobs.
+ * `WithAckGracePeriod` - sets the grace period for job acknowledgment. If a
+ worker doesn't acknowledge starting a job within this duration, the job
+ becomes available for other workers to claim. This prevents jobs from being
+ stuck if a worker fails to start processing them. The default value is 20
+ seconds.
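
Putting the list above together, a worker node might be configured as follows. This is a sketch only: the option names come from the list, but the argument types (time.Duration values and an int) are assumptions made by analogy with the removed `WithJobSinkBlockDuration(100*time.Millisecond)` example.

```go
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
node, err := pool.AddNode(ctx, "example", rdb,
	pool.WithWorkerTTL(30*time.Second),        // assumed duration argument
	pool.WithWorkerShutdownTTL(2*time.Minute), // assumed duration argument
	pool.WithAckGracePeriod(20*time.Second),   // assumed duration argument
	pool.WithMaxQueuedJobs(1000),              // assumed int argument
)
if err != nil {
	panic(err)
}
```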

### Closing A Node

@@ -168,165 +198,3 @@ a list of jobs to be started and stopped.

`Schedule` makes it possible to maintain a pool of jobs for example in a
multi-tenant system. See the [examples](../examples/pool) for more details.
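
In a multi-tenant system this typically means dispatching one job per tenant, keyed by the tenant identifier, so that a single worker owns each tenant's background task at any given time. A sketch using only the `DispatchJob` call shown earlier (tenant IDs and payload are invented):

```go
for _, tenantID := range []string{"tenant-a", "tenant-b"} {
	if err := node.DispatchJob(ctx, tenantID, []byte(`{"plan":"default"}`)); err != nil {
		panic(err)
	}
}
```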

## Data Flows

The following sections provide additional details on the internal data flows
involved in creating and using a Pulse worker pool. They are provided for
informational purposes only and are not required reading for simply using the
package.

### Adding A New Job

The following diagram illustrates the data flow involved in adding a new job to
a Pulse worker pool:

* The producer calls `DispatchJob` which adds an event to the pool job stream.
* The pool job stream is read by the pool sink running in one of the pool nodes.
The routing node records the event so it can ack it later and routes the event
to the proper worker stream using a consistent hashing algorithm.
* The dedicated worker stream is read by the worker which starts the job by
calling the `Start` method on the worker job handler. Once `Start` returns
successfully the worker sends an event back to the original pool node.
* Upon receiving the event, the pool node acks the job in the
pool job stream and removes it from its pending jobs map.


```mermaid
%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart TD
subgraph w[Worker Node]
r[Reader]
u[User code]
end
subgraph rdb[Redis]
js(["Pool Job Stream (shared)"])
ws(["Worker Stream (dedicated)"])
rs(["Routing Node Stream (dedicated)"])
end
subgraph p[Producer Node]
pr[User code]
no[Client Node]
end
subgraph ro[Routing Node]
ps[Pool Sink]
nr[Routing Node Reader]
end
pr --1. DispatchJob--> no
no --2. Add Job--> js
js --3. Job--> ps
ps --4. Add Job--> ws
ws --5. Job--> r
r --6. Start Job--> u
r --7. Add Ack--> rs
rs --7. Ack--> nr
nr --8. Ack Add Job Event--> js
classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
classDef background fill:#7A7A7A, color:#F2F2F2;
class pr,u userCode;
class pj,js,ws,rs redis;
class no,ps,r,c,nr producer;
class p,w,rdb,ro background;
linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
```

The worker pool uses a job stream so that jobs that do not get acknowledged in time
are automatically re-queued. This is useful in case of worker failure or
network partitioning. The pool sink applies the consistent hashing algorithm
to the job key to determine which worker stream the job should be added to. This
ensures that unhealthy workers are properly ignored when requeuing jobs.
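
The routing step itself is plain consistent hashing over the set of healthy workers. A generic, self-contained sketch of the idea (not the package's actual implementation):

```go
package route

import (
	"hash/fnv"
	"sort"
)

// pickWorker maps a job key to the worker whose position on a hash ring
// follows the key's hash. Real implementations typically add virtual nodes
// and skip workers that failed their health checks.
func pickWorker(jobKey string, workers []string) string {
	if len(workers) == 0 {
		return ""
	}
	type point struct {
		pos uint32
		id  string
	}
	ring := make([]point, 0, len(workers))
	for _, w := range workers {
		ring = append(ring, point{pos: hash32(w), id: w})
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].pos < ring[j].pos })
	h := hash32(jobKey)
	i := sort.Search(len(ring), func(i int) bool { return ring[i].pos >= h })
	if i == len(ring) {
		i = 0 // wrap around the ring
	}
	return ring[i].id
}

func hash32(s string) uint32 {
	f := fnv.New32a()
	f.Write([]byte(s))
	return f.Sum32()
}
```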

### Shutdown and Cleanup

The following diagram illustrates the data flow involved in shutting down a
Pulse worker pool:

* The producer calls `Shutdown` which adds a shutdown event to the pool stream.
* Upon receiving the shutdown event, the pool node closes the pool stream to avoid
accepting new jobs and sets a flag in the pool shutdown replicated map.
* The pool nodes get notified and stop accepting new jobs (`DispatchJob`
returns an error if called).
* The pool nodes add a stop event to the worker streams for all the workers
they own.
* Upon receiving the event, the workers remove themselves from the pool
workers replicated map, destroy their stream and exit. Note that any job that
was enqueued before the shutdown event still gets processed.
* Once the workers have stopped, the producer that initiated the
shutdown cleans up the pool resources (jobs sink, jobs stream, replicated maps)
and the pool nodes exit.
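
From the caller's side the whole sequence is triggered by one call on the node handle returned by `AddNode`; the method name comes from the first bullet above, while the exact signature is an assumption:

```go
// Tears down the entire pool across all nodes, as described above.
if err := node.Shutdown(ctx); err != nil {
	panic(err)
}
```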

```mermaid
%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart TD
subgraph pn1[Pool Node 1]
u[User code]
po1[Pool 1]
w1[Worker 1]
end
subgraph pn2[Pool Node 2]
po2[Pool 2]
w2[Worker 2]
end
subgraph rdb[Redis]
sr[(Shutdown <br/> Replicated Map)]
wr[(Worker <br/> Replicated Map)]
ws1(["Worker 1 Stream"])
ws2(["Worker 2 Stream"])
end
u[User code] --1. Shutdown--> po1[Pool 1]
po1 --2. Set Shutdown Flag--> sr[(Shutdown <br/> Replicated Map)]
sr --3. Shutdown Flag--> po1
sr --3. Shutdown Flag--> po2
po1 --4. Add Stop--> ws1
po2 --4. Add Stop--> ws2
ws1 --5. Stop--> w1
ws2 --5. Stop--> w2
w1 --6. Remove Worker--> wr
w2 --6. Remove Worker--> wr
w1 --7. Delete--> ws1
w2 --7. Delete--> ws2
wr --8. Workers Empty--> po1
po1 --9. Delete --> sr
po1 --10. Delete --> wr
classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
classDef background fill:#7A7A7A, color:#F2F2F2;
class u userCode;
class wr,sr,ws1,ws2 redis;
class po1,po2,w1,w2 producer;
class rdb,pn1,pn2 background;
linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 8 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 9 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 10 stroke:#FF8888,color:#FF8888,stroke-width:3px;
linkStyle 11 stroke:#FF8888,color:#FF8888,stroke-width:3px;
linkStyle 12 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
linkStyle 13 stroke:#FF8888,color:#FF8888,stroke-width:3px;
linkStyle 14 stroke:#FF8888,color:#FF8888,stroke-width:3px;
```