
Implement threadsafe version of exec_within_threshold #35

Status: Open

jamespeerless wants to merge 1 commit into ejfinneran:master from jamespeerless:threadsafe-exec-within-threshold-no-lock
Conversation


@jamespeerless commented Jan 8, 2019

When using exec_within_threshold in an environment with multiple threads or processes, you can run into an issue where several threads read the count for a given subject at the same time. If the count they read is just below the threshold, all of those threads are allowed to execute before any of them can perform an add that pushes the count over the threshold.

For example, suppose you had three threads and a rate limit with a threshold of 20 in 600 seconds, and the current count was 19. All three threads would read the current count of 19, the exceeded? check used in exec_within_threshold would evaluate to false for each of them, and all three threads would execute the block.

The fix requires reading and incrementing the count atomically, and there is no way to do this natively with Redis out of the box. I had to implement a short Lua script that performs the count, the threshold check, and the increment in a single call. Using this new script we are able to implement a threadsafe version of exec_within_threshold that also automatically increments the subject.
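
For illustration, the new method can be structured roughly like this. This is a sketch only, not the exact code in the PR: check_and_increment_under_threshold is a hypothetical name standing in for the Lua script call shown further down, and the default option values are assumed. The retry loop mirrors the existing exec_within_threshold behavior of sleeping until the subject drops under the threshold.

def exec_and_increment_within_threshold(subject, options = {})
  options[:threshold] ||= 30   # assumed defaults, for the sketch only
  options[:interval]  ||= 30
  options[:increment] ||= 1
  # Hypothetical helper: runs the Lua script, which sums the buckets in
  # the interval, compares the total against the threshold, and
  # increments the current bucket -- all in one atomic call. It returns
  # true only when this caller was admitted.
  until check_and_increment_under_threshold(subject, options)
    sleep @bucket_interval
  end
  yield(self) if block_given?
end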

I wrote some tests in a project where we use the ratelimit gem to verify both the issue and that this change fixes it.

The test that exposes the issue, where exec_within_threshold allows more than the threshold number of executions in an interval, looks like this:

rlKey = SecureRandom.uuid
rlSubject = SecureRandom.uuid
ratelimit = Ratelimit.new(rlKey, { :redis => $redis })

statusMap = { stop: 0, sleep: 0, run: 0, finished: 0 }

threads = Array.new(7) do
  Thread.new do
    ratelimit.exec_within_threshold(rlSubject, { interval: 10, threshold: 1 }) do
      sleep 0.5
      ratelimit.add(rlSubject)
    end
  end
end

sleep 1

threads.each do |t|
  status_symbol = t.status ? t.status.to_sym : :finished
  statusMap[status_symbol] += 1
end

expect(statusMap[:finished]).to be > 1

The test with the fixed implementation looks like this:

rlKey = SecureRandom.uuid
rlSubject = SecureRandom.uuid
ratelimit = Ratelimit.new(rlKey, { :redis => $redis })

statusMap = { stop: 0, sleep: 0, run: 0, finished: 0 }

threads = Array.new(7) do
  Thread.new do
    ratelimit.exec_and_increment_within_threshold(rlSubject, { interval: 10, threshold: 1 })
  end
end

sleep 1

threads.each do |t|
  status_symbol = t.status ? t.status.to_sym : :finished
  statusMap[status_symbol] += 1
end

expect(statusMap[:finished]).to eq(1)

@coveralls commented:

Coverage decreased (-7.6%) to 90.845% when pulling 509fabc on jamespeerless:threadsafe-exec-within-threshold-no-lock into 0e60d34 on ejfinneran:master.

@coveralls commented Jan 8, 2019:

Coverage decreased (-8.1%) to 90.278% when pulling dc46e5f on jamespeerless:threadsafe-exec-within-threshold-no-lock into 0e60d34 on ejfinneran:master.

evalScript = 'local a=KEYS[1]local b=tonumber(ARGV[1])local c=tonumber(ARGV[b+2])local d=tonumber(ARGV[b+3])local e=tonumber(ARGV[b+4])local f=tonumber(ARGV[b+5])local g=tonumber(ARGV[b+6])local h=0;local i=false;for j,k in ipairs(redis.call("HMGET",a,unpack(ARGV,2,b+1)))do h=h+(tonumber(k)or 0)end;if h<f then redis.call("HINCRBY",a,c,g)redis.call("HDEL",a,(c+1)%d)redis.call("HDEL",a,(c+2)%d)redis.call("EXPIRE",a,e)i=true end;return i'
evalKeys = [get_key_for_subject(subject)]
evalArgs = [keys.length, *keys, bucket, @bucket_count, @bucket_expiry, options[:threshold], options[:increment]]
redis.eval(evalScript, evalKeys, evalArgs)
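
Deminified, and using the argument layout from evalArgs above, the script reads roughly as follows. The variable names and comments here are mine; the logic is unchanged from the minified version.

-- KEYS[1]            hash key for the subject
-- ARGV[1]            number of bucket field names that follow
-- ARGV[2..n+1]       bucket field names covering the interval
-- ARGV[n+2..n+6]     current bucket, bucket count, bucket expiry,
--                    threshold, increment
local key          = KEYS[1]
local num_buckets  = tonumber(ARGV[1])
local bucket       = tonumber(ARGV[num_buckets + 2])
local bucket_count = tonumber(ARGV[num_buckets + 3])
local expiry       = tonumber(ARGV[num_buckets + 4])
local threshold    = tonumber(ARGV[num_buckets + 5])
local increment    = tonumber(ARGV[num_buckets + 6])

-- Sum the counts across every bucket in the interval.
local count = 0
for _, v in ipairs(redis.call("HMGET", key, unpack(ARGV, 2, num_buckets + 1))) do
  count = count + (tonumber(v) or 0)
end

-- Only while still under the threshold: increment the current bucket,
-- clear the two buckets ahead of it, and refresh the TTL. Because the
-- whole script runs atomically in Redis, no other client can read or
-- write the counts between the check and the increment.
local allowed = false
if count < threshold then
  redis.call("HINCRBY", key, bucket, increment)
  redis.call("HDEL", key, (bucket + 1) % bucket_count)
  redis.call("HDEL", key, (bucket + 2) % bucket_count)
  redis.call("EXPIRE", key, expiry)
  allowed = true
end
-- Redis converts the Lua boolean to 1 (admitted) or nil (rejected).
return allowed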
@jamespeerless (Author) commented on the snippet above:

We do not need to send the full script to EVAL every time; that wastes bandwidth. We can take the SHA1 of the script, check whether the script already exists on the server, and then execute it using only the SHA1.
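
With redis-rb, one common way to do that (a sketch only, not code from this PR; it assumes LUA_SOURCE holds the script shown above) is to try EVALSHA first and fall back to EVAL when the server replies with a NOSCRIPT error:

require 'digest/sha1'

LUA_SOURCE = '...'   # the minified Lua script shown above
LUA_SHA    = Digest::SHA1.hexdigest(LUA_SOURCE)

def run_rate_limit_script(redis, keys, argv)
  # EVALSHA sends only the 40-character SHA1 instead of the full source.
  redis.evalsha(LUA_SHA, keys, argv)
rescue Redis::CommandError => e
  raise unless e.message.include?('NOSCRIPT')
  # First call against this server (or the script cache was flushed):
  # EVAL loads the script into the server's cache and runs it, so
  # subsequent calls can use EVALSHA again.
  redis.eval(LUA_SOURCE, keys, argv)
end

Since EVAL caches the script on the server as a side effect, the fallback path only pays the full-script cost once per server.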

@jamespeerless force-pushed the threadsafe-exec-within-threshold-no-lock branch 5 times, most recently from a970c46 to 630d218 on January 9, 2019
Commit: …reshold that counts and increments atomically
@feliperaul commented:

@jamespeerless James, have you ever gotten around to implementing the idea of not evaluating that Lua script every time, and instead checking whether it was already loaded using its SHA?
