diff --git a/lib/sidekiq-rate-limiter.rb b/lib/sidekiq-rate-limiter.rb index 31b358c..ef750c0 100644 --- a/lib/sidekiq-rate-limiter.rb +++ b/lib/sidekiq-rate-limiter.rb @@ -1,2 +1,22 @@ require 'sidekiq-rate-limiter/version' require 'sidekiq-rate-limiter/fetch' +require 'sidekiq-rate-limiter/configuration' + +module Sidekiq::RateLimiter + class << self + attr_writer :configuration + end + + def self.configuration + @configuration ||= Configuration.new + end + + def self.reset + @configuration = Configuration.new + end + + def self.configure + yield(configuration) + end + +end diff --git a/lib/sidekiq-rate-limiter/basic_strategy.rb b/lib/sidekiq-rate-limiter/basic_strategy.rb new file mode 100644 index 0000000..5e62153 --- /dev/null +++ b/lib/sidekiq-rate-limiter/basic_strategy.rb @@ -0,0 +1,16 @@ +module Sidekiq::RateLimiter + class BasicStrategy + def call(work, klass, args, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job) + nil + else + lim.add(klass) + work + end + end + end + end +end diff --git a/lib/sidekiq-rate-limiter/configuration.rb b/lib/sidekiq-rate-limiter/configuration.rb new file mode 100644 index 0000000..612344f --- /dev/null +++ b/lib/sidekiq-rate-limiter/configuration.rb @@ -0,0 +1,9 @@ +module Sidekiq::RateLimiter + class Configuration + attr_accessor :fetch_strategy + + def initialize + @fetch_strategy = Sidekiq::RateLimiter::BasicStrategy + end + end +end diff --git a/lib/sidekiq-rate-limiter/fetch.rb b/lib/sidekiq-rate-limiter/fetch.rb index 99cd784..43872ff 100644 --- a/lib/sidekiq-rate-limiter/fetch.rb +++ b/lib/sidekiq-rate-limiter/fetch.rb @@ -2,6 +2,9 @@ require 'celluloid' if Sidekiq::VERSION < "4" require 'sidekiq/fetch' require 'redis_rate_limiter' +require 'sidekiq-rate-limiter/basic_strategy' +require 'sidekiq-rate-limiter/sleep_strategy' +require 'sidekiq-rate-limiter/schedule_in_future_strategy' module Sidekiq::RateLimiter DEFAULT_LIMIT_NAME = @@ -31,18 +34,12 @@ def limit(work) :name => (name.respond_to?(:call) ? name.call(*args) : name).to_s, } - Sidekiq.redis do |conn| - lim = Limit.new(conn, options) - if lim.exceeded?(klass) - conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job) - nil - else - lim.add(klass) - work - end - end + fetch_strategy.call(work, klass, args, options) end + def fetch_strategy + Sidekiq::RateLimiter.configuration.fetch_strategy.new + end end class Rate diff --git a/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb new file mode 100644 index 0000000..f0e65da --- /dev/null +++ b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb @@ -0,0 +1,26 @@ +module Sidekiq::RateLimiter + class ScheduleInFutureStrategy + def call(work, klass, args, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + + # add a random amount of jitter that is proportional to the length of time the retry is in the future. + # this helps us spread out the jobs more evenly, as clumps of jobs on the queue can interfere with normal + # throughput of non-rate limited jobs. This jitter is additive. It's also useful in cases where we would like + # to dump thousands of jobs onto the queue and eventually have them delivered. + retry_in = lim.retry_in?(klass) + retry_in = retry_in + rand(retry_in/5) if retry_in > 10 + + # Schedule the job to be executed in the future, when we think the rate limit might be back to normal. + Sidekiq::Client.enqueue_to_in(work.queue_name, retry_in, Object.const_get(klass), *args) + nil + else + lim.add(klass) + work + end + end + end + end +end + diff --git a/lib/sidekiq-rate-limiter/sleep_strategy.rb b/lib/sidekiq-rate-limiter/sleep_strategy.rb new file mode 100644 index 0000000..5dfabe3 --- /dev/null +++ b/lib/sidekiq-rate-limiter/sleep_strategy.rb @@ -0,0 +1,25 @@ +module Sidekiq::RateLimiter + class SleepStrategy + def call(work, klass, args, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + # if this job is being rate-limited for longer than 1 second, sleep for that amount + # of time before putting the job back on the queue. + # + # This is undesirable as it ties up the sidekiq thread for one second, but it does help in situations where + # high redis/sidekiq CPU usage is causing problems. + + if lim.retry_in?(klass) > 1.0 + sleep(1) + end + conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job) + nil + else + lim.add(klass) + work + end + end + end + end +end diff --git a/spec/sidekiq-rate-limiter/configure_spec.rb b/spec/sidekiq-rate-limiter/configure_spec.rb new file mode 100644 index 0000000..1eb63d9 --- /dev/null +++ b/spec/sidekiq-rate-limiter/configure_spec.rb @@ -0,0 +1,25 @@ +require 'spec_helper' + +RSpec.describe "#configure" do + context 'default' do + it 'should have the basic strategy by default' do + expect(Sidekiq::RateLimiter.configuration.fetch_strategy).to eq(Sidekiq::RateLimiter::BasicStrategy) + end + end + + context 'with a strategy set' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::SleepStrategy + end + end + + it 'should have the sleep strategy if set' do + expect(Sidekiq::RateLimiter.configuration.fetch_strategy).to eq(Sidekiq::RateLimiter::SleepStrategy) + end + + after :each do + Sidekiq::RateLimiter.reset + end + end +end diff --git a/spec/sidekiq-rate-limiter/fetch_spec.rb b/spec/sidekiq-rate-limiter/fetch_spec.rb index 5a780fe..5ece981 100644 --- a/spec/sidekiq-rate-limiter/fetch_spec.rb +++ b/spec/sidekiq-rate-limiter/fetch_spec.rb @@ -51,22 +51,25 @@ def perform(arg1, arg2); end expect(fetch.queues_cmd).to eql(["queue:#{queue}", "queue:#{another_queue}", timeout]) end - it 'should retrieve work', queuing: true do - worker.perform_async(*args) - fetch = described_class.new(options) - work = fetch.retrieve_work - parsed = JSON.parse(work.respond_to?(:message) ? work.message : work.job) + shared_examples 'retrieve_work' do |parameter| - expect(work).not_to be_nil - expect(work.queue_name).to eql(queue) - expect(work.acknowledge).to be_nil + it 'should retrieve work', queuing: true do + worker.perform_async(*args) + fetch = described_class.new(options) + work = fetch.retrieve_work + parsed = JSON.parse(work.respond_to?(:message) ? work.message : work.job) - expect(parsed).to include(worker.get_sidekiq_options) - expect(parsed).to include("class" => worker.to_s, "args" => args) - expect(parsed).to include("jid", "enqueued_at") + expect(work).not_to be_nil + expect(work.queue_name).to eql(queue) + expect(work.acknowledge).to be_nil - q = Sidekiq::Queue.new(queue) - expect(q.size).to eq 0 + expect(parsed).to include(worker.get_sidekiq_options) + expect(parsed).to include("class" => worker.to_s, "args" => args) + expect(parsed).to include("jid", "enqueued_at") + + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq 0 + end end it 'should place rate-limited work at the back of the queue', queuing: true do @@ -81,6 +84,85 @@ def perform(arg1, arg2); end expect(q.size).to eq 1 end + context 'with the basic strategy' do + include_examples 'retrieve_work' + end + + context 'with the sleep strategy' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::SleepStrategy + end + end + + include_examples 'retrieve_work' + + it 'should place rate-limited work at the back of the queue immediately when retry_in? is less than 1.0', queuing: true do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + expect_any_instance_of(redis_class).to receive(:lpush).exactly(:once).and_call_original + + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(0.1) + expect_any_instance_of(Sidekiq::RateLimiter::SleepStrategy).to_not receive(:sleep) + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq 1 + end + + it 'should call sleep when appropriate' do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(2.0) + expect_any_instance_of(Sidekiq::RateLimiter::SleepStrategy).to receive(:sleep).with(1).and_return(true) + + expect_any_instance_of(redis_class).to receive(:lpush).exactly(:once).and_call_original + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq 1 + end + + after :each do + Sidekiq::RateLimiter.reset + end + end + + context 'with the schedule in the future strategy' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::ScheduleInFutureStrategy + end + end + + include_examples 'retrieve_work' + + it 'should place rate-limited work at the back of the queue', queuing: true do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(100) + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + # expect the job to move to being scheduled in the future + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq(0) + + ss = Sidekiq::ScheduledSet.new + expect(ss.size).to eq(1) + end + + after :each do + Sidekiq::RateLimiter.reset + end + end + it 'should accept procs for limit, name, and period config keys', queuing: true do proc_worker.perform_async(1,2)