Pluggable fetch strategies #24

Open
wants to merge 4 commits into base: master
20 changes: 20 additions & 0 deletions lib/sidekiq-rate-limiter.rb
@@ -1,2 +1,22 @@
require 'sidekiq-rate-limiter/version'
require 'sidekiq-rate-limiter/fetch'
require 'sidekiq-rate-limiter/configuration'

module Sidekiq::RateLimiter
class << self
attr_writer :configuration
end

def self.configuration
@configuration ||= Configuration.new
end

def self.reset
@configuration = Configuration.new
end

def self.configure
yield(configuration)
end

end
16 changes: 16 additions & 0 deletions lib/sidekiq-rate-limiter/basic_strategy.rb
@@ -0,0 +1,16 @@
module Sidekiq::RateLimiter
class BasicStrategy
def call(work, klass, args, options)
Sidekiq.redis do |conn|
lim = Limit.new(conn, options)
if lim.exceeded?(klass)
conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job)
nil
else
lim.add(klass)
work
end
end
end
end
end
9 changes: 9 additions & 0 deletions lib/sidekiq-rate-limiter/configuration.rb
@@ -0,0 +1,9 @@
module Sidekiq::RateLimiter
class Configuration
attr_accessor :fetch_strategy

def initialize
@fetch_strategy = Sidekiq::RateLimiter::BasicStrategy
end
end
end
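Taken together, the two files above give the gem a configuration object with a pluggable fetch strategy. A minimal usage sketch follows; the initializer path is illustrative, but the configure block itself mirrors the specs further down:

```ruby
# Hypothetical initializer, e.g. config/initializers/sidekiq_rate_limiter.rb
require 'sidekiq-rate-limiter'

Sidekiq::RateLimiter.configure do |config|
  # Swap the default BasicStrategy for one of the alternatives added in this PR.
  config.fetch_strategy = Sidekiq::RateLimiter::ScheduleInFutureStrategy
end
```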
17 changes: 7 additions & 10 deletions lib/sidekiq-rate-limiter/fetch.rb
@@ -2,6 +2,9 @@
require 'celluloid' if Sidekiq::VERSION < "4"
require 'sidekiq/fetch'
require 'redis_rate_limiter'
require 'sidekiq-rate-limiter/basic_strategy'
require 'sidekiq-rate-limiter/sleep_strategy'
require 'sidekiq-rate-limiter/schedule_in_future_strategy'

module Sidekiq::RateLimiter
DEFAULT_LIMIT_NAME =
@@ -31,18 +34,12 @@ def limit(work)
:name => (name.respond_to?(:call) ? name.call(*args) : name).to_s,
}

Sidekiq.redis do |conn|
lim = Limit.new(conn, options)
if lim.exceeded?(klass)
conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job)
nil
else
lim.add(klass)
work
end
end
fetch_strategy.call(work, klass, args, options)
end

def fetch_strategy
Sidekiq::RateLimiter.configuration.fetch_strategy.new
end
end

class Rate
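The strategy object the fetcher instantiates only needs to respond to call(work, klass, args, options) and return the work unit to process, or nil after handling the rate-limited job itself. To illustrate that contract, a hypothetical user-supplied strategy (not part of this PR) could look like this, following the same shape as BasicStrategy above:

```ruby
require 'sidekiq-rate-limiter'

# Hypothetical strategy that drops rate-limited jobs instead of re-enqueueing them.
module Sidekiq::RateLimiter
  class DropStrategy
    def call(work, klass, args, options)
      Sidekiq.redis do |conn|
        lim = Limit.new(conn, options)
        if lim.exceeded?(klass)
          nil   # returning nil without re-enqueueing discards the job
        else
          lim.add(klass)
          work  # returning the work unit lets Sidekiq process it normally
        end
      end
    end
  end
end
```

It would then be selected with config.fetch_strategy = Sidekiq::RateLimiter::DropStrategy in the configure block.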
26 changes: 26 additions & 0 deletions lib/sidekiq-rate-limiter/schedule_in_future_strategy.rb
@@ -0,0 +1,26 @@
module Sidekiq::RateLimiter
class ScheduleInFutureStrategy
def call(work, klass, args, options)
Sidekiq.redis do |conn|
lim = Limit.new(conn, options)
if lim.exceeded?(klass)

# Add a random amount of jitter proportional to how far in the future the retry is scheduled.
# This spreads the jobs out more evenly, since clumps of rescheduled jobs can interfere with the normal
# throughput of non-rate-limited jobs. The jitter is additive. It also helps when dumping thousands of
# jobs onto the queue to have them delivered eventually.
retry_in = lim.retry_in?(klass)
retry_in = retry_in + rand(retry_in/5) if retry_in > 10

# Schedule the job to run in the future, when the rate limit should no longer be exceeded.
Sidekiq::Client.enqueue_to_in(work.queue_name, retry_in, Object.const_get(klass), *args)
nil
else
lim.add(klass)
work
end
end
end
end
end
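To make the jitter concrete: for retry windows over 10 seconds, up to roughly 20% extra delay is added at random. A small worked example with an illustrative number:

```ruby
# Illustration of the jitter above for a hypothetical 60-second retry window.
retry_in = 60
retry_in += rand(retry_in / 5) if retry_in > 10
# rand(12) yields 0..11, so the job is scheduled 60..71 seconds out.
```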

25 changes: 25 additions & 0 deletions lib/sidekiq-rate-limiter/sleep_strategy.rb
@@ -0,0 +1,25 @@
module Sidekiq::RateLimiter
class SleepStrategy
def call(work, klass, args, options)
Sidekiq.redis do |conn|
lim = Limit.new(conn, options)
if lim.exceeded?(klass)
# If this job is rate-limited for longer than one second, sleep for one second before putting the
# job back on the queue.
#
# This is undesirable in that it ties up the Sidekiq fetcher thread for a second, but it helps in
# situations where high Redis/Sidekiq CPU usage (from repeatedly re-fetching rate-limited jobs) is causing problems.

if lim.retry_in?(klass) > 1.0
sleep(1)
end
conn.lpush("queue:#{work.queue_name}", work.respond_to?(:message) ? work.message : work.job)
nil
else
lim.add(klass)
work
end
end
end
end
end
25 changes: 25 additions & 0 deletions spec/sidekiq-rate-limiter/configure_spec.rb
@@ -0,0 +1,25 @@
require 'spec_helper'

RSpec.describe "#configure" do
context 'default' do
it 'should have the basic strategy by default' do
expect(Sidekiq::RateLimiter.configuration.fetch_strategy).to eq(Sidekiq::RateLimiter::BasicStrategy)
end
end

context 'with a strategy set' do
before :each do
Sidekiq::RateLimiter.configure do |config|
config.fetch_strategy = Sidekiq::RateLimiter::SleepStrategy
end
end

it 'should have the sleep strategy if set' do
expect(Sidekiq::RateLimiter.configuration.fetch_strategy).to eq(Sidekiq::RateLimiter::SleepStrategy)
end

after :each do
Sidekiq::RateLimiter.reset
end
end
end
108 changes: 95 additions & 13 deletions spec/sidekiq-rate-limiter/fetch_spec.rb
@@ -51,22 +51,25 @@ def perform(arg1, arg2); end
expect(fetch.queues_cmd).to eql(["queue:#{queue}", "queue:#{another_queue}", timeout])
end

it 'should retrieve work', queuing: true do
worker.perform_async(*args)
fetch = described_class.new(options)
work = fetch.retrieve_work
parsed = JSON.parse(work.respond_to?(:message) ? work.message : work.job)
shared_examples 'retrieve_work' do |parameter|

expect(work).not_to be_nil
expect(work.queue_name).to eql(queue)
expect(work.acknowledge).to be_nil
it 'should retrieve work', queuing: true do
worker.perform_async(*args)
fetch = described_class.new(options)
work = fetch.retrieve_work
parsed = JSON.parse(work.respond_to?(:message) ? work.message : work.job)

expect(parsed).to include(worker.get_sidekiq_options)
expect(parsed).to include("class" => worker.to_s, "args" => args)
expect(parsed).to include("jid", "enqueued_at")
expect(work).not_to be_nil
expect(work.queue_name).to eql(queue)
expect(work.acknowledge).to be_nil

q = Sidekiq::Queue.new(queue)
expect(q.size).to eq 0
expect(parsed).to include(worker.get_sidekiq_options)
expect(parsed).to include("class" => worker.to_s, "args" => args)
expect(parsed).to include("jid", "enqueued_at")

q = Sidekiq::Queue.new(queue)
expect(q.size).to eq 0
end
end

it 'should place rate-limited work at the back of the queue', queuing: true do
@@ -81,6 +84,85 @@ def perform(arg1, arg2); end
expect(q.size).to eq 1
end

context 'with the basic strategy' do
include_examples 'retrieve_work'
end

context 'with the sleep strategy' do
before :each do
Sidekiq::RateLimiter.configure do |config|
config.fetch_strategy = Sidekiq::RateLimiter::SleepStrategy
end
end

include_examples 'retrieve_work'

it 'should place rate-limited work at the back of the queue immediately when retry_in? is less than 1.0', queuing: true do
worker.perform_async(*args)
expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true)
expect_any_instance_of(redis_class).to receive(:lpush).exactly(:once).and_call_original

expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(0.1)
expect_any_instance_of(Sidekiq::RateLimiter::SleepStrategy).to_not receive(:sleep)

fetch = described_class.new(options)
expect(fetch.retrieve_work).to be_nil

q = Sidekiq::Queue.new(queue)
expect(q.size).to eq 1
end

it 'should call sleep when appropriate' do
worker.perform_async(*args)
expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true)
expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(2.0)
expect_any_instance_of(Sidekiq::RateLimiter::SleepStrategy).to receive(:sleep).with(1).and_return(true)

expect_any_instance_of(redis_class).to receive(:lpush).exactly(:once).and_call_original

fetch = described_class.new(options)
expect(fetch.retrieve_work).to be_nil

q = Sidekiq::Queue.new(queue)
expect(q.size).to eq 1
end

after :each do
Sidekiq::RateLimiter.reset
end
end

context 'with the schedule in the future strategy' do
before :each do
Sidekiq::RateLimiter.configure do |config|
config.fetch_strategy = Sidekiq::RateLimiter::ScheduleInFutureStrategy
end
end

include_examples 'retrieve_work'

it 'should place rate-limited work at the back of the queue', queuing: true do
worker.perform_async(*args)
expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:exceeded?).and_return(true)

expect_any_instance_of(Sidekiq::RateLimiter::Limit).to receive(:retry_in?).and_return(100)

fetch = described_class.new(options)
expect(fetch.retrieve_work).to be_nil

# expect the job to move to being scheduled in the future
q = Sidekiq::Queue.new(queue)
expect(q.size).to eq(0)

ss = Sidekiq::ScheduledSet.new
expect(ss.size).to eq(1)
end

after :each do
Sidekiq::RateLimiter.reset
end
end

it 'should accept procs for limit, name, and period config keys', queuing: true do
proc_worker.perform_async(1,2)
