From 092f2f67e0cdb615b3356f335dba448af71a8378 Mon Sep 17 00:00:00 2001 From: Matthew Fenelon Date: Sat, 14 Sep 2024 19:35:54 +0100 Subject: [PATCH] Add thread pool stats broadcast for observability Collecting stats on the thread pool can be useful to understand the state of the threadpool. For example, are a lot of tasks having to be queued. --- lib/graphiti/scope.rb | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index bb9abfef..bdc4df75 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -3,6 +3,9 @@ class Scope attr_accessor :object, :unpaginated_object attr_reader :pagination + GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS = %w[ + length max_length queue_length max_queue completed_task_count largest_length scheduled_task_count synchronous + ] GLOBAL_THREAD_POOL_EXECUTOR = Concurrent::Promises.delay do if Graphiti.config.concurrency concurrency = Graphiti.config.concurrency_max_threads || 4 @@ -16,12 +19,18 @@ class Scope Concurrent::ThreadPoolExecutor.new(max_threads: 0, synchronous: true, fallback_policy: :caller_runs) end end - private_constant :GLOBAL_THREAD_POOL_EXECUTOR + private_constant :GLOBAL_THREAD_POOL_EXECUTOR, :GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS def self.global_thread_pool_executor GLOBAL_THREAD_POOL_EXECUTOR.value! end + def self.global_thread_pool_stats + GLOBAL_THREAD_POOL_EXECUTOR_BROADCAST_STATS.each_with_object({}) do |key, memo| + memo[key] = global_thread_pool_executor.send(key) + end + end + def initialize(object, resource, query, opts = {}) @object = object @resource = resource @@ -148,7 +157,9 @@ def future_with_fiber_locals(*args) end end - result = yield(*args) + result = Graphiti.broadcast(:global_thread_pool_task_run, self.class.global_thread_pool_stats) do + yield(*args) + end if execution_context_changed thread_storage&.keys&.each { |key| Thread.current[key] = nil }