From 39312ee3675389479f3c0a18bed2038384007270 Mon Sep 17 00:00:00 2001 From: Nathan Woodhull Date: Mon, 1 May 2017 18:27:38 -0400 Subject: [PATCH 1/4] adds a sleep strategy and ability to configure the strategy. --- lib/sidekiq-rate-limiter.rb | 20 ++++++++++ lib/sidekiq-rate-limiter/basic_strategy.rb | 16 ++++++++ lib/sidekiq-rate-limiter/configuration.rb | 9 +++++ lib/sidekiq-rate-limiter/fetch.rb | 16 +++----- lib/sidekiq-rate-limiter/sleep_strategy.rb | 25 ++++++++++++ spec/sidekiq-rate-limiter/configure_spec.rb | 25 ++++++++++++ spec/sidekiq-rate-limiter/fetch_spec.rb | 42 +++++++++++++++++++++ 7 files changed, 143 insertions(+), 10 deletions(-) create mode 100644 lib/sidekiq-rate-limiter/basic_strategy.rb create mode 100644 lib/sidekiq-rate-limiter/configuration.rb create mode 100644 lib/sidekiq-rate-limiter/sleep_strategy.rb create mode 100644 spec/sidekiq-rate-limiter/configure_spec.rb diff --git a/lib/sidekiq-rate-limiter.rb b/lib/sidekiq-rate-limiter.rb index 31b358c..ef750c0 100644 --- a/lib/sidekiq-rate-limiter.rb +++ b/lib/sidekiq-rate-limiter.rb @@ -1,2 +1,22 @@ require 'sidekiq-rate-limiter/version' require 'sidekiq-rate-limiter/fetch' +require 'sidekiq-rate-limiter/configuration' + +module Sidekiq::RateLimiter + class << self + attr_writer :configuration + end + + def self.configuration + @configuration ||= Configuration.new + end + + def self.reset + @configuration = Configuration.new + end + + def self.configure + yield(configuration) + end + +end diff --git a/lib/sidekiq-rate-limiter/basic_strategy.rb b/lib/sidekiq-rate-limiter/basic_strategy.rb new file mode 100644 index 0000000..9f68ab1 --- /dev/null +++ b/lib/sidekiq-rate-limiter/basic_strategy.rb @@ -0,0 +1,16 @@ +module Sidekiq::RateLimiter + class BasicStrategy + def call(work, klass, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job) + nil + else + lim.add(klass) + work + end + end + end + end +end diff --git a/lib/sidekiq-rate-limiter/configuration.rb b/lib/sidekiq-rate-limiter/configuration.rb new file mode 100644 index 0000000..612344f --- /dev/null +++ b/lib/sidekiq-rate-limiter/configuration.rb @@ -0,0 +1,9 @@ +module Sidekiq::RateLimiter + class Configuration + attr_accessor :fetch_strategy + + def initialize + @fetch_strategy = Sidekiq::RateLimiter::BasicStrategy + end + end +end diff --git a/lib/sidekiq-rate-limiter/fetch.rb b/lib/sidekiq-rate-limiter/fetch.rb index 99cd784..f1ce2f2 100644 --- a/lib/sidekiq-rate-limiter/fetch.rb +++ b/lib/sidekiq-rate-limiter/fetch.rb @@ -2,6 +2,8 @@ require 'celluloid' if Sidekiq::VERSION < "4" require 'sidekiq/fetch' require 'redis_rate_limiter' +require 'sidekiq-rate-limiter/basic_strategy' +require 'sidekiq-rate-limiter/sleep_strategy' module Sidekiq::RateLimiter DEFAULT_LIMIT_NAME = @@ -31,18 +33,12 @@ def limit(work) :name => (name.respond_to?(:call) ? name.call(*args) : name).to_s, } - Sidekiq.redis do |conn| - lim = Limit.new(conn, options) - if lim.exceeded?(klass) - conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job) - nil - else - lim.add(klass) - work - end - end + fetch_strategy.call(work, klass, options) end + def fetch_strategy + Sidekiq::RateLimiter.configuration.fetch_strategy.new + end end class Rate diff --git a/lib/sidekiq-rate-limiter/sleep_strategy.rb b/lib/sidekiq-rate-limiter/sleep_strategy.rb new file mode 100644 index 0000000..7dc0b16 --- /dev/null +++ b/lib/sidekiq-rate-limiter/sleep_strategy.rb @@ -0,0 +1,25 @@ +module Sidekiq::RateLimiter + class SleepStrategy + def call(work, klass, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + # if this job is being rate-limited for longer than 1 second, sleep for that amount + # of time before putting the job back on the queue. + # + # This is undesirable as it ties up the sidekiq thread for one second, but it does help in situations where + # high redis/sidekiq CPU usage is causing problems. + + if lim.retry_in?(klass) > 1.0 + sleep(1) + end + conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job) + nil + else + lim.add(klass) + work + end + end + end + end +end diff --git a/spec/sidekiq-rate-limiter/configure_spec.rb b/spec/sidekiq-rate-limiter/configure_spec.rb new file mode 100644 index 0000000..1eb63d9 --- /dev/null +++ b/spec/sidekiq-rate-limiter/configure_spec.rb @@ -0,0 +1,25 @@ +require 'spec_helper' + +RSpec.describe "#configure" do + context 'default' do + it 'should have the basic strategy by default' do + expect(Sidekiq::RateLimiter.configuration.fetch_strategy).to eq(Sidekiq::RateLimiter::BasicStrategy) + end + end + + context 'with a strategy set' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::SleepStrategy + end + end + + it 'should have the sleep strategy if set' do + expect(Sidekiq::RateLimiter.configuration.fetch_strategy).to eq(Sidekiq::RateLimiter::SleepStrategy) + end + + after :each do + Sidekiq::RateLimiter.reset + end + end +end diff --git a/spec/sidekiq-rate-limiter/fetch_spec.rb b/spec/sidekiq-rate-limiter/fetch_spec.rb index 5a780fe..52688e3 100644 --- a/spec/sidekiq-rate-limiter/fetch_spec.rb +++ b/spec/sidekiq-rate-limiter/fetch_spec.rb @@ -81,6 +81,48 @@ def perform(arg1, arg2); end expect(q.size).to eq 1 end + context 'with the sleep strategy' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::SleepStrategy + end + end + + it 'should place rate-limited work at the back of the queue immediately when retry_in? is less than 1.0', queuing: true do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + expect_any_instance_of(redis_class).to receive(:lpush).exactly(:once).and_call_original + + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(0.1) + expect_any_instance_of(Sidekiq::RateLimiter::SleepStrategy).to_not receive(:sleep) + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq 1 + end + + it 'should call sleep when appropriate' do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(2.0) + expect_any_instance_of(Sidekiq::RateLimiter::SleepStrategy).to receive(:sleep).with(1).and_return(true) + + expect_any_instance_of(redis_class).to receive(:lpush).exactly(:once).and_call_original + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq 1 + end + + after :each do + Sidekiq::RateLimiter.reset + end + end + it 'should accept procs for limit, name, and period config keys', queuing: true do proc_worker.perform_async(1,2) From e308e546839ef97b6f72b3ba5675a7f2ee226164 Mon Sep 17 00:00:00 2001 From: Nathan Woodhull Date: Tue, 2 May 2017 18:35:12 -0400 Subject: [PATCH 2/4] adds a schedule in future strategy. --- lib/sidekiq-rate-limiter/basic_strategy.rb | 2 +- lib/sidekiq-rate-limiter/fetch.rb | 3 +- .../schedule_in_future_strategy.rb | 18 ++++++++++++ lib/sidekiq-rate-limiter/sleep_strategy.rb | 2 +- spec/sidekiq-rate-limiter/fetch_spec.rb | 29 +++++++++++++++++++ 5 files changed, 51 insertions(+), 3 deletions(-) create mode 100644 lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb diff --git a/lib/sidekiq-rate-limiter/basic_strategy.rb b/lib/sidekiq-rate-limiter/basic_strategy.rb index 9f68ab1..5e62153 100644 --- a/lib/sidekiq-rate-limiter/basic_strategy.rb +++ b/lib/sidekiq-rate-limiter/basic_strategy.rb @@ -1,6 +1,6 @@ module Sidekiq::RateLimiter class BasicStrategy - def call(work, klass, options) + def call(work, klass, args, options) Sidekiq.redis do |conn| lim = Limit.new(conn, options) if lim.exceeded?(klass) diff --git a/lib/sidekiq-rate-limiter/fetch.rb b/lib/sidekiq-rate-limiter/fetch.rb index f1ce2f2..43872ff 100644 --- a/lib/sidekiq-rate-limiter/fetch.rb +++ b/lib/sidekiq-rate-limiter/fetch.rb @@ -4,6 +4,7 @@ require 'redis_rate_limiter' require 'sidekiq-rate-limiter/basic_strategy' require 'sidekiq-rate-limiter/sleep_strategy' +require 'sidekiq-rate-limiter/schedule_in_future_strategy' module Sidekiq::RateLimiter DEFAULT_LIMIT_NAME = @@ -33,7 +34,7 @@ def limit(work) :name => (name.respond_to?(:call) ? name.call(*args) : name).to_s, } - fetch_strategy.call(work, klass, options) + fetch_strategy.call(work, klass, args, options) end def fetch_strategy diff --git a/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb new file mode 100644 index 0000000..7563c67 --- /dev/null +++ b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb @@ -0,0 +1,18 @@ +module Sidekiq::RateLimiter + class ScheduleInFutureStrategy + def call(work, klass, args, options) + Sidekiq.redis do |conn| + lim = Limit.new(conn, options) + if lim.exceeded?(klass) + # Schedule the job to be executed in the future, when we think the rate limit might be back to normal. + Sidekiq::Client.enqueue_to_in(work.queue_name, lim.retry_in?(klass), Object.const_get(klass), *args) + nil + else + lim.add(klass) + work + end + end + end + end +end + diff --git a/lib/sidekiq-rate-limiter/sleep_strategy.rb b/lib/sidekiq-rate-limiter/sleep_strategy.rb index 7dc0b16..5dfabe3 100644 --- a/lib/sidekiq-rate-limiter/sleep_strategy.rb +++ b/lib/sidekiq-rate-limiter/sleep_strategy.rb @@ -1,6 +1,6 @@ module Sidekiq::RateLimiter class SleepStrategy - def call(work, klass, options) + def call(work, klass, args, options) Sidekiq.redis do |conn| lim = Limit.new(conn, options) if lim.exceeded?(klass) diff --git a/spec/sidekiq-rate-limiter/fetch_spec.rb b/spec/sidekiq-rate-limiter/fetch_spec.rb index 52688e3..2206dcd 100644 --- a/spec/sidekiq-rate-limiter/fetch_spec.rb +++ b/spec/sidekiq-rate-limiter/fetch_spec.rb @@ -123,6 +123,35 @@ def perform(arg1, arg2); end end end + context 'with the schedule in the future strategy' do + before :each do + Sidekiq::RateLimiter.configure do |config| + config.fetch_strategy = Sidekiq::RateLimiter::ScheduleInFutureStrategy + end + end + + it 'should place rate-limited work at the back of the queue', queuing: true do + worker.perform_async(*args) + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) + + expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(100) + + fetch = described_class.new(options) + expect(fetch.retrieve_work).to be_nil + + # expect the job to move to being scheduled in the future + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq(0) + + ss = Sidekiq::ScheduledSet.new + expect(ss.size).to eq(1) + end + + after :each do + Sidekiq::RateLimiter.reset + end + end + it 'should accept procs for limit, name, and period config keys', queuing: true do proc_worker.perform_async(1,2) From ea6912b6410f6bdbffc06111f2ec4b462686c964 Mon Sep 17 00:00:00 2001 From: Nathan Woodhull Date: Tue, 2 May 2017 21:16:39 -0400 Subject: [PATCH 3/4] spec that each of the strategies can still retrieve work --- spec/sidekiq-rate-limiter/fetch_spec.rb | 37 ++++++++++++++++--------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/spec/sidekiq-rate-limiter/fetch_spec.rb b/spec/sidekiq-rate-limiter/fetch_spec.rb index 2206dcd..5ece981 100644 --- a/spec/sidekiq-rate-limiter/fetch_spec.rb +++ b/spec/sidekiq-rate-limiter/fetch_spec.rb @@ -51,22 +51,25 @@ def perform(arg1, arg2); end expect(fetch.queues_cmd).to eql(["queue:#{queue}", "queue:#{another_queue}", timeout]) end - it 'should retrieve work', queuing: true do - worker.perform_async(*args) - fetch = described_class.new(options) - work = fetch.retrieve_work - parsed = JSON.parse(work.respond_to?(:message) ? work.message : work.job) + shared_examples 'retrieve_work' do |parameter| - expect(work).not_to be_nil - expect(work.queue_name).to eql(queue) - expect(work.acknowledge).to be_nil + it 'should retrieve work', queuing: true do + worker.perform_async(*args) + fetch = described_class.new(options) + work = fetch.retrieve_work + parsed = JSON.parse(work.respond_to?(:message) ? work.message : work.job) - expect(parsed).to include(worker.get_sidekiq_options) - expect(parsed).to include("class" => worker.to_s, "args" => args) - expect(parsed).to include("jid", "enqueued_at") + expect(work).not_to be_nil + expect(work.queue_name).to eql(queue) + expect(work.acknowledge).to be_nil - q = Sidekiq::Queue.new(queue) - expect(q.size).to eq 0 + expect(parsed).to include(worker.get_sidekiq_options) + expect(parsed).to include("class" => worker.to_s, "args" => args) + expect(parsed).to include("jid", "enqueued_at") + + q = Sidekiq::Queue.new(queue) + expect(q.size).to eq 0 + end end it 'should place rate-limited work at the back of the queue', queuing: true do @@ -81,6 +84,10 @@ def perform(arg1, arg2); end expect(q.size).to eq 1 end + context 'with the basic strategy' do + include_examples 'retrieve_work' + end + context 'with the sleep strategy' do before :each do Sidekiq::RateLimiter.configure do |config| @@ -88,6 +95,8 @@ def perform(arg1, arg2); end end end + include_examples 'retrieve_work' + it 'should place rate-limited work at the back of the queue immediately when retry_in? is less than 1.0', queuing: true do worker.perform_async(*args) expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) @@ -130,6 +139,8 @@ def perform(arg1, arg2); end end end + include_examples 'retrieve_work' + it 'should place rate-limited work at the back of the queue', queuing: true do worker.perform_async(*args) expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true) From 210e629a15c9dc0f84c8dbe3a7d7468cb9cb5469 Mon Sep 17 00:00:00 2001 From: Nathan Woodhull Date: Wed, 3 May 2017 16:14:02 -0400 Subject: [PATCH 4/4] add jitter to the time when retries are scheduled. this is helpful in our particular use case --- .../schedule_in_future_strategy.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb index 7563c67..f0e65da 100644 --- a/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb +++ b/lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb @@ -4,8 +4,16 @@ def call(work, klass, args, options) Sidekiq.redis do |conn| lim = Limit.new(conn, options) if lim.exceeded?(klass) + + # add a random amount of jitter that is proportional to the length of time the retry is in the future. + # this helps us spread out the jobs more evenly, as clumps of jobs on the queue can interfere with normal + # throughput of non-rate limited jobs. This jitter is additive. It's also useful in cases where we would like + # to dump thousands of jobs onto the queue and eventually have them delivered. + retry_in = lim.retry_in?(klass) + retry_in = retry_in + rand(retry_in/5) if retry_in > 10 + # Schedule the job to be executed in the future, when we think the rate limit might be back to normal. - Sidekiq::Client.enqueue_to_in(work.queue_name, lim.retry_in?(klass), Object.const_get(klass), *args) + Sidekiq::Client.enqueue_to_in(work.queue_name, retry_in, Object.const_get(klass), *args) nil else lim.add(klass)