Check 'closed_' in ExchangeClient::next()
Summary:
Using Zombie Tasks, we detected that Drivers can end up referenced by lambdas waiting on promises to be fulfilled, where the promises are issued by the Exchange.
When an Exchange is closed, everything under it (ExchangeClients and ExchangeQueues) is closed too, fulfilling any outstanding promises.
The issue is that ExchangeClient allows new promises to be created by a next() call after it has been closed.
These promises are never fulfilled, because a protection prevents fulfilling outstanding promises more than once.

The root cause is that next() does not respect the 'closed_' flag and simply proceeds to ask the underlying ExchangeQueue for data, which in turn creates the promise.

The fix is to check the 'closed_' flag in next() and return straight away, with atEnd set to true and no pages.
The fix eliminated the Zombie Tasks in the E2E test I was using to reproduce the issue.
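
To illustrate the failure mode described above, here is a minimal sketch using only the C++ standard library. ToyClient, its checkClosed switch, numPendingPromises() and strandedPromisesAfterClose() are hypothetical names invented for this illustration; they are not part of Velox, and std::promise stands in for the ContinueFuture machinery. The sketch assumes close() fulfills outstanding promises exactly once, as the summary describes.

```cpp
#include <cstddef>
#include <future>
#include <mutex>
#include <vector>

// Hypothetical stand-in for an exchange client; not the Velox API.
class ToyClient {
 public:
  // checkClosed == false models the behavior before the fix.
  explicit ToyClient(bool checkClosed) : checkClosed_(checkClosed) {}

  // Returns true if the caller is at end. Otherwise registers a promise and
  // hands back a future that close() is expected to fulfill.
  bool next(std::future<void>* future) {
    std::lock_guard<std::mutex> l(mutex_);
    if (checkClosed_ && closed_) {
      return true; // The fix: create no new promise once closed.
    }
    promises_.emplace_back();
    *future = promises_.back().get_future();
    return false;
  }

  // Fulfills outstanding promises exactly once; later calls are no-ops.
  void close() {
    std::vector<std::promise<void>> promises;
    {
      std::lock_guard<std::mutex> l(mutex_);
      if (closed_) {
        return;
      }
      closed_ = true;
      promises.swap(promises_);
    }
    for (auto& promise : promises) {
      promise.set_value();
    }
  }

  // Number of promises registered but not yet handed to close().
  std::size_t numPendingPromises() {
    std::lock_guard<std::mutex> l(mutex_);
    return promises_.size();
  }

 private:
  const bool checkClosed_;
  std::mutex mutex_;
  bool closed_{false};
  std::vector<std::promise<void>> promises_;
};

// Calls next(), closes the client, calls next() again, and reports how many
// promises are left that nothing will ever fulfill.
std::size_t strandedPromisesAfterClose(bool checkClosed) {
  ToyClient client(checkClosed);
  std::future<void> early;
  client.next(&early); // Registers a promise; the first close() fulfills it.
  client.close();
  std::future<void> late;
  client.next(&late); // Called after close.
  client.close(); // No-op: the one-time guard has already fired.
  return client.numPendingPromises();
}
```

In this sketch, strandedPromisesAfterClose(false) leaves one promise behind, which is the situation that keeps a waiting lambda (and whatever it references) alive, while strandedPromisesAfterClose(true) leaves none and the late future stays invalid.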
GH issue for this: prestodb/presto#22550

Differential Revision: D56712493
Sergey Pershin authored and facebook-github-bot committed Apr 29, 2024
1 parent 84acb4c commit d7e1f31
Showing 2 changed files with 52 additions and 0 deletions.
5 changes: 5 additions & 0 deletions velox/exec/ExchangeClient.cpp
@@ -117,6 +117,11 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
  std::vector<std::unique_ptr<SerializedPage>> pages;
  {
    std::lock_guard<std::mutex> l(queue_->mutex());
    if (closed_) {
      *atEnd = true;
      return pages;
    }

    *atEnd = false;
    pages = queue_->dequeueLocked(maxBytes, atEnd, future);
    if (*atEnd) {
47 changes: 47 additions & 0 deletions velox/exec/tests/ExchangeClientTest.cpp
@@ -423,5 +423,52 @@ TEST_F(ExchangeClientTest, sourceTimeout) {
  test::testingShutdownLocalExchangeSource();
}

TEST_F(ExchangeClientTest, callNextAfterClose) {
  constexpr int32_t kNumSources = 3;
  common::testutil::TestValue::enable();
  auto client =
      std::make_shared<ExchangeClient>("test", 17, 1 << 20, pool(), executor());

  bool atEnd;
  ContinueFuture future;
  auto pages = client->next(1, &atEnd, &future);
  ASSERT_EQ(0, pages.size());
  ASSERT_FALSE(atEnd);

  for (auto i = 0; i < kNumSources; ++i) {
    client->addRemoteTaskId(fmt::format("local://{}", i));
  }
  client->noMoreRemoteTasks();

  // Fetch a page. No page is found. All sources are fetching.
  pages = client->next(1, &atEnd, &future);
  EXPECT_TRUE(pages.empty());

  const auto& queue = client->queue();
  for (auto i = 0; i < 10; ++i) {
    enqueue(*queue, makePage(1'000 + i));
  }

  // Fetch multiple pages. Each page is slightly larger than 1K bytes, hence,
  // only 4 pages fit.
  pages = client->next(5'000, &atEnd, &future);
  EXPECT_EQ(4, pages.size());
  EXPECT_FALSE(atEnd);

  // Close the client and try calling next again.
  client->close();

  // Here we should have no pages returned, be at end (we are closed) and the
  // future should be invalid (not based on a valid promise).
  ContinueFuture futureFinal{ContinueFuture::makeEmpty()};
  pages = client->next(10'000, &atEnd, &futureFinal);
  EXPECT_EQ(0, pages.size());
  EXPECT_TRUE(atEnd);
  EXPECT_FALSE(futureFinal.valid());

  client->close();
  test::testingShutdownLocalExchangeSource();
}

} // namespace
} // namespace facebook::velox::exec
