feat: dynamic flow control for batcher part 2 #1310
Codecov Report
@@             Coverage Diff              @@
##             master    #1310      +/-   ##
============================================
+ Coverage     80.27%   80.34%   +0.06%
- Complexity     1304     1331      +27
============================================
  Files           209      210       +1
  Lines          5547     5682     +135
  Branches        479      519      +40
============================================
+ Hits           4453     4565     +112
- Misses          905      912       +7
- Partials        189      205      +16
@igorbernstein2 The second part is ready for review. This part adds the DynamicFlowControlSettings and makes flow control limits in the FlowController adjustable. Thanks! :)
There are some common style issues here that I haven't noted on every occurrence, but please fix globally.
Force-pushed from 8566969 to 53d8d7e.
if (!outstandingByteCount.acquire(permitsToDraw)) {
long permitsToDraw, permitsOwed;
boolean acquired;
synchronized (this) {
Please avoid locking on `this`; it's better to use a private lock object.
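A minimal sketch of the private-lock pattern suggested here. The class and member names (`ReservationCounter`, `lock`, `outstanding`) are hypothetical and not taken from FlowController; the sketch only illustrates that code outside the class cannot synchronize on the same monitor.

```java
/** Hypothetical example class; not part of gax. */
final class ReservationCounter {
  // Private monitor: no code outside this class can lock on it.
  // With synchronized (this), any caller holding a reference could
  // take the same monitor and interfere with the internal locking.
  private final Object lock = new Object();
  private long outstanding;

  /** Reserves {@code count} units unless that would exceed {@code limit}. */
  boolean tryReserve(long count, long limit) {
    synchronized (lock) {
      if (outstanding + count > limit) {
        return false;
      }
      outstanding += count;
      return true;
    }
  }

  /** Returns {@code count} previously reserved units. */
  void release(long count) {
    synchronized (lock) {
      outstanding -= count;
    }
  }

  long getOutstanding() {
    synchronized (lock) {
      return outstanding;
    }
  }
}
```

The trade-off is one extra field in exchange for a monitor that is fully encapsulated.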
Force-pushed from 868e6aa to 9064e26.
The concurrency part looks a bit scary to me (it seems like it can unintentionally and indirectly break pretty much everything else in gax).
wait();
} catch (InterruptedException e) {
interrupted = true;
while (!interrupted) {
(This comment partly applies to the original implementation as well, which was pushed in part 1. I LGTM'ed it but missed this during the review of part 1.)
It looks like the only way a thread that has entered this loop can exit is by being interrupted. That looks very dangerous, not only for the correctness of this new feature but, indirectly, for the correctness of the rest of gax.
In general, pretty much all the code in gax-java is executed in the context of the executor thread pool, which has 4 threads by default (or the number of available CPUs if > 4). My biggest concern is that once code enters this block, one of those 4 threads is completely owned by this block, and there are only two possible ways out:
- this code never ends and keeps attempting to wait forever;
- this thread gets interrupted, then execution reaches line 91 and Thread.currentThread().interrupt() is called.
It is then not clear what will happen to this thread after interrupt() is called, as it only sets the interrupt flag to true and does not guarantee that the thread does not get "reinterrupted" (interrupt flag cleared).
Also, if the thread actually gets terminated, it will have to be re-created in the executor pool, basically defeating the purpose of using a thread pool (compared to simply spawning a new thread for every single task). In other words, this code block messes with the executor pool in a very brutal way:
- 4 parallel entries into this block and the executor pool is dead (assuming 4 is the number of configured threads);
- if the execution flow assumes frequent entering/leaving of this code block, it results in frequent thread_interrupt->thread_kill->thread_create, which is a heavy operation and is not how threads are supposed to be used in the context of a thread pool (threads must be shared by tasks, and a task should never dictate the lifecycle of the thread it is executed in).
Basically, this code block does not "respect" the other potential tasks in the pool, assuming that the thread it executes in now belongs to it and will never be released back to the pool.
@igorbernstein2 PTAL.
Reverted to the original logic, which waits for the permits and then re-sets the interrupt flag if the wait was interrupted. It has always been implemented this way: https://github.com/googleapis/gax-java/blame/master/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java#L59-L70
> 4 parallel entries in this block and the executor pool is dead
This class is used in FlowController, so wherever it's called, it'll want to block the thread from doing anything for throttling?
> if the execution flow assumes frequent entering/leaving this code block, it would result into frequent thread_interrupt->thread_kill->thread_create, which is a heavy operation and is not how threads are supposed to be used in the context of a thread pool
Sorry again for the confusing changes from using 2 locks (which I've reverted). In both implementations, the expected way of leaving this code block is for more permits to become available to the caller, and an interrupt should rarely happen.
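To make the exit conditions concrete, here is a minimal sketch of a blocking acquire loop of the kind described in this reply. It is not the gax implementation; `SketchBlockingSemaphore` and its members are hypothetical names. The loop is assumed to exit once enough permits are available, and an interrupt is only recorded and restored after the permits have been taken.

```java
/** Hypothetical sketch; the real BlockingSemaphore may differ. */
final class SketchBlockingSemaphore {
  private long availablePermits;

  SketchBlockingSemaphore(long permits) {
    this.availablePermits = permits;
  }

  synchronized void release(long permits) {
    availablePermits += permits;
    notifyAll(); // wake waiters so they can re-check their condition
  }

  synchronized void acquire(long permits) {
    boolean interrupted = false;
    // Normal exit: the condition becomes false once release() adds permits.
    while (availablePermits < permits) {
      try {
        wait();
      } catch (InterruptedException e) {
        interrupted = true; // remember the interrupt and keep waiting
      }
    }
    availablePermits -= permits;
    if (interrupted) {
      // Restore the flag so the caller can still observe the interrupt.
      Thread.currentThread().interrupt();
    }
  }

  synchronized long getAvailablePermits() {
    return availablePermits;
  }
}
```

In this sketch an interrupt never terminates the thread; it only sets a flag that the caller may inspect after `acquire` returns.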
(forgot to select "request changes" in previous review, please check the comments from above for context)
Overall looks good, but the concurrency/synchronization part is still concerning, unfortunately.
@Nullable private final Long maxRequestBytesLimit;
@Nullable private final Long minElementCountLimit;
@Nullable private final Long minRequestBytesLimit;
@Nullable private Long currentElementCountLimit;
Since these are used in a multithreaded environment, it would be good practice to declare them volatile.
Also, given that these values are used only in a simple context, it might be possible to use a non-blocking approach (and convert them to AtomicLong).
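A minimal sketch of the non-blocking alternative mentioned above. `AdjustableLimit` and its methods are hypothetical names, and clamping to a fixed `[min, max]` range is an assumption made for illustration. The fields in the diff are `@Nullable Long`, which cannot be converted one-to-one; this sketch assumes the limit is always present.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a limit that is adjusted without locks. */
final class AdjustableLimit {
  private final long min;
  private final long max;
  private final AtomicLong current;

  AdjustableLimit(long min, long initial, long max) {
    if (min > initial || initial > max) {
      throw new IllegalArgumentException("expected min <= initial <= max");
    }
    this.min = min;
    this.max = max;
    this.current = new AtomicLong(initial);
  }

  /** Reads the latest value; AtomicLong gives the same visibility as volatile. */
  long get() {
    return current.get();
  }

  /** Adds delta (may be negative), clamped to [min, max]; returns the new value. */
  long adjust(long delta) {
    // updateAndGet retries internally if another thread wins the race.
    return current.updateAndGet(v -> Math.max(min, Math.min(max, v + delta)));
  }
}
```

This only covers a single independent counter. If the limit has to change together with another value, a lock (or a careful argument about each interleaving) is still needed.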
currentElementCountLimit is modified and accessed in synchronized blocks (I will add synchronized to the getters), so is it still necessary to make it volatile? Or is it just to make the code clearer?
Moved the limit to the semaphore class.
Force-pushed from 677e470 to b7b525d.
Force-pushed from d83fdc1 to 96ef367.
This looks good to me...just a couple of nits on naming
lgtm!
Some more comments about the multithreading part, but LGTM =)
@Override
public void increasePermitLimit(long permits) {
checkNotNegative(permits);
availablePermits.addAndGet(permits);
This looks suspicious. Even though both `availablePermits` and `limit` are atomic, their joint update here is not atomic (if they were under the same lock, it would be). Is it ok here?
Very good point, and I struggled with it a bit, but I think it's ok:
- If multiple threads are calling `increasePermitLimit`, this is ok because updates to the variables are atomic and we just need to make sure the numbers add up.
- If it happens concurrently with `reduceLimit` (which shouldn't really happen in reality), we need to make sure limit doesn't go < 0, and the checks in `reducePermit` should handle that.
- If `increasePermitLimit` happens concurrently with `acquire`, limit is not accessed / updated in `acquire`, and `availablePermits` is atomic, so it should be fine.
- If `increasePermitLimit` happens concurrently with `acquirePartial`, `limit` is accessed to check if there are enough permits. If limit is updated after `if (old < Math.min(limit.get(), permits))`, it means either `availablePermits` was also updated midway, so `if (availablePermits.compareAndSet(old, old - permits))` won't succeed and it'll loop one more time, or `availablePermits` is also updated and it passed / failed the check.
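The cases above can be sketched as follows. This is a hedged reconstruction built only from the conditions quoted in this thread (`old < Math.min(limit.get(), permits)` and the `compareAndSet` retry); the class name `SketchNonBlockingSemaphore`, the method name `reducePermitLimit`, and everything not quoted are assumptions, not the actual NonBlockingSemaphore.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; the real NonBlockingSemaphore may differ. */
final class SketchNonBlockingSemaphore {
  private final AtomicLong availablePermits;
  private final AtomicLong limit;

  SketchNonBlockingSemaphore(long permits) {
    availablePermits = new AtomicLong(permits);
    limit = new AtomicLong(permits);
  }

  void increasePermitLimit(long permits) {
    // Two separate atomic updates: each one is atomic, the pair is not.
    availablePermits.addAndGet(permits);
    limit.addAndGet(permits);
  }

  void reducePermitLimit(long reduction) {
    while (true) {
      long oldLimit = limit.get();
      if (oldLimit - reduction <= 0) {
        throw new IllegalStateException("permit limit underflow");
      }
      // Retry if another thread changed the limit since it was read.
      if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
        availablePermits.addAndGet(-reduction);
        return;
      }
    }
  }

  /** Succeeds if at least min(limit, permits) permits are available. */
  boolean acquirePartial(long permits) {
    while (true) {
      long old = availablePermits.get();
      if (old < Math.min(limit.get(), permits)) {
        return false;
      }
      // Fails, and the loop retries, if availablePermits changed in between.
      if (availablePermits.compareAndSet(old, old - permits)) {
        return true;
      }
    }
  }

  void release(long permits) {
    availablePermits.addAndGet(permits);
  }

  long getAvailablePermits() {
    return availablePermits.get();
  }

  long getLimit() {
    return limit.get();
  }
}
```

Note that `acquirePartial` in this sketch may drive `availablePermits` negative when a request is larger than the limit; later acquisitions then fail until enough permits are released.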
long oldLimit = limit.get();
Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
availablePermits.addAndGet(-reduction);
Should we call `acquirePartial()` here to update `availablePermits` in a non-blocking fashion, instead of updating it directly?
🤔 I think it's pretty safe to just update it. Calling `acquirePartial()` seems to work too, but I feel it's a bit harder to read?
checkNotNegative(reduction);
Preconditions.checkState(limit - reduction > 0, "permit limit underflow");
availablePermits -= reduction;
limit -= reduction;
I see `increasePermitLimit` calls notifyAll(), but reduce does not. Looks suspicious, as it is a reverse operation to increase (so I would assume it should follow the same principles).
It should be ok here, because the wait() calls in other threads are waiting for more permits to become available, while this method removes permits. So even if we called notifyAll() here, the threads would likely go back to sleep again :). It's similar to how we don't call notifyAll() in reserve().
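A minimal sketch of that asymmetry, assuming a monitor-based semaphore where waiters block until `availablePermits` is large enough. The class `SketchAdjustableBlockingSemaphore` and the method name `reducePermitLimit` are hypothetical; only the quoted field updates and the notifyAll() behaviour come from this thread.

```java
/** Hypothetical sketch; the real BlockingSemaphore may differ. */
final class SketchAdjustableBlockingSemaphore {
  private long availablePermits;
  private long limit;

  SketchAdjustableBlockingSemaphore(long permits) {
    this.availablePermits = permits;
    this.limit = permits;
  }

  synchronized void increasePermitLimit(long permits) {
    availablePermits += permits;
    limit += permits;
    notifyAll(); // more permits may satisfy a waiting acquire
  }

  synchronized void reducePermitLimit(long reduction) {
    if (limit - reduction <= 0) {
      throw new IllegalStateException("permit limit underflow");
    }
    availablePermits -= reduction;
    limit -= reduction;
    // No notifyAll(): removing permits cannot make a waiter's condition true,
    // so a woken thread would re-check and go straight back to waiting.
  }

  synchronized void acquire(long permits) {
    boolean interrupted = false;
    while (availablePermits < permits) {
      try {
        wait();
      } catch (InterruptedException e) {
        interrupted = true; // keep waiting, restore the flag afterwards
      }
    }
    availablePermits -= permits;
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  synchronized long getAvailablePermits() {
    return availablePermits;
  }

  synchronized long getLimit() {
    return limit;
  }
}
```

Because both fields are updated under the same monitor here, the joint-update concern raised for the atomic variant does not apply to this sketch.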
@vam-google I'm going to merge this. If you still have concerns after @mutianf's responses, we can open follow-up PRs.
Implementation of go/veneer-dynamic-flow-control part 2.
Adds DynamicFlowControlSettings and new APIs in FlowController so that the flow control limits in the FlowController can be adjusted.