diff --git a/lib/sidekiq-rate-limiter/basic_strategy.rb b/lib/sidekiq-rate-limiter/basic_strategy.rb index 9f68ab1..5e62153 100644 --- a/lib/sidekiq-rate-limiter/basic_strategy.rb +++ b/lib/sidekiq-rate-limiter/basic_strategy.rb @@ -1,6 +1,6 @@ module Sidekiq::RateLimiter class BasicStrategy - def call(work, klass, options) + def call(work, klass, args, options) Sidekiq.redis do |conn| lim = Limit.new(conn, options) if lim.exceeded?(klass) diff --git a/lib/sidekiq-rate-limiter/fetch.rb b/lib/sidekiq-rate-limiter/fetch.rb index f1ce2f2..43872ff 100644 --- a/lib/sidekiq-rate-limiter/fetch.rb +++ b/lib/sidekiq-rate-limiter/fetch.rb @@ -4,6 +4,7 @@ require 'redis_rate_limiter' require 'sidekiq-rate-limiter/basic_strategy' require 'sidekiq-rate-limiter/sleep_strategy' +require 'sidekiq-rate-limiter/schedule_in_future_strategy' module Sidekiq::RateLimiter DEFAULT_LIMIT_NAME = @@ -33,7 +34,7 @@ def limit(work) :name => (name.respond_to?(:call) ? name.call(*args) : name).to_s, } - fetch_strategy.call(work, klass, options) + fetch_strategy.call(work, klass, args, options) end def fetch_strategy diff --git a/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb new file mode 100644 index 0000000..7563c67 --- /dev/null +++ b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb @@ -0,0 +1,18 @@ +module Sidekiq::RateLimiter + class ScheduleInFutureStrategy + def call(work, klass, args, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + # Schedule the job to be executed in the future, when we think the rate limit might be back to normal. + Sidekiq::Client.enqueue_to_in(work.queue_name, lim.retry_in?(klass), Object.const_get(klass), *args) + nil + else + lim.add(klass) + work + end + end + end + end +end + diff --git a/lib/sidekiq-rate-limiter/sleep_strategy.rb b/lib/sidekiq-rate-limiter/sleep_strategy.rb index 7dc0b16..5dfabe3 100644 --- a/lib/sidekiq-rate-limiter/sleep_strategy.rb +++ b/lib/sidekiq-rate-limiter/sleep_strategy.rb @@ -1,6 +1,6 @@ module Sidekiq::RateLimiter class SleepStrategy - def call(work, klass, options) + def call(work, klass, args, options) Sidekiq.redis do |conn| lim = Limit.new(conn, options) if lim.exceeded?(klass) diff --git a/spec/sidekiq-rate-limiter/fetch_spec.rb b/spec/sidekiq-rate-limiter/fetch_spec.rb index 52688e3..2206dcd 100644 --- a/spec/sidekiq-rate-limiter/fetch_spec.rb +++ b/spec/sidekiq-rate-limiter/fetch_spec.rb @@ -123,6 +123,35 @@ def perform(arg1, arg2); end end end + context 'with the schedule in the future strategy' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::ScheduleInFutureStrategy + end + end + + it 'should place rate-limited work at the back of the queue', queuing: true do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(100) + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + # expect the job to move to being scheduled in the future + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq(0) + + ss = Sidekiq::ScheduledSet.new + expect(ss.size).to eq(1) + end + + after :each do + Sidekiq::RateLimiter.reset + end + end + it 'should accept procs for limit, name, and period config keys', queuing: true do proc_worker.perform_async(1,2)