Skip to content

Commit

Permalink
Implement service workers
Browse files Browse the repository at this point in the history
Ref: #110

It's not uncommon for applications to want to have some background
threads doing regular work for various purposes (e.g. emitting metrics,
prewarming node local caches, etc).

Currently the best way to do it, is to spawn some background threads
in the `after_mold_fork`, but this isn't great as the mold may fork
at any moment to spawn a new worker, and forking while background
threads may be doing work is risky.

Instead we can provide a dedicated service worker process to run these
threads.
  • Loading branch information
byroot committed May 27, 2024
1 parent 3de195e commit 27b071d
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 35 deletions.
33 changes: 33 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,39 @@ after_request_complete do |server, worker, env|
end
```

### `before_service_worker_ready` (experimental)

Experimental and may change at any point.

If defined, Pitchfork will spawn one extra worker, called a service worker
which doesn't accept incoming requests, but allows to perform service tasks
such as warming node local caches or emitting metrics.

Service workers are never promoted to molds, so it is safe to use threads and
other fork unsafe APIs.

This callback MUST not block. It should start one or multiple background threads
to perform tasks at regular intervals.

```ruby
before_service_worker_ready do |server, service_worker|
Thread.new do
loop do
MyApp.emit_utilization_metrics
sleep 1
end
end
end
```

### `before_service_worker_exit` (experimental)

Experimental and may change at any point.

Optional.

Called whenever the service worker is exiting. This allow to do a clean shutdown.

## Reforking

### `refork_after`
Expand Down
27 changes: 27 additions & 0 deletions examples/pitchfork.conf.service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true
# Minimal sample configuration file for Pitchfork

# listen 2007 # by default Pitchfork listens on port 8080
worker_processes 4 # this should be >= nr_cpus
refork_after [50, 100, 1000]

service_thread = nil
service_shutdown = false

before_service_worker_ready do |server, service|
service_thread = Thread.new do
server.logger.info "Service: start"
count = 1
until service_shutdown
server.logger.info "Service: ping count=#{count}"
count += 1
sleep 1
end
end
end

before_service_worker_exit do |server, service|
server.logger.info "Service: shutting down"
service_shutdown = true
service_thread&.join(2)
end
22 changes: 20 additions & 2 deletions lib/pitchfork/children.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@
module Pitchfork
# This class keep tracks of the state of all the master children.
class Children
attr_reader :mold
attr_reader :mold, :service
attr_accessor :last_generation

def initialize
@last_generation = 0
@children = {} # All children, including molds, indexed by PID.
@children = {} # All children, including molds and services, indexed by PID.
@workers = {} # Workers indexed by their `nr`.
@molds = {} # Molds, index by PID.
@mold = nil # The latest mold, if any.
@service = nil
@pending_workers = {} # Pending workers indexed by their `nr`.
@pending_molds = {} # Worker promoted to mold, not yet acknowledged
end

def register(child)
# Children always start as workers, never molds, so we know they have a `#nr`.
unless child.nr
raise "[BUG] Trying to register a child without an `nr`: #{child.inspect}"
end
@pending_workers[child.nr] = @workers[child.nr] = child
end

def register_service(service)
@service = service
end

def register_mold(mold)
@pending_molds[mold.pid] = mold
@children[mold.pid] = mold
Expand All @@ -39,6 +47,11 @@ def update(message)
@pending_molds[mold.pid] = mold
@children[mold.pid] = mold
return mold
when Message::ServiceSpawned
service = @service
service.update(message)
@children[service.pid] = service
return service
end

child = @children[message.pid] || (message.nr && @workers[message.nr])
Expand Down Expand Up @@ -73,12 +86,17 @@ def reap(pid)
@pending_molds.delete(child.pid)
@molds.delete(child.pid)
@workers.delete(child.nr)

if @mold == child
@pending_workers.reject! do |nr, worker|
worker.generation == @mold.generation
end
@mold = nil
end

if @service == child
@service = nil
end
end
child
end
Expand Down
12 changes: 12 additions & 0 deletions lib/pitchfork/configurator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class Configurator
"repead unknown process (#{status.inspect})"
elsif worker.mold?
"mold pid=#{worker.pid rescue 'unknown'} gen=#{worker.generation rescue 'unknown'} reaped (#{status.inspect})"
elsif worker.service?
"service pid=#{worker.pid rescue 'unknown'} gen=#{worker.generation rescue 'unknown'} reaped (#{status.inspect})"
else
"worker=#{worker.nr rescue 'unknown'} pid=#{worker.pid rescue 'unknown'} gen=#{worker.generation rescue 'unknown'} reaped (#{status.inspect})"
end
Expand All @@ -75,6 +77,8 @@ class Configurator
:check_client_connection => false,
:rewindable_input => true,
:client_body_buffer_size => Pitchfork::Const::MAX_BODY,
:before_service_worker_ready => nil,
:before_service_worker_exit => nil,
}
#:startdoc:

Expand Down Expand Up @@ -176,6 +180,14 @@ def after_request_complete(*args, &block)
set_hook(:after_request_complete, block_given? ? block : args[0], 3)
end

def before_service_worker_ready(&block)
set_hook(:before_service_worker_ready, block, 2)
end

def before_service_worker_exit(&block)
set_hook(:before_service_worker_exit, block, 2)
end

def timeout(seconds, cleanup: 2)
soft_timeout = set_int(:soft_timeout, seconds, 3)
cleanup_timeout = set_int(:cleanup_timeout, cleanup, 2)
Expand Down
Loading

0 comments on commit 27b071d

Please # to comment.