diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 993c3f1e3..ee7b0676a 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -144,6 +144,8 @@ module Concurrent # subsequent tasks will be rejected in accordance with the configured `fallback_policy`. # * `auto_terminate`: When true (default), the threads started will be marked as daemon. # * `fallback_policy`: The policy defining how rejected tasks are handled. + # * `prestart`: Defaults to false. When true, the minimum number of threads + # will be started when the pool is created. # # Three fallback policies are supported: # diff --git a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 9375acf38..d120cddfb 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -119,6 +119,14 @@ def prune_pool synchronize { ns_prune_pool } end + def prestartCoreThread + ns_add_idle_worker + end + + def prestartAllCoreThreads + @min_length.times { ns_add_idle_worker } + end + private # @!visibility private @@ -149,6 +157,10 @@ def ns_initialize(opts) @gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented @next_gc_time = Concurrent.monotonic_time + @gc_interval + + if opts.fetch(:prestart, false) + prestartAllCoreThreads + end end # @!visibility private @@ -247,6 +259,20 @@ def ns_add_busy_worker worker end + # creates new worker which is ready + # @return [nil, Worker] nil of max capacity is reached + # + # @!visibility private + def ns_add_idle_worker + return if @pool.size >= @max_length + + @workers_counter += 1 + @pool << (worker = Worker.new(self, @workers_counter)) + @largest_length = @pool.length if @pool.length > @largest_length + @ready << [worker, Concurrent.monotonic_time] + worker + end + # handle ready worker, giving it new job or assigning back to @ready # # @!visibility private diff --git a/spec/concurrent/executor/fixed_thread_pool_spec.rb b/spec/concurrent/executor/fixed_thread_pool_spec.rb index c7f7db3be..00862adfd 100644 --- a/spec/concurrent/executor/fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/fixed_thread_pool_spec.rb @@ -177,6 +177,19 @@ module Concurrent end end + context 'prestart' do + + it 'starts threads when prestart is called' do + pool = described_class.new(5, prestart: false) + expect(pool.length).to eq 0 + pool.prestartAllCoreThreads + expect(pool.length).to eq 5 + pool.shutdown + expect(pool.wait_for_termination(pool_termination_timeout)).to eq true + end + + end + context 'worker creation and caching' do it 'never creates more than :num_threads threads' do @@ -189,6 +202,14 @@ module Concurrent pool.shutdown expect(pool.wait_for_termination(pool_termination_timeout)).to eq true end + + it 'creates threads on creation if :prestart is true' do + pool = described_class.new(5, prestart: true) + expect(pool.length).to eq 5 + pool.shutdown + expect(pool.wait_for_termination(pool_termination_timeout)).to eq true + end + end context 'fallback policy' do