
[native] Zombie tasks might not get cleaned up #22550

Closed
czentgr opened this issue Apr 17, 2024 · 12 comments

@czentgr
Contributor

czentgr commented Apr 17, 2024

Describe the problem you faced

I'm running a query where one stage is cancelled and, as a result, one of three tasks ends up in the aborted state. However, the task is not cleaned up after the query has run successfully (and all results have been returned). Instead, the task hangs around. I can run the same query multiple times and the same task is zombified each time.

Prestissimo keeps running, and when it checks for cleanup of old tasks it logs the following (captured after running the same query 3 times):

E20240417 13:29:56.045046 14170202 TaskManager.cpp:264] There are 3 zombie Task that satisfy cleanup conditions but could not be cleaned up, because the Task are referenced by more than 1 owners. RUNNING[0] FINISHED[0] CANCELED[0] ABORTED[3] FAILED[0]  Sample task IDs (shows only 20 IDs):
E20240417 13:29:56.045222 14170202 TaskManager.cpp:274] Zombie Task[1/3]: 20240416_183118_00001_tqrxh.1.0.0.0
E20240417 13:29:56.045297 14170202 TaskManager.cpp:274] Zombie Task[2/3]: 20240416_184624_00003_tqrxh.1.0.0.0
E20240417 13:29:56.045354 14170202 TaskManager.cpp:274] Zombie Task[3/3]: 20240416_183728_00002_tqrxh.1.0.0.0

It appears the task still has more than one reference:

      // Do not remove 'zombie' tasks (with outstanding references) from the
      // map. We use it to track the number of tasks. Note, since we copied the
      // task map, presto tasks should have an extra reference (2 from two
      // maps).
      if (prestoTaskRefCount > 2 || taskRefCount > 1) {
        auto& task = prestoTask->task;
        if (prestoTaskRefCount > 2) {
          ++zombiePrestoTaskCounts.numTotal;
          if (task != nullptr) {
            zombiePrestoTaskCounts.updateCounts(task);
          }
        }
        if (taskRefCount > 1) {
          ++zombieVeloxTaskCounts.numTotal;   <<< this is incremented
          zombieVeloxTaskCounts.updateCounts(task);
        }
      }
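
To illustrate why the counter trips, here is a minimal standalone sketch (plain C++ with a hypothetical stand-in type, not Presto/Velox code): anything that still holds a copy of the task's shared_ptr, for example a pending callback, keeps use_count() above the expected baseline, so the cleanup pass leaves the entry in the map.

#include <cassert>
#include <functional>
#include <memory>

struct VeloxTaskStub {};  // hypothetical stand-in for the real velox::exec::Task

int main() {
  auto task = std::make_shared<VeloxTaskStub>();      // the reference held by the task map
  std::function<void()> pendingCallback = [task] {};  // e.g. an unfulfilled promise's callback

  // The cleanup pass expects only the map to own the task; the extra owner held
  // by the callback makes taskRefCount > 1, so the task is counted as a zombie.
  assert(task.use_count() == 2);

  pendingCallback = nullptr;  // once the callback is released, cleanup can proceed
  assert(task.use_count() == 1);
  return 0;
}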

For example, the following query causes the issue (this is a modified subquery from TPCDS Q1) - the dataset is the 1k TPCDS dataset.

select sr_customer_sk as ctr_customer_sk ,sr_store_sk as ctr_store_sk from store_returns ,date_dim where sr_returned_date_sk = d_date_sk and d_year =1999 limit 100;

The aborted tasks are in stages 1 and 2 (out of 4 stages), which get cancelled. They contain the partial limit for the limit clause, and I suppose cancelling it also cancels the other part of the plan, the tablescan that feeds into the partial limit. It appears stages 1 and 2 are cancelled in their entirety as soon as enough results are present.

 Fragment 1 [SINGLE]                                                                                                                                                                               >
     CPU: 247.38us, Scheduled: 288.96us, Input: 100 rows (1.88kB); per task: avg.: 100.00 std.dev.: 0.00, Output: 100 rows (1.63kB), 1 tasks                                                       >
     Output layout: [sr_customer_sk, sr_store_sk]                                                                                                                                                  >
     Output partitioning: SINGLE []                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                 >
     - Limit[PlanNodeId 357][100] => [sr_customer_sk:bigint, sr_store_sk:bigint]                                                                                                                   >
             Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                        >
             CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 100 rows (1.63kB)                                                                                                             >
             Input avg.: 100.00 rows, Input std.dev.: 0.00%                                                                                                                                        >
         - LocalExchange[PlanNodeId 588][SINGLE] () => [sr_customer_sk:bigint, sr_store_sk:bigint]                                                                                                 >
                 Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                    >
                 CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 100 rows (1.88kB)                                                                                                         >
                 Input avg.: 12.50 rows, Input std.dev.: 264.58%                                                                                                                                   >
             - RemoteSource[2] => [sr_customer_sk:bigint, sr_store_sk:bigint]                                                                                                                      >
                     CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 100 rows (1.88kB)                                                                                                     >
                     Input avg.: 12.50 rows, Input std.dev.: 264.58%                                                                                                                               >
                                                                                                                                                                                                   >
 Fragment 2 [HASH]                                                                                                                                                                                 >
     CPU: 4.45ms, Scheduled: 5.00ms, Input: 8,565 rows (294.84kB); per task: avg.: 8,565.00 std.dev.: 0.00, Output: 100 rows (1.63kB), 1 tasks                                                     >
     Output layout: [sr_customer_sk, sr_store_sk]                                                                                                                                                  >
     Output partitioning: SINGLE []                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                 >
     - LimitPartial[PlanNodeId 523][100] => [sr_customer_sk:bigint, sr_store_sk:bigint]                                                                                                            >
             Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                        >
             CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 100 rows (1.63kB)                                                                                                             >
             Input avg.: 128.00 rows, Input std.dev.: 264.58%                                                                                                                                      >
         - InnerJoin[PlanNodeId 475][("sr_returned_date_sk" = "d_date_sk")] => [sr_customer_sk:bigint, sr_store_sk:bigint]                                                                         >
                 Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                    >
                 CPU: 2.00ms (0.11%), Scheduled: 2.00ms (0.11%), Output: 1,024 rows (24.30kB)                                                                                                      >
                 Distribution: PARTITIONED                                                                                                                                                         >
             - RemoteSource[3] => [sr_returned_date_sk:bigint, sr_customer_sk:bigint, sr_store_sk:bigint]                                                                                          >
                     CPU: 1.00ms (0.05%), Scheduled: 1.00ms (0.05%), Output: 8,200 rows (291.94kB)                                                                                                 >
                     Input avg.: 1,025.00 rows, Input std.dev.: 264.58%                                                                                                                            >
             - LocalExchange[PlanNodeId 587][HASH] (d_date_sk) => [d_date_sk:bigint]                                                                                                               >
                     Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                >
                     CPU: 1.00ms (0.05%), Scheduled: 1.00ms (0.05%), Output: 365 rows (2.90kB)                                                                                                     >
                     Input avg.: 45.63 rows, Input std.dev.: 264.58%                                                                                                                               >
                 - RemoteSource[4] => [d_date_sk:bigint]                                                                                                                                           >
                         CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 365 rows (2.91kB)                                                                                                 >
                         Input avg.: 45.63 rows, Input std.dev.: 264.58%                                                                                                                           >
 Fragment 3 [SOURCE]                                                                                                                                                                               >
     CPU: 1.80s, Scheduled: 1.81s, Input: 2,642,651 rows (75.31MB); per task: avg.: 2,642,651.00 std.dev.: 0.00, Output: 2,625,351 rows (58.95MB), 1 tasks                                         >
     Output layout: [sr_returned_date_sk, sr_customer_sk, sr_store_sk]                                                                                                                             >
     Output partitioning: HASH [sr_returned_date_sk]                                                                                                                                               >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                 >
     - TableScan[PlanNodeId 0][TableHandle {connectorId='hive-hadoop2', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10_parquet_varchar, tableName=store_returns, analyzePartitionValues=Opt>
             Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                        >
             CPU: 1.80s (97.66%), Scheduled: 1.82s (97.63%), Output: 2,625,351 rows (58.95MB)                                                                                                      >
             Input avg.: 377,521.57 rows, Input std.dev.: 244.95%                                                                                                                                  >
             LAYOUT: tpcds_sf10_parquet_varchar.store_returns{}                                                                                                                                    >
             sr_customer_sk := sr_customer_sk:bigint:3:REGULAR (1:92)                                                                                                                              >
             sr_store_sk := sr_store_sk:bigint:7:REGULAR (1:92)                                                                                                                                    >
             sr_returned_date_sk := sr_returned_date_sk:bigint:0:REGULAR (1:92)                                                                                                                    >
             Input: 2,642,651 rows (75.31MB), Filtered: 0.65%                                                                                                                                      >
                                                                                                                                                                                                   >
 Fragment 4 [SOURCE]                                                                                                                                                                               >
     CPU: 39.10ms, Scheduled: 40.40ms, Input: 365 rows (50.81kB); per task: avg.: 365.00 std.dev.: 0.00, Output: 365 rows (2.89kB), 1 tasks                                                        >
     Output layout: [d_date_sk]                                                                                                                                                                    >
     Output partitioning: HASH [d_date_sk]                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                 >
     - ScanFilterProject[PlanNodeId 1,520,328][table = TableHandle {connectorId='hive-hadoop2', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10_parquet_varchar, tableName=date_dim, analyze>
             Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}/{so>
             CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 365 rows (2.89kB)                                                                                                             >
             Input avg.: 45.63 rows, Input std.dev.: 264.58%                                                                                                                                       >
             LAYOUT: tpcds_sf10_parquet_varchar.date_dim{domains={d_year=[ [["1999"]] ]}}                                                                                                          >
             d_date_sk := d_date_sk:bigint:0:REGULAR (1:107)                                                                                                                                       >
             d_year := d_year:int:6:REGULAR (1:107)                                                                                                                                                >
             Input: 365 rows (50.81kB), Filtered: 0.00%                                                                                                                                            >
             Raw input: 73,049 rows (572.39kB), Filtered: 99.50%                                                                                                                                   >

Environment Description

  • Presto version used: 0.287-SNAPSHOT
  • Storage (HDFS/S3/GCS..):
  • Data source and connectors or catalogs used: TPCDS with Hive connector and Parquet files
  • Deployment (Cloud or On-prem): on macOS
  • Pastebin link to the complete debug logs:

zombie-tasks.tar.gz


@czentgr czentgr added the usage label Apr 17, 2024
@czentgr czentgr changed the title Zombie tasks might not get cleaned up [native] Zombie tasks might not get cleaned up Apr 18, 2024
@aditi-pandit
Contributor

aditi-pandit commented Apr 18, 2024

@czentgr : Just checking you have facebookincubator/velox#9207 in the build. #22129 (comment) is related to the sequence of events you saw as well.

@spershin
Contributor

We created an internal task for this and will take a look.

@czentgr
Contributor Author

czentgr commented Apr 18, 2024

@aditi-pandit I was using my memory debug build, which has an older commit base that doesn't include this fix (the build I initially collected the logs with was newer, but I then switched back to my memory debug build). I will retry with the fix.

@czentgr
Contributor Author

czentgr commented Apr 18, 2024

I can repro the issue with velox commit https://github.com/facebookincubator/velox/commits/da6a3d305bd92a2ab40d2f013ed191f261a92732

commit da6a3d305bd92a2ab40d2f013ed191f261a92732 (HEAD)
Author: aditi-pandit <Aditi.Pandit@ibm.com>
Date:   Tue Apr 16 16:03:53 2024 -0700

    Fix nulls ordering for Range frames (#9271)

Presto commit https://github.com/prestodb/presto/commits/45387f9956eb899a93b96eaccf73b239102ee3fe

commit 45387f9956eb899a93b96eaccf73b239102ee3fe (HEAD -> master, origin/master, origin/HEAD)
Author: Konjac Huang <konjac@meta.com>
Date:   Thu Apr 11 00:14:48 2024 -0700

    Fix pre process metadata call

I'm using my rebased memory debug build after running the query twice:

I20240418 16:40:32.606573 16172521 TaskManager.cpp:705] cleanOldTasks: Cleaned 3 old task(s) in 0 ms
E20240418 16:40:32.608332 16172521 TaskManager.cpp:264] There are 2 zombie Task that satisfy cleanup conditions but could not be cleaned up, because the Task are referenced by more than 1 owners. RUNNING[0] FINISHED[0] CANCELED[0] ABORTED[2] FAILED[0]  Sample task IDs (shows only 20 IDs):
E20240418 16:40:32.608397 16172521 TaskManager.cpp:274] Zombie Task[1/2]: 20240418_203702_00002_p9hw7.1.0.0.0
E20240418 16:40:32.608460 16172521 TaskManager.cpp:274] Zombie Task[2/2]: 20240418_203854_00003_p9hw7.1.0.0.0
I20240418 16:40:33.742187 16172522 PeriodicTaskManager.cpp:275] MemoryManager stats:

But just to be sure I'll retry with a clean build from the above commits.

@czentgr
Contributor Author

czentgr commented Apr 18, 2024

Confirming repro on current code from presto master branch:

I20240418 18:01:35.145920 16284686 PeriodicTaskManager.cpp:674] Spill memory usage: current[0B] peak[0B]
E20240418 18:01:35.146342 16284681 TaskManager.cpp:264] There are 1 zombie Task that satisfy cleanup conditions but could not be cleaned up, because the Task are referenced by more than 1 owners. RUNNING[0] FINISHED[0] CANCELED[0] ABORTED[1] FAILED[0]  Sample task IDs (shows only 20 IDs):
E20240418 18:01:35.146414 16284681 TaskManager.cpp:274] Zombie Task[1/1]: 20240418_215248_00000_jvpu5.1.0.0.0
I20240418 18:01:59.052568 16284676 PeriodicServiceInventoryManager.cpp:115] Announcement succeeded: HTTP 202. State: active.
I20240418 18:02:28.213596 16284676 PeriodicServiceInventoryManager.cpp:115] Announcement succeeded: HTTP 202. State: active.
I20240418 18:02:35.146222 16284686 PeriodicTaskManager.cpp:674] Spill memory usage: current[0B] peak[0B]
E20240418 18:02:35.152554 16284681 TaskManager.cpp:264] There are 1 zombie Task that satisfy cleanup conditions but could not be cleaned up, because the Task are referenced by more than 1 owners. RUNNING[0] FINISHED[0] CANCELED[0] ABORTED[1] FAILED[0]  Sample task IDs (shows only 20 IDs):
E20240418 18:02:35.152657 16284681 TaskManager.cpp:274] Zombie Task[1/1]: 20240418_215248_00000_jvpu5.1.0.0.0

zombie_query.json

@spershin
Contributor

@czentgr
Do you know why the task is aborted? Is this because of reaching the limit 100?

I ran the following query multiple times in one of our clusters and did not see any zombies:

SELECT 
  surface, 
  entity_type 
FROM 
  mrs_content_pool_metrics, 
  dmars_sensitive_entities 
WHERE 
  metric_name = 'time_spent_in_pool' 
  AND mrs_content_pool_metrics.ds = '2024-04-04' 
  and dmars_sensitive_entities.ds = '2024-04-04' 
LIMIT 
  100

Maybe specific data affects zombification somehow?

Have you tried running it without the limit to see if there are any zombie tasks left?

@czentgr
Contributor Author

czentgr commented Apr 19, 2024

@spershin Yes, it is related to the limit clause. The limit is pushed down to the partial limit in Stage 1 - we can see that in the plan fragment:

     - LimitPartial[PlanNodeId 523][100] => [sr_customer_sk:bigint, sr_store_sk:bigint]                                                                                                            >
             Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                        >
             CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 100 rows (1.63kB)                                                                                                             >
             Input avg.: 128.00 rows, Input std.dev.: 264.58%   

The tablescan feeding into this is from stage 2, which reads 2+ million rows.

     - TableScan[PlanNodeId 0][TableHandle {connectorId='hive-hadoop2', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10_parquet_varchar, tableName=store_returns, analyzePartitionValues=Opt>
             Estimates: {source: CostBasedSourceInfo, rows: 0 (0B), cpu: 0.00, memory: 0.00, network: 0.00}                                                                                        >
             CPU: 1.80s (97.66%), Scheduled: 1.82s (97.63%), Output: 2,625,351 rows (58.95MB)                                                                                                      >
             Input avg.: 377,521.57 rows, Input std.dev.: 244.95%                                                                                                                                  >
             LAYOUT: tpcds_sf10_parquet_varchar.store_returns{} 

So I think you need to make sure the tablescan has enough data to read and is still busy when the limit is being processed.

And yes, removing the limit clause will not result in zombies.

@spershin
Contributor

We do observe zombie tasks occasionally for some queries in Meta.
However, it is not reproducible.
Seems like a race.

@czentgr
Is it possible to run that particular setup of yours on a local Presto checkout?
Do you have instructions to do so?
Thanks!

@czentgr
Contributor Author

czentgr commented Apr 24, 2024

@spershin I've pushed a branch with an E2E unit test that reproduces the problem. Once you've run it, you have the data and the setup to run your own coordinator and worker. I hope this helps.

Before running the test, export WORKER_COUNT=1 as an environment variable. I've run it through IntelliJ and set the run configuration accordingly.

My branch is https://github.com/czentgr/presto/tree/cz_zombie_repro .

The test will not create the data if the Hive storage already has the DATE_DIM and STORE_SALES tables. So make sure you remove the directories (i.e., delete the tables). You probably know this already. By default they are under your Presto source path:

/path/to/presto/presto-native-execution/target/velox_data/PARQUET/hive_data/tpcds

I had the tiny tables previously from other tests and had to clean them up. The generation of the SF10 data doesn't take too long; I think it took 15-20 minutes on my laptop the first time. The data should look like this once generated:

 velox_data/PARQUET/hive_data/tpcds$ ls -ltr store_returns 
total 259832
-rw-r--r--@ 1 czentgr  wheel  16610765 Apr 24 00:37 20240424_043701_00001_jyfiy_bce74d4d-87ea-4237-97a6-04da16583b2d
-rw-r--r--@ 1 czentgr  wheel  16752268 Apr 24 00:37 20240424_043701_00001_jyfiy_66ea4ce0-9642-4b48-a1eb-8d8c0598a102
-rw-r--r--@ 1 czentgr  wheel  16126842 Apr 24 00:37 20240424_043701_00001_jyfiy_fae3dc92-5fd7-44e9-a5f6-db2ab4dfc55a
-rw-r--r--@ 1 czentgr  wheel  16761534 Apr 24 00:37 20240424_043701_00001_jyfiy_756a5798-740f-449c-914c-4297b240bb7d
-rw-r--r--@ 1 czentgr  wheel  15958474 Apr 24 00:37 20240424_043701_00001_jyfiy_3b7963f8-d464-4faf-ab0f-b11144614052
-rw-r--r--@ 1 czentgr  wheel  16113506 Apr 24 00:37 20240424_043701_00001_jyfiy_f5eb26e8-9547-4212-ae84-86844b3841ea
-rw-r--r--@ 1 czentgr  wheel  16232518 Apr 24 00:37 20240424_043701_00001_jyfiy_2ddf9dd7-c93b-48dd-86f6-b59a16e2aa01
-rw-r--r--@ 1 czentgr  wheel  18459891 Apr 24 00:37 20240424_043701_00001_jyfiy_d436bd67-39ac-437e-bfb5-34843de23473
 velox_data/PARQUET/hive_data/tpcds$ ls -ltr date_dim
total 1480
-rw-r--r--@ 1 czentgr  wheel   89850 Apr 24 00:37 20240424_043645_00000_jyfiy_40d5fb0c-03cf-4755-9dac-9e01974f04c8
-rw-r--r--@ 1 czentgr  wheel   61140 Apr 24 00:37 20240424_043645_00000_jyfiy_f47c8bf9-0d24-45bb-98f9-b79bcf037dfb
-rw-r--r--@ 1 czentgr  wheel   89944 Apr 24 00:37 20240424_043645_00000_jyfiy_94b6b274-3be9-4d16-9b69-d19da52b908d
-rw-r--r--@ 1 czentgr  wheel   90074 Apr 24 00:37 20240424_043645_00000_jyfiy_293109bc-15ff-4e93-a8e1-38f2bd95a68a
-rw-r--r--@ 1 czentgr  wheel   90088 Apr 24 00:37 20240424_043645_00000_jyfiy_3cd7b4d0-28b6-4ab1-b16f-222047d26598
-rw-r--r--@ 1 czentgr  wheel  151704 Apr 24 00:37 20240424_043645_00000_jyfiy_a7f7a0aa-161d-4ec6-a2c1-e10c5c590ad2
-rw-r--r--@ 1 czentgr  wheel  116265 Apr 24 00:37 20240424_043645_00000_jyfiy_1ce7fa41-a442-4bd4-9486-a2526ae76988
-rw-r--r--@ 1 czentgr  wheel   60980 Apr 24 00:37 20240424_043645_00000_jyfiy_5383a211-fd8e-472e-8daa-599b370b6ee3

The test run will fail with

java.lang.AssertionError: expected:<0> but was:<1>
Expected :0
Actual   :1

because the zombie task is present

E20240424 13:28:11.787199 23734551 TaskManager.cpp:264] There are 1 zombie Task that satisfy cleanup conditions but could not be cleaned up, because the Task are referenced by more than 1 owners. RUNNING[0] FINISHED[0] CANCELED[0] ABORTED[1] FAILED[0]  Sample task IDs (shows only 20 IDs):

Note: the log dir (/tmp/PrestoNativeQueryRunnerUtils) is not cleaned up, so you can look at old logs (and get old hits for the counter if you run multiple times without cleanup).

Please let me know if you have questions.

@spershin
Contributor

We are dealing with a race condition.
I have found that it is possible to call ExchangeQueue::dequeueLocked() AFTER ExchangeQueue::close() has been called from ExchangeClient::close().

Facts:

  1. ExchangeClient::close() is called only once, protected by the ExchangeClient::closed_ flag (and we don't even call it more than once anyway).
  2. ExchangeQueue::dequeueLocked() can add a promise, which holds a reference to the Driver, which in turn holds a reference to the Task.
  3. ExchangeQueue::close() cleans up and fulfills all existing promises. We must not allow adding any more promises after that, but apparently there is no such protection (sketched below).

I will need to look at the code more and figure out whether we need a closed_ flag in the ExchangeQueue itself, or whether the protection is simply broken at the ExchangeClient level.
I suspect the latter.
The logic of Exchange, ExchangeClient and ExchangeQueue is, unfortunately, overcomplicated.
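
To make the race concrete, here is a minimal standalone sketch using simplified stand-in types (this is not the actual Velox ExchangeQueue/ExchangeClient API, just the shape of the problem): a promise registered after close() is never fulfilled, so the reference it captures keeps the Task alive.

#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

struct Task {};  // stand-in; in the real code the chain is promise -> Driver -> Task
using Promise = std::function<void()>;

struct ToyExchangeQueue {
  std::mutex mtx;
  std::vector<Promise> promises;

  // Models ExchangeQueue::close(): fulfill every promise that exists *now*.
  // It runs only once, so promises added later are never fulfilled.
  void close() {
    std::vector<Promise> pending;
    {
      std::lock_guard<std::mutex> l(mtx);
      pending.swap(promises);
    }
    for (auto& p : pending) {
      p();
    }
  }

  // Models ExchangeQueue::dequeueLocked() when no data is available: it records
  // a promise whose callback holds a reference back to the Task.
  void dequeue(std::shared_ptr<Task> task) {
    std::lock_guard<std::mutex> l(mtx);
    promises.push_back([task] { /* resume the driver */ });
  }
};

int main() {
  auto task = std::make_shared<Task>();  // reference held by the task map
  ToyExchangeQueue queue;

  queue.close();        // the queue is closed from the task termination path
  queue.dequeue(task);  // ...but a dequeue can still arrive afterwards

  // The promise added after close() is never fulfilled, so the Task keeps an
  // extra owner and shows up as a zombie in cleanOldTasks().
  assert(task.use_count() == 2);
  return 0;
}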

@spershin
Contributor

Confirmed that chain of calls (note that the member functions are renamed a bit here, to navigate better among the dozens of close() functions).

Exchange::isBlocked
  ExchangeClient::nextPage
      std::lock_guard<std::mutex> l(queue_->mutex());
      *atEnd = false;
      ExchangeQueue::dequeueLocked

It is not protected in any way against the exchange client being closed:

Task::terminate
Exchange::close
ExchangeClient::~ExchangeClient
  ExchangeClient::closeClient
    {
      std::lock_guard<std::mutex> l(queue_->mutex());
      closed_ = true;
    }
    ExchangeQueue::closeQueue()

A simple fix would be adding

if (closed_) {
  *atEnd = true;
  return pages;
}

in ExchangeClient::nextPage right after the lock.
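
Roughly, as a standalone sketch with simplified stand-in types (not the real Velox signature): the early return under the queue's lock stops any new promise from being registered once the client is closed.

#include <functional>
#include <mutex>
#include <vector>

struct PageStub {};  // stand-in for the serialized page type
using Promise = std::function<void()>;

struct ToyExchangeClient {
  std::mutex mtx;                 // models queue_->mutex()
  std::vector<Promise> promises;  // models the queue's promise list
  bool closed_{false};            // set under the lock when the client closes

  std::vector<PageStub> nextPage(bool* atEnd, Promise promise) {
    std::vector<PageStub> pages;
    std::lock_guard<std::mutex> l(mtx);
    if (closed_) {
      // The queue has already fulfilled its promises; do not create a new one
      // here, otherwise it would pin the Driver (and thus the Task) forever.
      *atEnd = true;
      return pages;
    }
    *atEnd = false;
    promises.push_back(std::move(promise));  // models ExchangeQueue::dequeueLocked()
    return pages;
  }
};

int main() {
  ToyExchangeClient client;
  client.closed_ = true;
  bool atEnd = false;
  auto pages = client.nextPage(&atEnd, [] {});  // returns immediately, stores no promise
  return (atEnd && pages.empty() && client.promises.empty()) ? 0 : 1;
}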

Need to come up with a unit test for that.

spershin pushed a commit to spershin/velox-1 that referenced this issue Apr 29, 2024
Summary:
Using zombie Tasks we detected that Drivers can end up referenced by lambdas waiting on promises to be fulfilled.
These promises are given out by the Exchange.
Now, when the Exchange is closed, everything under it (ExchangeClients and ExchangeQueues) is closed too, fulfilling any outstanding promises.
The issue is that ExchangeClient allows new promises to be created in a next() call after it has been closed.
This creates a situation where these promises are never fulfilled, because there is protection against fulfilling the outstanding promises more than once.

The root cause here is that next() does not respect the 'closed_' flag and simply proceeds with asking the underlying ExchangeQueue for data, which in turn creates the promise.

The fix is to check the 'closed_' flag and return straight away.
The fix resolved the zombie Tasks in the E2E test I was using to reproduce the issue.
GH issue for this: prestodb/presto#22550

Differential Revision: D56712493
facebook-github-bot pushed a commit to facebookincubator/velox that referenced this issue May 1, 2024

Reviewed By: Yuhta

Differential Revision: D56712493

fbshipit-source-id: 8808f854872b68c5c29bdd67daceb656f92da8f0
@spershin
Contributor

spershin commented May 9, 2024

This issue should be fixed now.

@spershin spershin closed this as completed May 9, 2024