This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: dynamic flow control for batcher part 2 #1310

Merged
merged 17 commits into from Mar 22, 2021


mutianf
Contributor

@mutianf mutianf commented Feb 22, 2021

Implementation of go/veneer-dynamic-flow-control part 2.
Adds DynamicFlowControlSettings and new APIs in FlowController so that the flow control limits in the FlowController can be adjusted.

@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Feb 22, 2021
@codecov

codecov bot commented Feb 22, 2021

Codecov Report

Merging #1310 (39fb9fe) into master (7f7aa25) will increase coverage by 0.06%.
The diff coverage is 86.74%.


@@             Coverage Diff              @@
##             master    #1310      +/-   ##
============================================
+ Coverage     80.27%   80.34%   +0.06%     
- Complexity     1304     1331      +27     
============================================
  Files           209      210       +1     
  Lines          5547     5682     +135     
  Branches        479      519      +40     
============================================
+ Hits           4453     4565     +112     
- Misses          905      912       +7     
- Partials        189      205      +16     
Impacted Files | Coverage Δ | Complexity Δ
.../google/api/gax/batching/NonBlockingSemaphore.java | 76.31% <74.07%> (-4.94%) | 11.00 <6.00> (+6.00) ⬇️
...e/api/gax/batching/DynamicFlowControlSettings.java | 83.72% <83.72%> (ø) | 2.00 <2.00> (?)
...com/google/api/gax/batching/BlockingSemaphore.java | 81.81% <85.18%> (+0.86%) | 12.00 <6.00> (+6.00)
...va/com/google/api/gax/batching/FlowController.java | 88.49% <93.84%> (+2.13%) | 34.00 <22.00> (+14.00)
.../java/com/google/api/gax/batching/BatcherImpl.java | 94.94% <100.00%> (-1.13%) | 22.00 <0.00> (-1.00)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 7f7aa25...39fb9fe.

@mutianf mutianf marked this pull request as ready for review February 22, 2021 22:45
@mutianf mutianf requested review from a team as code owners February 22, 2021 22:45
@mutianf
Contributor Author

mutianf commented Feb 22, 2021

@igorbernstein2 The second part is ready for review. This part adds the DynamicFlowControlSettings and makes flow control limits in the FlowController adjustable. Thanks! :)

Contributor

@elharo elharo left a comment

There are some common style issues here that I haven't noted on every occurrence, but please fix globally.

if (!outstandingByteCount.acquire(permitsToDraw)) {
long permitsToDraw, permitsOwed;
boolean acquired;
synchronized (this) {
Contributor

Please avoid locking on this; it's better to use a private lock Object.
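A minimal sketch of the private-lock idiom suggested above, using a hypothetical PermitCounter class (not part of gax). Because no caller can reach the lock object, code that happens to synchronize on the counter instance cannot block or interfere with the counter's own critical sections:

```java
// Hypothetical counter guarded by a private lock instead of `this`.
final class PermitCounter {
  // Only this class can synchronize (or wait/notify) on this object.
  private final Object lock = new Object();
  private long permits;

  PermitCounter(long initialPermits) {
    this.permits = initialPermits;
  }

  boolean tryAcquire(long n) {
    synchronized (lock) {
      if (permits < n) {
        return false;
      }
      permits -= n;
      return true;
    }
  }

  void release(long n) {
    synchronized (lock) {
      permits += n;
    }
  }

  long available() {
    synchronized (lock) {
      return permits;
    }
  }
}
```

If these methods were declared synchronized instead, any outside code holding the instance's monitor (for example, synchronized (counter) { ... }) would stall every other thread calling them.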

Contributor

@vam-google vam-google left a comment

The concurrency part looks a bit scary to me (it seems like it can unintentionally and indirectly break pretty much everything else in gax).

wait();
} catch (InterruptedException e) {
interrupted = true;
while (!interrupted) {
Contributor

@vam-google vam-google Mar 4, 2021

(This comment partially applies even to the original implementation pushed in part 1, which I LGTM'ed; I missed this during review of part 1.)

It looks like the only way the thread that entered this loop can exit is by receiving an interrupt signal. This looks very dangerous, not only for the correctness of this new feature but, indirectly, for the correctness of the rest of gax.

In general, pretty much all the code in gax-java is executed in the context of the thread executor pool, which has 4 threads by default (or the number of CPUs if that is greater than 4). My biggest concern is that once code enters this block, one of those 4 threads is completely owned by this block, and there are only two possible ways out:

  • this code will never end and will keep attempting to wait forever;
  • this thread gets interrupted, then the execution reaches line 91 and Thread.currentThread().interrupt() is called.

Then it is not clear what will happen with this thread after interrupt() is called, as it only sets the interrupt flag to true, and does not guarantee that the thread does not get "reinterrupted" (interrupt flag cleared).

Also, if the thread actually gets terminated, it will have to be re-created in the executor pool, basically defeating the purpose of using a thread pool (compared to simply spawning a new thread for every single task). In other words, this code block messes with the executor pool in a very brutal way:

  • 4 parallel entries in this block and the executor pool is dead (assuming 4 is the number of configured threads)
  • if the execution flow assumes frequently entering/leaving this code block, it would result in frequent thread_interrupt->thread_kill->thread_create, which is a heavy operation and is not how threads are supposed to be used in the context of a thread pool (threads must be shared by tasks, and tasks should never dictate the lifecycle of the thread they are executed in).

Basically this code block does not "respect" the other potential tasks in the pool, assuming that the thread it executes in now belongs to it and will never be released back to the pool.
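The pool-exhaustion point can be reproduced in isolation. This is a sketch under stated assumptions, not gax code: a plain Executors.newFixedThreadPool(4) stands in for the gax executor, a CountDownLatch stands in for "permits never arrive", and the class name is hypothetical. Four tasks blocked inside the pool keep a fifth, unrelated task from running at all:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: a fixed pool of 4 threads stands in for the gax executor.
final class PoolStarvationDemo {
  /** Returns true if a fifth task managed to run while four tasks were blocked. */
  static boolean fifthTaskRanWhileBlocked(long waitMillis) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Stands in for "permits never arrive": each blocked task waits on this latch.
    CountDownLatch permitsArrive = new CountDownLatch(1);
    AtomicBoolean fifthRan = new AtomicBoolean(false);
    try {
      for (int i = 0; i < 4; i++) {
        // A Callable (it returns a value), so await() may throw InterruptedException.
        pool.submit(() -> {
          permitsArrive.await();
          return null;
        });
      }
      // An unrelated task: it can only run once a pool thread is free.
      pool.submit(() -> fifthRan.set(true));
      Thread.sleep(waitMillis);
      return fifthRan.get();
    } finally {
      permitsArrive.countDown(); // unblock the four tasks so the pool can drain
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}
```

fifthTaskRanWhileBlocked(200) returns false: the fifth task stays queued until the latch is released, regardless of how long it waits.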

Contributor Author

@mutianf mutianf Mar 8, 2021

Reverted to the original logic, which waits for the permits and then re-asserts the interrupt if the thread was interrupted while waiting. It has always been implemented this way: https://github.com/googleapis/gax-java/blame/master/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java#L59-L70

4 parallel entries in this block and the executor pool is dead

This class is used in FlowController, so wherever it's called, isn't the point to block the thread from doing anything else, for throttling?

if the execution flow assumes frequent entering/leaving this code block, it would result into frequent thread_interrupt->thread_kill->thread_create, which is a heavy operation and is not how threads are supposed to be used in the context of a thread pool

Sorry again for the confusing change to using 2 locks (which I've reverted). But in both implementations, the expected way of leaving this code block is when more permits are available to give to the caller, and an interrupt should rarely happen.
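For reference, a minimal sketch of the blocking pattern described in this reply, as we read it: the loop exits only once enough permits are free, an interrupt is recorded rather than aborting the wait, and the interrupt flag is restored before returning. The class name is hypothetical, this is not the gax BlockingSemaphore source, and it uses a private lock per the earlier review comment:

```java
// Hypothetical sketch of "wait for permits, then restore the interrupt flag".
final class UninterruptiblePermits {
  private final Object lock = new Object();
  private long freePermits;

  UninterruptiblePermits(long permits) {
    this.freePermits = permits;
  }

  void acquire(long permits) {
    boolean interrupted = false;
    synchronized (lock) {
      // The only way out of this loop is enough permits being released;
      // an interrupt is remembered but does not abort the wait.
      while (freePermits < permits) {
        try {
          lock.wait();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
      freePermits -= permits;
    }
    if (interrupted) {
      // Re-assert the interrupt so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    }
  }

  void release(long permits) {
    synchronized (lock) {
      freePermits += permits;
      lock.notifyAll();
    }
  }

  long available() {
    synchronized (lock) {
      return freePermits;
    }
  }
}
```

With this pattern a caller whose permits never arrive never returns, interrupt or not, which is the situation the pool-exhaustion concern is about.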

Contributor

@vam-google vam-google left a comment

(forgot to select "request changes" in previous review, please check the comments from above for context)

Contributor

@vam-google vam-google left a comment

Overall looks good, but the concurrency/synchronization part is still concerning, unfortunately.

@Nullable private final Long maxRequestBytesLimit;
@Nullable private final Long minElementCountLimit;
@Nullable private final Long minRequestBytesLimit;
@Nullable private Long currentElementCountLimit;
Contributor

Since these are used in a multithreaded environment, it would be good practice to declare them volatile.

Also, given that these values are only used in simple contexts, it might be possible to use a non-blocking approach instead (and convert them to AtomicLong).
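A sketch of the AtomicLong route suggested here, using a hypothetical AdjustableLimit class (not in gax) whose bounds stand in for the min/max limits above. A volatile field gives visibility for plain reads and writes, but a read-modify-write such as "add a delta, then clamp" still needs either a lock or an atomic update such as AtomicLong.updateAndGet:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock-free holder for an adjustable limit clamped to [min, max].
final class AdjustableLimit {
  private final long min;
  private final long max;
  private final AtomicLong current;

  AdjustableLimit(long initial, long min, long max) {
    if (min > initial || initial > max) {
      throw new IllegalArgumentException("require min <= initial <= max");
    }
    this.min = min;
    this.max = max;
    this.current = new AtomicLong(initial);
  }

  /** Adds delta (may be negative), clamps to [min, max], and returns the new limit. */
  long adjust(long delta) {
    // updateAndGet retries the function until its compare-and-set succeeds,
    // so concurrent adjustments are never lost. Overflow is not handled here.
    return current.updateAndGet(v -> Math.max(min, Math.min(max, v + delta)));
  }

  long get() {
    return current.get();
  }
}
```

For example, starting from new AdjustableLimit(10, 5, 20), adjust(7) returns 17 and a following adjust(10) is clamped to 20.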

Contributor Author

@mutianf mutianf Mar 12, 2021

currentElementCountLimit is modified and accessed in synchronized blocks (I'll add synchronized to the getters), so is it still necessary to make it volatile? Or is it just to make the code clearer?
Moved the limit to the semaphore class.

Contributor

@igorbernstein2 igorbernstein2 left a comment

This looks good to me... just a couple of nits on naming.

Contributor

@igorbernstein2 igorbernstein2 left a comment

lgtm!

Contributor

@vam-google vam-google left a comment

Some more comments about multithreading part, but LGTM =)

@Override
public void increasePermitLimit(long permits) {
checkNotNegative(permits);
availablePermits.addAndGet(permits);
Contributor

This looks suspicious. Even though both availablePermits and limit are atomic, their joint update here is not atomic (if they were under the same lock, it would be). Is it ok here?

Contributor Author

Very good point. I struggled with it a bit, but I think it's ok:

  • If multiple threads are calling increasePermitLimit, this is ok because updates to each variable are atomic and we just need to make sure the numbers add up.
  • If it happens concurrently with reducePermitLimit (which shouldn't really happen in practice), we need to make sure the limit doesn't drop below 0, and the checks in reducePermitLimit should handle that.
  • If increasePermitLimit happens concurrently with acquire, limit is not accessed / updated in acquire, and availablePermits is atomic, so it should be fine.
  • If increasePermitLimit happens concurrently with acquirePartial, limit is read to check whether there are enough permits. If limit is updated after if (old < Math.min(limit.get(), permits)), then either availablePermits was also updated in the meantime, so if (availablePermits.compareAndSet(old, old - permits)) won't succeed and it'll loop one more time, or availablePermits had already been updated and the check simply passed or failed.
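A sketch of the two-atomic design discussed in this thread, to make the case analysis concrete. It assumes, from the condition quoted above, that acquirePartial succeeds whenever at least min(limit, permits) permits are free; the class name, release and the getters are hypothetical, and this is not the gax NonBlockingSemaphore source:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; names mirror the snippets quoted in this review.
final class NonBlockingPermits {
  private final AtomicLong availablePermits;
  private final AtomicLong limit;

  NonBlockingPermits(long permits) {
    availablePermits = new AtomicLong(permits);
    limit = new AtomicLong(permits);
  }

  // Two independent atomic updates: another thread can observe the new
  // availablePermits before the new limit (the case analysis above).
  void increasePermitLimit(long permits) {
    checkNotNegative(permits);
    availablePermits.addAndGet(permits);
    limit.addAndGet(permits);
  }

  // Succeeds if at least min(limit, permits) are free, so a request larger
  // than the limit can drive availablePermits negative.
  boolean acquirePartial(long permits) {
    checkNotNegative(permits);
    while (true) {
      long old = availablePermits.get();
      if (old < Math.min(limit.get(), permits)) {
        return false;
      }
      // Fails, and the loop retries, if availablePermits changed since it was read.
      if (availablePermits.compareAndSet(old, old - permits)) {
        return true;
      }
    }
  }

  void release(long permits) {
    checkNotNegative(permits);
    availablePermits.addAndGet(permits);
  }

  long getAvailablePermits() {
    return availablePermits.get();
  }

  long getPermitLimit() {
    return limit.get();
  }

  private static void checkNotNegative(long permits) {
    if (permits < 0) {
      throw new IllegalArgumentException("negative permits: " + permits);
    }
  }
}
```

Under this condition, a request larger than the limit is only granted once a full limit's worth of permits is free, and the resulting deficit is repaid by later releases.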

long oldLimit = limit.get();
Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
availablePermits.addAndGet(-reduction);
Contributor

Should we call acquirePartial() here to update availablePermits in a non-blocking fashion, instead of updating it directly?

Contributor Author

🤔 I think it's pretty safe to just update it. Calling acquirePartial() seems to work too, but I feel it's a bit harder to read?
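A sketch of the direct-update approach to reducePermitLimit quoted above, wrapped in a compare-and-set retry loop and using a plain IllegalStateException instead of Guava's Preconditions. The class name, tryAcquire and the getters are hypothetical, not gax code. Subtracting directly lets availablePermits go negative while permits are checked out, and later releases repay that deficit before any new acquire succeeds; routing the reduction through acquirePartial() instead would, assuming it refuses when too few permits are free, have to be retried until enough permits came back:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a CAS-based limit reduction with a direct update.
final class ReduciblePermits {
  private final AtomicLong availablePermits;
  private final AtomicLong limit;

  ReduciblePermits(long permits) {
    availablePermits = new AtomicLong(permits);
    limit = new AtomicLong(permits);
  }

  boolean tryAcquire(long permits) {
    while (true) {
      long old = availablePermits.get();
      if (old < permits) {
        return false;
      }
      if (availablePermits.compareAndSet(old, old - permits)) {
        return true;
      }
    }
  }

  void release(long permits) {
    availablePermits.addAndGet(permits);
  }

  void reducePermitLimit(long reduction) {
    if (reduction < 0) {
      throw new IllegalArgumentException("negative reduction: " + reduction);
    }
    while (true) {
      long oldLimit = limit.get();
      if (oldLimit - reduction <= 0) {
        throw new IllegalStateException("permit limit underflow");
      }
      // Only the thread that wins this CAS adjusts availablePermits, so two
      // concurrent reductions cannot both pass the check against one oldLimit.
      if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
        // Direct update: may go negative while permits are checked out.
        availablePermits.addAndGet(-reduction);
        return;
      }
    }
  }

  long getAvailablePermits() {
    return availablePermits.get();
  }

  long getPermitLimit() {
    return limit.get();
  }
}
```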

checkNotNegative(reduction);
Preconditions.checkState(limit - reduction > 0, "permit limit underflow");
availablePermits -= reduction;
limit -= reduction;
Contributor

I see increasePermitLimit calls notifyAll(), but reduce does not. That looks suspicious, as it is the reverse operation of increase (so I would assume it should follow the same principles).

Contributor Author

It should be ok here, because the threads blocked in wait() are waiting for more permits to become available, while this method removes permits. So even if we called notifyAll() here, the threads would likely just go back to sleep :). It's similar to how we don't call notifyAll() in reserve().
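A sketch of the asymmetry explained in this reply, on a hypothetical monitor-based semaphore (not the gax BlockingSemaphore source). Increasing the limit can turn a waiter's availablePermits < permits condition false, so waiters must be woken to re-check; reducing the limit can only keep that condition true, so waking them would just send them back to wait():

```java
// Hypothetical monitor-based semaphore with an adjustable limit.
final class AdjustableBlockingPermits {
  private final Object lock = new Object();
  private long availablePermits;
  private long limit;

  AdjustableBlockingPermits(long permits) {
    this.availablePermits = permits;
    this.limit = permits;
  }

  void acquire(long permits) throws InterruptedException {
    synchronized (lock) {
      while (availablePermits < permits) {
        lock.wait();
      }
      availablePermits -= permits;
    }
  }

  void increasePermitLimit(long permits) {
    if (permits < 0) throw new IllegalArgumentException("negative permits");
    synchronized (lock) {
      availablePermits += permits;
      limit += permits;
      // New permits may satisfy a waiter, so wake all waiters to re-check.
      lock.notifyAll();
    }
  }

  void reducePermitLimit(long reduction) {
    if (reduction < 0) throw new IllegalArgumentException("negative reduction");
    synchronized (lock) {
      if (limit - reduction <= 0) {
        throw new IllegalStateException("permit limit underflow");
      }
      availablePermits -= reduction;
      limit -= reduction;
      // No notifyAll(): removing permits can never make a waiter's
      // `availablePermits < permits` condition false, so nobody could proceed.
    }
  }

  long getAvailablePermits() {
    synchronized (lock) {
      return availablePermits;
    }
  }

  long getPermitLimit() {
    synchronized (lock) {
      return limit;
    }
  }
}
```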

@igorbernstein2
Contributor

@vam-google I'm going to merge this. If you still have concerns after @mutianf's responses, we can open follow-up PRs.

@igorbernstein2 igorbernstein2 merged commit 20f6ecf into googleapis:master Mar 22, 2021
@mutianf mutianf deleted the dynamic_flow_control_p2 branch March 22, 2021 14:56
Labels
cla: yes This human has signed the Contributor License Agreement.

4 participants