Skip to content

Commit

Permalink
Add thread pool stats broadcast for observability
Browse files Browse the repository at this point in the history
Collecting stats on the thread pool can be useful to understand the
state of the threadpool. For example, are a lot of tasks having to be
queued.
  • Loading branch information
MattFenelon committed Sep 14, 2024
1 parent 4cf866b commit 092f2f6
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions lib/graphiti/scope.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ class Scope
attr_accessor :object, :unpaginated_object
attr_reader :pagination

GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS = %w[
length max_length queue_length max_queue completed_task_count largest_length scheduled_task_count synchronous
]
GLOBAL_THREAD_POOL_EXECUTOR = Concurrent::Promises.delay do
if Graphiti.config.concurrency
concurrency = Graphiti.config.concurrency_max_threads || 4
Expand All @@ -16,12 +19,18 @@ class Scope
Concurrent::ThreadPoolExecutor.new(max_threads: 0, synchronous: true, fallback_policy: :caller_runs)
end
end
private_constant :GLOBAL_THREAD_POOL_EXECUTOR
private_constant :GLOBAL_THREAD_POOL_EXECUTOR, :GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS

def self.global_thread_pool_executor
GLOBAL_THREAD_POOL_EXECUTOR.value!
end

def self.global_thread_pool_stats
GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS.each_with_object({}) do |key, memo|
memo[key] = global_thread_pool_executor.send(key)
end
end

def initialize(object, resource, query, opts = {})
@object = object
@resource = resource
Expand Down Expand Up @@ -148,7 +157,9 @@ def future_with_fiber_locals(*args)
end
end

result = yield(*args)
result = Graphiti.broadcast(:global_thread_pool_task_run, self.class.global_thread_pool_stats) do
yield(*args)
end

if execution_context_changed
thread_storage&.keys&.each { |key| Thread.current[key] = nil }
Expand Down

0 comments on commit 092f2f6

Please # to comment.