[multistage] Subqueries return different results depending on where clause #12949

Open
RalfJL opened this issue Apr 17, 2024 · 17 comments
Labels
multi-stage Related to the multi-stage query engine troubleshooting

Comments

@RalfJL

RalfJL commented Apr 17, 2024

I am currently investigating whether Pinot can help us calculate inbytes and outbytes from RADIUS accounting data.
The setup is:
sysbench (produces artificial accounting data) -> freeradius -> kafka -> pinot
The randomness of the artificial data is low so we end up with several "sessions" having the same Acct-Unique-Session-ID.
e.g. query:

select a.acctuniquesessionid, a.acctstatustype, a.eventtime, a.eventtimestamp from radius_json a
where a.acctuniquesessionid = 'da0f86138ec36b2364889f048bf2ac82'
order by a.eventtime

returns:

acctuniquesessionid	acctstatustype	eventtime	eventtimestamp
da0f86138ec36b2364889f048bf2ac82	Start	1712050245468	Apr 2 2024 09:30:45 UTC
da0f86138ec36b2364889f048bf2ac82	Stop	1712050293427	Apr 2 2024 09:31:33 UTC
da0f86138ec36b2364889f048bf2ac82	Start	1712759564017	Apr 10 2024 14:32:43 UTC
da0f86138ec36b2364889f048bf2ac82	Interim-Update	1712759589307	Apr 10 2024 14:33:09 UTC
da0f86138ec36b2364889f048bf2ac82	Interim-Update	1712759612449	Apr 10 2024 14:33:32 UTC
da0f86138ec36b2364889f048bf2ac82	Interim-Update	1712759616370	Apr 10 2024 14:33:36 UTC
da0f86138ec36b2364889f048bf2ac82	Stop	1712759668126	Apr 10 2024 14:34:27 UTC

Obviously there are two different sessions with the same acctuniquesessionid in the data.
In real life we cannot guarantee that session IDs are always unique, so this might also happen in real-life data.

The solution is to filter out all 'Start' - 'Stop' combinations where there is a 'Start' or a 'Stop' in between.
And here comes the problem:
query:

select a.acctuniquesessionid, (a.eventtime - b.eventtime)/1000 as Zeit, a.acctstatustype, b.acctstatustype, a.eventtime, b.eventtime 
 ,( select count(c.eventtime) from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid 
				 and a.acctuniquesessionid = c.acctuniquesessionid
				and a.eventtime > 0 and b.eventtime > 0
				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
				 ) 

from radius_json a
join radius_json b
on b.acctuniquesessionid = a.acctuniquesessionid 
   and a.acctstatustype = 'Stop' and b.acctstatustype = 'Start' -- and a._3gppimsi <> 'null'
   and a.eventtime > b.eventtime
where  
	not exists ( select 1 from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid and a.acctuniquesessionid = c.acctuniquesessionid
				and a.eventtime > 0 and b.eventtime > 0
				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
				 ) 
-- and  			a.acctuniquesessionid = 'fac07ae7853810946d87e868e463af2c'
-- and 				a.acctuniquesessionid = 'ac6baa744130522c1eb1eec161114d1b'
and		   a.acctuniquesessionid = 'da0f86138ec36b2364889f048bf2ac82'
order by Zeit desc

Returns:

acctuniquesessionid	Zeit	acctstatustype	acctstatustype	eventtime	eventtime	EXPR$4
da0f86138ec36b2364889f048bf2ac82	104	Stop	Start	1712759668126	1712759564017	0
da0f86138ec36b2364889f048bf2ac82	47	Stop	Start	1712050293427	1712050245468	0

The two sessions as expected.
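
As a cross-check of what the NOT EXISTS filter should return, here is a minimal sketch that loads the seven sample rows from the first result table into an in-memory SQLite database and runs a trimmed version of the query. SQLite is only a stand-in for reference SQL semantics here, not Pinot; the three-column table layout and the helper name run_reference_query are assumptions made for illustration.

```python
import sqlite3

# Rows copied from the first result table in this issue (eventtimestamp omitted).
SESSION_ID = "da0f86138ec36b2364889f048bf2ac82"
ROWS = [
    (SESSION_ID, "Start", 1712050245468),
    (SESSION_ID, "Stop", 1712050293427),
    (SESSION_ID, "Start", 1712759564017),
    (SESSION_ID, "Interim-Update", 1712759589307),
    (SESSION_ID, "Interim-Update", 1712759612449),
    (SESSION_ID, "Interim-Update", 1712759616370),
    (SESSION_ID, "Stop", 1712759668126),
]

# Trimmed version of the issue's query: no session-id filter, no redundant predicates.
QUERY = """
select a.acctuniquesessionid, (a.eventtime - b.eventtime)/1000 as Zeit,
       a.eventtime, b.eventtime
from radius_json a
join radius_json b
  on b.acctuniquesessionid = a.acctuniquesessionid
 and a.acctstatustype = 'Stop' and b.acctstatustype = 'Start'
 and a.eventtime > b.eventtime
where not exists (
    select 1 from radius_json c
    where c.acctuniquesessionid = a.acctuniquesessionid
      and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start')
      and c.eventtime between b.eventtime + 1 and a.eventtime - 1
)
order by Zeit desc
"""


def run_reference_query():
    """Run QUERY against an in-memory SQLite copy of the sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table radius_json ("
        "acctuniquesessionid text, acctstatustype text, eventtime integer)"
    )
    conn.executemany("insert into radius_json values (?, ?, ?)", ROWS)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


sessions = run_reference_query()
for row in sessions:
    print(row)
```

On this sample the sketch yields the same two Start/Stop pairs (104 and 47 seconds) even though it has no filter on acctuniquesessionid, which is the behaviour the filter-free Pinot query would be expected to show for this session.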

Removing the filter a.acctuniquesessionid = 'da0f86138ec36b2364889f048bf2ac82' shows a very different result
query:

select a.acctuniquesessionid, (a.eventtime - b.eventtime)/1000 as Zeit, a.acctstatustype, b.acctstatustype, a.eventtime, b.eventtime 
 ,( select count(c.eventtime) from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid 
				 and a.acctuniquesessionid = c.acctuniquesessionid
				and a.eventtime > 0 and b.eventtime > 0
				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
				 ) 

from radius_json a
join radius_json b
on b.acctuniquesessionid = a.acctuniquesessionid 
   and a.acctstatustype = 'Stop' and b.acctstatustype = 'Start' -- and a._3gppimsi <> 'null'
   and a.eventtime > b.eventtime
where  
	not exists ( select 1 from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid and a.acctuniquesessionid = c.acctuniquesessionid
				and a.eventtime > 0 and b.eventtime > 0
				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
				 ) 
-- and  			a.acctuniquesessionid = 'fac07ae7853810946d87e868e463af2c'
-- and 				a.acctuniquesessionid = 'ac6baa744130522c1eb1eec161114d1b'
-- and		   a.acctuniquesessionid = 'da0f86138ec36b2364889f048bf2ac82'
order by Zeit desc

Result (thousands of other sessions omitted):

da0f86138ec36b2364889f048bf2ac82	709422	Stop	Start	1712759668126	1712050245468	0

So here both subqueries evidently return wrong results. There is a 'Start' and a 'Stop' between timestamps 1712050245468 and 1712759668126, but they are no longer detected once the filter on acctuniquesessionid is removed.
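
To make the claim concrete, the following small sketch lists the Start/Stop events that lie strictly between the two timestamps of the unexpected row. The event list is copied from the first result table; the function name boundary_events_between is hypothetical.

```python
# Start/Stop events of the session, copied from the first result table.
EVENTS = [
    ("Start", 1712050245468),
    ("Stop", 1712050293427),
    ("Start", 1712759564017),
    ("Stop", 1712759668126),
]


def boundary_events_between(events, start_ts, stop_ts):
    """Return Start/Stop events strictly between start_ts and stop_ts.

    For integer timestamps this matches the SQL predicate
    `c.eventtime between b.eventtime+1 and a.eventtime-1`.
    """
    return [
        (kind, ts)
        for kind, ts in events
        if kind in ("Start", "Stop") and start_ts < ts < stop_ts
    ]


between = boundary_events_between(EVENTS, 1712050245468, 1712759668126)
print(between)
print(len(between))
```

The list is non-empty (one Stop and one Start), so both the count subquery and the NOT EXISTS predicate should reject the 709422-second pair.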

Am I missing something here, or am I hitting a bug in the multi-stage engine?

I am using Pinot 1.1.0 as a Docker Compose image (a single instance of each server process).
The query plans for both queries are the same apart from the LogicalFilter:
first query:
LogicalFilter(condition=[AND(=($14, _UTF-8'da0f86138ec36b2364889f048bf2ac82'), =($12, _UTF-8'Stop'))])
second query:
LogicalFilter(condition=[=($12, _UTF-8'Stop')])

PLEASE NOTE: Some filters are redundant, such as "b.acctuniquesessionid = c.acctuniquesessionid and a.acctuniquesessionid = c.acctuniquesessionid". They are there to test whether the result changes, but it does not.

@Jackie-Jiang
Contributor

Can you share the whole logical plan for these 2 queries?
Can you also check if any resource limit is hit (e.g. rows in the right table of the JOIN operator)

@RalfJL
Author

RalfJL commented Apr 18, 2024

Here are the two complete query plans:
First with filter a.acctuniquesessionid = 'da0f86138ec36b2364889f048bf2ac82'

Execution Plan
LogicalSort(sort0=[$1], dir0=[DESC])
  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalProject(acctuniquesessionid=[$14], Zeit=[/(-($17, $36), 1000)], acctstatustype=[$12], acctstatustype0=[$31], eventtime=[$17], eventtime0=[$36])
      LogicalFilter(condition=[IS NULL($48)])
        LogicalJoin(condition=[AND(=($14, $42), =($33, $43), =($38, $44), =($39, $45), =($40, $46), =($41, $47))], joinType=[left])
          PinotLogicalExchange(distribution=[hash[14, 33, 38, 39, 40, 41]])
            LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $docId0=[$21], $hostName0=[$22], $segmentName0=[$23], USERNAME0=[$24], _3gppimsi0=[$25], _3gppimsimccmnc0=[$26], _3gppsgsnmccmnc0=[$27], acctinputoctets0=[$28], acctinputpackets0=[$29], acctoutputoctets0=[$30], acctoutputpackets0=[$31], acctsessiontime0=[$32], acctstatustype0=[$33], acctterminatecause0=[$34], acctuniquesessionid0=[$35], calledstationid0=[$36], callingstationid0=[$37], eventtime0=[$38], eventtimestamp0=[$39], $f40=[$19], $f41=[$40], $f42=[$41], $f43=[$20])
              LogicalJoin(condition=[AND(=($35, $14), >($17, $38))], joinType=[inner])
                PinotLogicalExchange(distribution=[hash[14]])
                  LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $f40=[>($17, 0)], $f43=[-($17, 1)])
                    **LogicalFilter(condition=[AND(=($14, _UTF-8'da0f86138ec36b2364889f048bf2ac82'), =($12, _UTF-8'Stop'))])**
                      LogicalTableScan(table=[[radius_json]])
                PinotLogicalExchange(distribution=[hash[14]])
                  LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $f41=[>($17, 0)], $f42=[+($17, 1)])
                    LogicalFilter(condition=[=($12, _UTF-8'Start')])
                      LogicalTableScan(table=[[radius_json]])
          PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
            LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}], agg#0=[MIN($6)])
              PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
                LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}], agg#0=[MIN($6)])
                  LogicalProject(acctuniquesessionid0=[$2], acctuniquesessionid00=[$3], $f40=[$4], $f41=[$5], $f42=[$6], $f43=[$7], $f0=[true])
                    LogicalJoin(condition=[AND(=($3, $0), =($2, $0), >=($1, $6), <=($1, $7))], joinType=[inner])
                      PinotLogicalExchange(distribution=[hash[0, 0]])
                        LogicalProject(acctuniquesessionid=[$14], eventtime=[$17])
                          LogicalFilter(condition=[OR(=($12, _UTF-8'Start'), =($12, _UTF-8'Stop'))])
                            LogicalTableScan(table=[[radius_json]])
                      PinotLogicalExchange(distribution=[hash[0, 1]])
                        LogicalProject(acctuniquesessionid=[$0], acctuniquesessionid0=[$3], $f40=[$1], $f41=[$4], $f42=[$5], $f43=[$2])
                          LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}])
                            PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
                              LogicalAggregate(group=[{0, 2, 3, 4, 6, 7}])
                                LogicalJoin(condition=[AND(=($4, $0), >($1, $5))], joinType=[inner])
                                  PinotLogicalExchange(distribution=[hash[0]])
                                    LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f40=[>($17, 0)], $f43=[-($17, 1)])
                                      **LogicalFilter(condition=[AND(=($14, _UTF-8'da0f86138ec36b2364889f048bf2ac82'), =($12, _UTF-8'Stop'), >($17, 0))])**
                                        LogicalTableScan(table=[[radius_json]])
                                  PinotLogicalExchange(distribution=[hash[0]])
                                    LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f41=[>($17, 0)], $f42=[+($17, 1)])
                                      LogicalFilter(condition=[AND(=($12, _UTF-8'Start'), >($17, 0))])
                                        LogicalTableScan(table=[[radius_json]])

Without the filter:

Execution Plan
LogicalSort(sort0=[$1], dir0=[DESC])
  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalProject(acctuniquesessionid=[$14], Zeit=[/(-($17, $36), 1000)], acctstatustype=[$12], acctstatustype0=[$31], eventtime=[$17], eventtime0=[$36])
      LogicalFilter(condition=[IS NULL($48)])
        LogicalJoin(condition=[AND(=($14, $42), =($33, $43), =($38, $44), =($39, $45), =($40, $46), =($41, $47))], joinType=[left])
          PinotLogicalExchange(distribution=[hash[14, 33, 38, 39, 40, 41]])
            LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $docId0=[$21], $hostName0=[$22], $segmentName0=[$23], USERNAME0=[$24], _3gppimsi0=[$25], _3gppimsimccmnc0=[$26], _3gppsgsnmccmnc0=[$27], acctinputoctets0=[$28], acctinputpackets0=[$29], acctoutputoctets0=[$30], acctoutputpackets0=[$31], acctsessiontime0=[$32], acctstatustype0=[$33], acctterminatecause0=[$34], acctuniquesessionid0=[$35], calledstationid0=[$36], callingstationid0=[$37], eventtime0=[$38], eventtimestamp0=[$39], $f40=[$19], $f41=[$40], $f42=[$41], $f43=[$20])
              LogicalJoin(condition=[AND(=($35, $14), >($17, $38))], joinType=[inner])
                PinotLogicalExchange(distribution=[hash[14]])
                  LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $f40=[>($17, 0)], $f43=[-($17, 1)])
                    **LogicalFilter(condition=[=($12, _UTF-8'Stop')])**
                      LogicalTableScan(table=[[radius_json]])
                PinotLogicalExchange(distribution=[hash[14]])
                  LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $f41=[>($17, 0)], $f42=[+($17, 1)])
                    LogicalFilter(condition=[=($12, _UTF-8'Start')])
                      LogicalTableScan(table=[[radius_json]])
          PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
            LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}], agg#0=[MIN($6)])
              PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
                LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}], agg#0=[MIN($6)])
                  LogicalProject(acctuniquesessionid0=[$2], acctuniquesessionid00=[$3], $f40=[$4], $f41=[$5], $f42=[$6], $f43=[$7], $f0=[true])
                    LogicalJoin(condition=[AND(=($3, $0), =($2, $0), >=($1, $6), <=($1, $7))], joinType=[inner])
                      PinotLogicalExchange(distribution=[hash[0, 0]])
                        LogicalProject(acctuniquesessionid=[$14], eventtime=[$17])
                          LogicalFilter(condition=[OR(=($12, _UTF-8'Start'), =($12, _UTF-8'Stop'))])
                            LogicalTableScan(table=[[radius_json]])
                      PinotLogicalExchange(distribution=[hash[0, 1]])
                        LogicalProject(acctuniquesessionid=[$0], acctuniquesessionid0=[$3], $f40=[$1], $f41=[$4], $f42=[$5], $f43=[$2])
                          LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}])
                            PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
                              LogicalAggregate(group=[{0, 2, 3, 4, 6, 7}])
                                LogicalJoin(condition=[AND(=($4, $0), >($1, $5))], joinType=[inner])
                                  PinotLogicalExchange(distribution=[hash[0]])
                                    LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f40=[>($17, 0)], $f43=[-($17, 1)])
                                      **LogicalFilter(condition=[AND(=($12, _UTF-8'Stop'), >($17, 0))])**
                                        LogicalTableScan(table=[[radius_json]])
                                  PinotLogicalExchange(distribution=[hash[0]])
                                    LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f41=[>($17, 0)], $f42=[+($17, 1)])
                                      LogicalFilter(condition=[AND(=($12, _UTF-8'Start'), >($17, 0))])
                                        LogicalTableScan(table=[[radius_json]])

The two lines that differ between the two plans are enclosed in "**". Please note: the "**" is not part of the plan; it was meant to render as bold, which does not work inside code.
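
Instead of marking the differing lines by hand, the two plans can be compared mechanically. This is a minimal sketch using Python's standard difflib on a three-line excerpt of each plan (the innermost Stop branch); the variable and function names are hypothetical, and the full EXPLAIN output would be pasted in place of the excerpts.

```python
import difflib

# Excerpts of the innermost "Stop" branch, copied from the two plans above.
PLAN_WITH_FILTER = """\
LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f40=[>($17, 0)], $f43=[-($17, 1)])
  LogicalFilter(condition=[AND(=($14, _UTF-8'da0f86138ec36b2364889f048bf2ac82'), =($12, _UTF-8'Stop'), >($17, 0))])
    LogicalTableScan(table=[[radius_json]])
"""

PLAN_WITHOUT_FILTER = """\
LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f40=[>($17, 0)], $f43=[-($17, 1)])
  LogicalFilter(condition=[AND(=($12, _UTF-8'Stop'), >($17, 0))])
    LogicalTableScan(table=[[radius_json]])
"""


def changed_lines(old_plan, new_plan):
    """Return only the removed ('-') and added ('+') lines between two plans."""
    diff = list(
        difflib.unified_diff(
            old_plan.splitlines(), new_plan.splitlines(), lineterm="", n=0
        )
    )
    # The first two entries are the '---' / '+++' file headers; skip them.
    return [line for line in diff[2:] if line.startswith(("+", "-"))]


changed = changed_lines(PLAN_WITH_FILTER, PLAN_WITHOUT_FILTER)
for line in changed:
    print(line)
```

For the excerpt this prints one removed and one added LogicalFilter line, matching the manual "**" marking.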
How can I check if any resource limit is hit?
Logs do not show an error.
The server log shows:

2024/04/18 06:19:45.695 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2875] Processed requestId=5100306610870553857,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/215/148/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=192,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=264651,scanInFilter=0,scanPostFilter=529302,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:45.714 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2864] Processed requestId=5100306610870557697,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/215/148/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=212,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=264651,scanInFilter=0,scanPostFilter=529302,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:46.555 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2850] Processed requestId=5100306610870555905,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/215/148/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=1052,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=264651,scanInFilter=0,scanPostFilter=5028369,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:46.556 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2874] Processed requestId=5100306610870552065,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/215/148/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=1053,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=264651,scanInFilter=0,scanPostFilter=5028369,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:46.690 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2857] Processed requestId=5100306610870557441,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/158/158/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=1183,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=202133,scanInFilter=0,scanPostFilter=404266,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:46.729 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2837] Processed requestId=5100306610870553601,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/158/158/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=1226,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=202133,scanInFilter=0,scanPostFilter=404266,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:47.461 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2832] Processed requestId=5100306610870556673,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/257/257/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=1955,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=466784,scanInFilter=0,scanPostFilter=933568,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:47.545 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2855] Processed requestId=5100306610870552833,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/257/257/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=2039,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=466784,scanInFilter=0,scanPostFilter=933568,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:47.851 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2841] Processed requestId=5100306610870555649,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/158/158/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=2346,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=202133,scanInFilter=0,scanPostFilter=3840527,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
2024/04/18 06:19:48.745 INFO [ServerQueryLogger] [query-runner-on-33095-1-thread-2849] Processed requestId=5100306610870554625,table=radius_json_REALTIME,segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/invalid/limit/value)=880/257/257/10/0/0/0/0/0,schedulerWaitMs=-1,reqDeserMs=-1,totalExecMs=3241,resSerMs=-1,totalTimeMs=-1,minConsumingFreshnessMs=1713358280165,broker=unknown,numDocsScanned=466784,scanInFilter=0,scanPostFilter=8868896,sched=MultistageEngine,threadCpuTimeNs(total/thread/sysActivity/resSer)=0/0/0/0
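
The log lines are easier to compare once the key=value fields are tabulated. The sketch below is an illustration only: it parses two abbreviated excerpts of the lines above (most fields omitted for brevity) and prints scanPostFilter divided by numDocsScanned. It does not decide whether a resource limit was hit, and reading the ratio as "columns read per scanned document" is an assumption, not something the log states.

```python
import re

# Abbreviated excerpts of two ServerQueryLogger lines above (fields omitted).
LOG_LINES = [
    "Processed requestId=5100306610870553857,table=radius_json_REALTIME,"
    "totalExecMs=192,numDocsScanned=264651,scanInFilter=0,scanPostFilter=529302",
    "Processed requestId=5100306610870555905,table=radius_json_REALTIME,"
    "totalExecMs=1052,numDocsScanned=264651,scanInFilter=0,scanPostFilter=5028369",
]

# Matches simple key=value pairs; keys followed by "(...)" are skipped on purpose.
FIELD = re.compile(r"(\w+)=([^,\s]+)")


def parse_fields(line):
    """Return the simple key=value fields of one log line as a dict of strings."""
    return dict(FIELD.findall(line))


ratios = []
for line in LOG_LINES:
    fields = parse_fields(line)
    docs = int(fields["numDocsScanned"])
    post = int(fields["scanPostFilter"])
    ratios.append(post // docs)
    print(fields["requestId"], fields["totalExecMs"], docs, post, post // docs)
```

For these two requests the ratio is 2 and 19, which would be consistent with the two-column and full-row projections visible in the plan, under the assumption stated above.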

Thanks
Ralf

@Jackie-Jiang
Contributor

@gortiz Can you help take a look at this query?

@Jackie-Jiang
Contributor

This is quite a complicated query, but is it possible that with the extra filter, the following filter returns false, so fewer sessions are returned?

not exists ( select 1 from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid and a.acctuniquesessionid = c.acctuniquesessionid
				and a.eventtime > 0 and b.eventtime > 0
				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
				 )

I don't see how table c is joined here

@RalfJL
Author

RalfJL commented Apr 26, 2024

To reduce the complexity of the execution plan, I removed the subselect in the select list. This subselect is not necessary and was only there for debugging.
The result stays the same: with the filter on a specific acctuniquesessionid the result is correct; without the filter, it is incorrect.
This is the reduced query:

-- explain plan for
select a.acctuniquesessionid, (a.eventtime - b.eventtime)/1000 as Zeit, a.acctstatustype, b.acctstatustype, a.eventtime, b.eventtime 
-- ,( select count(c.eventtime) from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid 
--				 and a.acctuniquesessionid = c.acctuniquesessionid
--				and a.eventtime > 0 and b.eventtime > 0
--				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
-- 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
--				 ) 

from radius_json a
join radius_json b
on b.acctuniquesessionid = a.acctuniquesessionid 
   and a.acctstatustype = 'Stop' and b.acctstatustype = 'Start' -- and a._3gppimsi <> 'null'
   and a.eventtime > b.eventtime
where  
	not exists ( select 1 from radius_json c where b.acctuniquesessionid = c.acctuniquesessionid and a.acctuniquesessionid = c.acctuniquesessionid
				and a.eventtime > 0 and b.eventtime > 0
				and (c.acctstatustype = 'Stop' or c.acctstatustype = 'Start' ) 
 	 			and c.eventtime between b.eventtime+1 and a.eventtime-1
				 ) 
-- and  			a.acctuniquesessionid = 'fac07ae7853810946d87e868e463af2c'
-- and 				a.acctuniquesessionid = 'ac6baa744130522c1eb1eec161114d1b'
-- and		   a.acctuniquesessionid = 'da0f86138ec36b2364889f048bf2ac82'

order by Zeit desc

The query plan is:

Execution Plan
LogicalSort(sort0=[$1], dir0=[DESC])
  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalProject(acctuniquesessionid=[$14], Zeit=[/(-($17, $36), 1000)], acctstatustype=[$12], acctstatustype0=[$31], eventtime=[$17], eventtime0=[$36])
      LogicalFilter(condition=[IS NULL($48)])
        LogicalJoin(condition=[AND(=($14, $42), =($33, $43), =($38, $44), =($39, $45), =($40, $46), =($41, $47))], joinType=[left])
          PinotLogicalExchange(distribution=[hash[14, 33, 38, 39, 40, 41]])
            LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $docId0=[$21], $hostName0=[$22], $segmentName0=[$23], USERNAME0=[$24], _3gppimsi0=[$25], _3gppimsimccmnc0=[$26], _3gppsgsnmccmnc0=[$27], acctinputoctets0=[$28], acctinputpackets0=[$29], acctoutputoctets0=[$30], acctoutputpackets0=[$31], acctsessiontime0=[$32], acctstatustype0=[$33], acctterminatecause0=[$34], acctuniquesessionid0=[$35], calledstationid0=[$36], callingstationid0=[$37], eventtime0=[$38], eventtimestamp0=[$39], $f40=[$19], $f41=[$40], $f42=[$41], $f43=[$20])
              LogicalJoin(condition=[AND(=($35, $14), >($17, $38))], joinType=[inner])
                PinotLogicalExchange(distribution=[hash[14]])
                  LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $f40=[>($17, 0)], $f43=[-($17, 1)])
                    LogicalFilter(condition=[=($12, _UTF-8'Stop')])
                      LogicalTableScan(table=[[radius_json]])
                PinotLogicalExchange(distribution=[hash[14]])
                  LogicalProject($docId=[$0], $hostName=[$1], $segmentName=[$2], USERNAME=[$3], _3gppimsi=[$4], _3gppimsimccmnc=[$5], _3gppsgsnmccmnc=[$6], acctinputoctets=[$7], acctinputpackets=[$8], acctoutputoctets=[$9], acctoutputpackets=[$10], acctsessiontime=[$11], acctstatustype=[$12], acctterminatecause=[$13], acctuniquesessionid=[$14], calledstationid=[$15], callingstationid=[$16], eventtime=[$17], eventtimestamp=[$18], $f41=[>($17, 0)], $f42=[+($17, 1)])
                    LogicalFilter(condition=[=($12, _UTF-8'Start')])
                      LogicalTableScan(table=[[radius_json]])
          PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
            LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}], agg#0=[MIN($6)])
              PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
                LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}], agg#0=[MIN($6)])
                  LogicalProject(acctuniquesessionid0=[$2], acctuniquesessionid00=[$3], $f40=[$4], $f41=[$5], $f42=[$6], $f43=[$7], $f0=[true])
                    LogicalJoin(condition=[AND(=($3, $0), =($2, $0), >=($1, $6), <=($1, $7))], joinType=[inner])
                      PinotLogicalExchange(distribution=[hash[0, 0]])
                        LogicalProject(acctuniquesessionid=[$14], eventtime=[$17])
                          LogicalFilter(condition=[OR(=($12, _UTF-8'Start'), =($12, _UTF-8'Stop'))])
                            LogicalTableScan(table=[[radius_json]])
                      PinotLogicalExchange(distribution=[hash[0, 1]])
                        LogicalProject(acctuniquesessionid=[$0], acctuniquesessionid0=[$3], $f40=[$1], $f41=[$4], $f42=[$5], $f43=[$2])
                          LogicalAggregate(group=[{0, 1, 2, 3, 4, 5}])
                            PinotLogicalExchange(distribution=[hash[0, 1, 2, 3, 4, 5]])
                              LogicalAggregate(group=[{0, 2, 3, 4, 6, 7}])
                                LogicalJoin(condition=[AND(=($4, $0), >($1, $5))], joinType=[inner])
                                  PinotLogicalExchange(distribution=[hash[0]])
                                    LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f40=[>($17, 0)], $f43=[-($17, 1)])
                                      LogicalFilter(condition=[AND(=($12, _UTF-8'Stop'), >($17, 0))])
                                        LogicalTableScan(table=[[radius_json]])
                                  PinotLogicalExchange(distribution=[hash[0]])
                                    LogicalProject(acctuniquesessionid=[$14], eventtime=[$17], $f41=[>($17, 0)], $f42=[+($17, 1)])
                                      LogicalFilter(condition=[AND(=($12, _UTF-8'Start'), >($17, 0))])
                                        LogicalTableScan(table=[[radius_json]])

The overall goal is to compute the bytes sent and received by a device in a defined timeframe from freeradius accounting data.
For example, we want to know how many bytes were transferred by device "XYZ" in the month of March.
We will have to do this for more than 1000000 devices.
This is the easiest of 4 requests. The other 3 will be more complicated.

We are still in the POC phase to see if Pinot is the right tool for it.

@RalfJL
Author

RalfJL commented Apr 26, 2024

This might have the same root cause and may be easier to debug.
I split the freeradius requests into 3 tables: radius_start, radius_update and radius_stop, which contain the Start, Interim-Update and Stop records respectively.
Now the query:

select a.acctuniquesessionid,  a.acctstatustype, a.ts, b.acctstatustype, b.ts, (cast(b.ts as BIGINT)-cast(a.ts as BIGINT))/1000 as Zeit from radius_start a
join radius_stop b
on a.acctuniquesessionid = b.acctuniquesessionid
order by Zeit desc

returns 185096 records
The query having a where clause with a subselect:

select a.acctuniquesessionid,  a.acctstatustype, a.ts, b.acctstatustype, b.ts, (cast(b.ts as BIGINT)-cast(a.ts as BIGINT))/1000 as Zeit from radius_start a
join radius_stop b
on a.acctuniquesessionid = b.acctuniquesessionid
where not exists ( select 1 from radius_start c where c.acctuniquesessionid = a.acctuniquesessionid and c.ts < a.ts)
order by Zeit desc

returns 466790 records.
I would have expected fewer records with the filter than without, right?
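
That expectation can be checked against reference SQL semantics: a NOT EXISTS predicate in the WHERE clause can only remove rows from the join result, never add any. The sketch below runs both queries in an in-memory SQLite database. The toy rows, the two-column table layout and the integer ts values are hypothetical and chosen only for illustration; SQLite stands in for standard SQL behaviour, not for Pinot.

```python
import sqlite3

# Hypothetical toy data: session s1 has two Start and two Stop records.
STARTS = [("s1", 10), ("s1", 50), ("s2", 5)]
STOPS = [("s1", 20), ("s1", 60), ("s2", 9)]

JOIN_ONLY = """
select a.acctuniquesessionid, a.ts, b.ts
from radius_start a
join radius_stop b
  on a.acctuniquesessionid = b.acctuniquesessionid
"""

# Same join, keeping only rows whose Start is the earliest one for its session.
JOIN_WITH_NOT_EXISTS = JOIN_ONLY + """
where not exists (
    select 1 from radius_start c
    where c.acctuniquesessionid = a.acctuniquesessionid and c.ts < a.ts
)
"""

conn = sqlite3.connect(":memory:")
conn.execute("create table radius_start (acctuniquesessionid text, ts integer)")
conn.execute("create table radius_stop (acctuniquesessionid text, ts integer)")
conn.executemany("insert into radius_start values (?, ?)", STARTS)
conn.executemany("insert into radius_stop values (?, ?)", STOPS)

unfiltered_count = len(conn.execute(JOIN_ONLY).fetchall())
filtered_count = len(conn.execute(JOIN_WITH_NOT_EXISTS).fetchall())
conn.close()

print(unfiltered_count, filtered_count)
```

Here the filtered query returns a subset of the unfiltered one, so a filtered result of 466790 rows against 185096 unfiltered rows cannot come from the predicate alone.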
And the query plan is interesting:
The first one:

Execution Plan
LogicalSort(sort0=[$5], dir0=[DESC])
  PinotLogicalSortExchange(distribution=[hash], collation=[[5 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalProject(acctuniquesessionid=[$1], acctstatustype=[$0], ts=[$2], acctstatustype0=[$4], ts0=[$6], Zeit=[/(-($7, $3), 1000)])
      LogicalJoin(condition=[=($1, $5)], joinType=[inner])
        PinotLogicalExchange(distribution=[hash[1]])
          LogicalProject(acctstatustype=[$12], acctuniquesessionid=[$14], ts=[$18], EXPR$0=[CAST($18):BIGINT NOT NULL])
            LogicalTableScan(table=[[radius_start]])
        PinotLogicalExchange(distribution=[hash[1]])
          LogicalProject(acctstatustype=[$12], acctuniquesessionid=[$14], ts=[$18], EXPR$0=[CAST($18):BIGINT NOT NULL])
            LogicalTableScan(table=[[radius_stop]])

Pretty simple and straightforward.
The second one:

Execution Plan
LogicalSort(sort0=[$5], dir0=[DESC])
  PinotLogicalSortExchange(distribution=[hash], collation=[[5 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalProject(acctuniquesessionid=[$14], acctstatustype=[$12], ts=[$18], acctstatustype0=[$31], ts0=[$37], Zeit=[/(-(CAST($37):BIGINT NOT NULL, CAST($18):BIGINT NOT NULL), 1000)])
      LogicalFilter(condition=[IS NULL($40)])
        LogicalJoin(condition=[AND(=($14, $38), =($18, $39))], joinType=[left])
          PinotLogicalExchange(distribution=[hash[14, 18]])
            LogicalJoin(condition=[=($14, $33)], joinType=[inner])
              PinotLogicalExchange(distribution=[hash[14]])
                LogicalTableScan(table=[[radius_start]])
              PinotLogicalExchange(distribution=[hash[14]])
                LogicalTableScan(table=[[radius_stop]])
          PinotLogicalExchange(distribution=[hash[0, 1]])
            LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
              PinotLogicalExchange(distribution=[hash[0, 1]])
                LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
                  LogicalProject(acctuniquesessionid0=[$2], ts0=[$3], $f0=[true])
                    LogicalJoin(condition=[AND(=($0, $2), <($1, $3))], joinType=[inner])
                      PinotLogicalExchange(distribution=[hash[0]])
                        LogicalProject(acctuniquesessionid=[$14], ts=[$18])
                          LogicalTableScan(table=[[radius_start]])
                      PinotLogicalExchange(distribution=[hash[0]])
                        LogicalAggregate(group=[{0, 1}])
                          PinotLogicalExchange(distribution=[hash[0, 1]])
                            LogicalAggregate(group=[{0, 1}])
                              LogicalJoin(condition=[=($0, $2)], joinType=[inner])
                                PinotLogicalExchange(distribution=[hash[0]])
                                  LogicalProject(acctuniquesessionid=[$14], ts=[$18])
                                    LogicalTableScan(table=[[radius_start]])
                                PinotLogicalExchange(distribution=[hash[0]])
                                  LogicalProject(acctuniquesessionid=[$14])
                                    LogicalTableScan(table=[[radius_stop]])

has 3 additional joins

@gortiz gortiz added the multi-stage Related to the multi-stage query engine label Apr 26, 2024
@gortiz
Contributor

gortiz commented Apr 26, 2024

I was able to replicate a similar plan in MultiStageQuickStart using:

select 
  playerID,
  numberOfGamesAsBatter,
  playerName,
  runs,
  intentionalWalks,
  CAST(hits as BIGINT) as Zeit
from baseballStats a
  join dimBaseballTeams b on a.teamID = b.teamID
  where not exists ( select 1 from baseballStats c where c.teamID = a.teamID and c.yearID < a.yearID)
order by Zeit desc

Which generates

Execution Plan
LogicalSort(sort0=[$5], dir0=[DESC])
  PinotLogicalSortExchange(distribution=[hash], collation=[[5 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalProject(playerID=[$3], numberOfGamesAsBatter=[$2], playerName=[$4], runs=[$5], intentionalWalks=[$1], Zeit=[CAST($0):BIGINT NOT NULL])
      LogicalFilter(condition=[IS NULL($11)])
        LogicalJoin(condition=[AND(=($6, $9), =($7, $10))], joinType=[left])
          PinotLogicalExchange(distribution=[hash[6, 7]])
            LogicalJoin(condition=[=($6, $8)], joinType=[inner])
              PinotLogicalExchange(distribution=[hash[6]])
                LogicalProject(hits=[$9], intentionalWalks=[$12], numberOfGamesAsBatter=[$15], playerID=[$16], playerName=[$17], runs=[$19], teamID=[$25], yearID=[$27])
                  LogicalTableScan(table=[[default, baseballStats]])
              PinotLogicalExchange(distribution=[hash[0]])
                LogicalProject(teamID=[$3])
                  LogicalTableScan(table=[[default, dimBaseballTeams]])
          PinotLogicalExchange(distribution=[hash[0, 1]])
            LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
              PinotLogicalExchange(distribution=[hash[0, 1]])
                LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
                  LogicalProject(teamID0=[$2], yearID0=[$3], $f0=[true])
                    LogicalJoin(condition=[AND(=($0, $2), <($1, $3))], joinType=[inner])
                      PinotLogicalExchange(distribution=[hash[0]])
                        LogicalProject(teamID=[$25], yearID=[$27])
                          LogicalTableScan(table=[[default, baseballStats]])
                      PinotLogicalExchange(distribution=[hash[0]])
                        LogicalAggregate(group=[{0, 1}])
                          PinotLogicalExchange(distribution=[hash[0, 1]])
                            LogicalAggregate(group=[{0, 1}])
                              LogicalJoin(condition=[=($0, $2)], joinType=[inner])
                                PinotLogicalExchange(distribution=[hash[0]])
                                  LogicalProject(teamID=[$25], yearID=[$27])
                                    LogicalTableScan(table=[[default, baseballStats]])
                                PinotLogicalExchange(distribution=[hash[0]])
                                  LogicalProject(teamID=[$3])
                                    LogicalTableScan(table=[[default, dimBaseballTeams]])

But at least with the data we have in this quickstart, the number of rows returned by this version is smaller than the number returned without the where clause.

I'll take a look next week trying to understand the rules that have been applied to generate that plan.

Meanwhile, can you try to use LAST_WITH_TIME instead? IIUC your query, that should be semantically equivalent to:

select a.acctuniquesessionid,
  a.acctstatustype,
  a.ts,
  b.acctstatustype,
  b.ts,
  (cast(b.ts as BIGINT) - cast(a.ts as BIGINT)) / 1000 as Zeit
from (
    select acctuniquesessionid,
      LAST_WITH_TIME(acctstatustype, ts, "STRING") as acctstatustype,
      MAX(ts) as ts
    from radius_start
    group by acctuniquesessionid
  ) as a
  join radius_stop b on a.acctuniquesessionid = b.acctuniquesessionid
order by Zeit desc

@RalfJL
Author

RalfJL commented Apr 29, 2024

Thanks for the query.
I can confirm that it returns the same number of records as the query

select a.acctuniquesessionid,  a.acctstatustype, a.ts, b.acctstatustype, b.ts, (cast(b.ts as BIGINT)-cast(a.ts as BIGINT))/1000 as Zeit from radius_start a
join radius_stop b
on a.acctuniquesessionid = b.acctuniquesessionid
order by Zeit desc

But it is not the same as the second query with the "not exists" clause. Please correct me if I am wrong.
And by the way, my "not exists" clause is wrong; see the explanation below.
Nevertheless, it is astonishing that adding a filter produces more output than the same query without the filter.
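
For anyone who wants to check why the two formulations are not the same: a filter of the form `not exists (... c.ts < a.ts)` keeps the earliest Start per session ID, whereas a `MAX(ts)` aggregation keeps the latest one. The following is only a minimal sketch using SQLite from Python as a stand-in for Pinot. The session IDs and timestamps are made up, and plain `MAX(ts)` is used in place of LAST_WITH_TIME, which SQLite does not have.

```python
import sqlite3

# In-memory stand-in for the radius_start table (made-up data, not Pinot).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE radius_start (acctuniquesessionid TEXT, ts INTEGER);
    INSERT INTO radius_start VALUES ('s1', 100), ('s1', 200), ('s2', 50);
""")

# NOT EXISTS with c.ts < a.ts: keep a Start only if no earlier Start exists.
EARLIEST = """
    SELECT a.acctuniquesessionid, a.ts
    FROM radius_start a
    WHERE NOT EXISTS (
        SELECT 1 FROM radius_start c
        WHERE c.acctuniquesessionid = a.acctuniquesessionid
          AND c.ts < a.ts)
    ORDER BY a.acctuniquesessionid
"""

# GROUP BY with MAX(ts): keep the latest Start per session ID.
LATEST = """
    SELECT acctuniquesessionid, MAX(ts)
    FROM radius_start
    GROUP BY acctuniquesessionid
    ORDER BY acctuniquesessionid
"""

earliest = conn.execute(EARLIEST).fetchall()
latest = conn.execute(LATEST).fetchall()

if __name__ == "__main__":
    print("earliest per session:", earliest)
    print("latest per session:  ", latest)
```

The two results only agree for session IDs that have exactly one Start record.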

The overall goal is to find matching Start-Stop records even when several sessions have the same "acctuniquesessionid". The key is the timestamp column "ts".
Think about the following example:
acctuniquesessionid	ts-Start	ts-Stop
099d8dd5581b5e511f895c7c736913d4	2024-04-28 13:05:05.0	2024-04-28 13:15:08.0
099d8dd5581b5e511f895c7c736913d4	2024-04-28 13:15:09.0	2024-04-28 14:24:48.0

Simply joining both tables will return 4 records.
ts	acctstatustype	ts	acctstatustype	Zeit
2024-04-28 13:15:09.0	Start	2024-04-28 13:15:08.0	Stop	-1
2024-04-28 13:15:09.0	Start	2024-04-28 14:24:48.0	Stop	4179
2024-04-28 13:05:05.0	Start	2024-04-28 13:15:08.0	Stop	603
2024-04-28 13:05:05.0	Start	2024-04-28 14:24:48.0	Stop	4783
Obviously the first and the last records are wrong.
The next step is to filter out the record where Stop is before Start (kick out the first record):

select a.ts, a.acctstatustype, b.ts, b.acctstatustype,(cast(b.ts as BIGINT) - cast(a.ts as BIGINT)) / 1000 as Zeit from radius_start a
inner join radius_stop b
on a.acctuniquesessionid = b.acctuniquesessionid
where a.ts < b.ts and a.acctuniquesessionid = '099d8dd5581b5e511f895c7c736913d4'

The next step is to kick out the pair where the Start from the first session is combined with the Stop of the second session.
In other words, there cannot be any other Start record between the Start and the Stop of a pair:

select a.ts, a.acctstatustype, b.ts, b.acctstatustype,(cast(b.ts as BIGINT) - cast(a.ts as BIGINT)) / 1000 as Zeit from radius_start a
inner join radius_stop b
on a.acctuniquesessionid = b.acctuniquesessionid and a.ts < b.ts 
where  a.acctuniquesessionid = '099d8dd5581b5e511f895c7c736913d4'
and not exists ( select 1 from radius_start c where c.acctuniquesessionid = a.acctuniquesessionid and cast(c.ts as BIGINT) between cast(a.ts as BIGINT) +1 and cast(b.ts as BIGINT) -1)

Which returns the desired pairs:
acctuniquesessionid	ts	acctstatustype	ts	acctstatustype	Zeit
099d8dd5581b5e511f895c7c736913d4	2024-04-28 13:15:09.0	Start	2024-04-28 14:24:48.0	Stop	4179
099d8dd5581b5e511f895c7c736913d4	2024-04-28 13:05:05.0	Start	2024-04-28 13:15:08.0	Stop	603
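
The three steps above can be reproduced on the two example sessions with a small sketch. It is not Pinot: it uses SQLite from Python as a stand-in, reduces the tables to two columns, and stores the timestamps as milliseconds relative to the first Start (an assumption made only to keep the numbers short). The point is that each added condition can only remove rows from the plain join.

```python
import sqlite3

SESSION = "099d8dd5581b5e511f895c7c736913d4"


def build_db():
    """Create in-memory radius_start / radius_stop tables with the two example sessions."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE radius_start (acctuniquesessionid TEXT, ts INTEGER);
        CREATE TABLE radius_stop  (acctuniquesessionid TEXT, ts INTEGER);
    """)
    # Milliseconds relative to 13:05:05 (assumed encoding, for illustration only).
    conn.executemany("INSERT INTO radius_start VALUES (?, ?)",
                     [(SESSION, 0), (SESSION, 604_000)])
    conn.executemany("INSERT INTO radius_stop VALUES (?, ?)",
                     [(SESSION, 603_000), (SESSION, 4_783_000)])
    return conn


# Step 1: plain join on the session ID.
JOIN = """
    SELECT a.ts, b.ts, (b.ts - a.ts) / 1000 AS Zeit
    FROM radius_start a
    JOIN radius_stop b ON a.acctuniquesessionid = b.acctuniquesessionid
"""

# Step 2: drop pairs where the Stop is not after the Start.
ORDERED = JOIN + " WHERE a.ts < b.ts"

# Step 3: drop pairs that have another Start strictly between Start and Stop.
PAIRED = ORDERED + """
    AND NOT EXISTS (
        SELECT 1 FROM radius_start c
        WHERE c.acctuniquesessionid = a.acctuniquesessionid
          AND c.ts BETWEEN a.ts + 1 AND b.ts - 1)
"""


def zeit_values(conn, sql):
    """Return the sorted Zeit column (seconds) for a query."""
    return sorted(row[2] for row in conn.execute(sql))


if __name__ == "__main__":
    conn = build_db()
    for name, sql in [("join", JOIN), ("ordered", ORDERED), ("paired", PAIRED)]:
        print(name, zeit_values(conn, sql))
```

On this data the row count goes from 4 to 3 to 2, which is the behaviour I would expect from any SQL engine.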

Removing the filter on acctuniquesessionid still shows the right records, but the number of records returned differs by a factor of 2.5 depending on the where clause:
Query with "not exists" returns 467702 records
Query without any where clause returns 185501 records
Actually it should be the other way round, right?

@gortiz
Contributor

gortiz commented Apr 29, 2024

We are pretty blind here given we don't know in which stage the extra rows are being generated. @RalfJL could you compile and run your case with the code I have in #12704? That PR is quite long but as you can see in the description, we are adding there a new stats system that can be used to list information per operation. It would be super useful to understand where the problem is being generated.

@RalfJL
Author

RalfJL commented Apr 30, 2024

Sure I will do that.
But currently I am not sure what to do.
Can you please outline the steps? What do I have to pull or merge? What do I have to do to compile the code and is there any special instruction on running the query?
I just want to make sure that I am doing exactly what you expect me to do.
Thanks
Ralf

@gortiz
Contributor

gortiz commented Apr 30, 2024

We plan to have this PR merged soon (maybe next week) which would mean you will have a docker image ready to use.

But to speed things up, you can fetch my code and compile it yourself. Luckily, we already include a script to do so.

Specifically, in any branch, you can execute:

cd docker/images/pinot 
./docker-build.sh 1.2.0-multi-stage-stats multi-stage-stats https://github.com/gortiz/pinot.git

AFAIR the only dependency is to be able to build docker images (and have internet access).

Once that command finishes (it will take a few minutes) you should have a Docker image called apachepinot/pinot:1.2.0-multi-stage-stats. If you deploy that image on your servers and brokers, you will receive a slightly different JSON response to your queries. The new response will include an entry named stageStats, a JSON object containing all these new stats. It would be great if you could share that.

@RalfJL
Author

RalfJL commented May 2, 2024

Hmmm,
I think I did everything right and the result is better. The query with the "where" condition returns fewer records than without.
But I can't see any "stageStats" which makes me suspicious.
Should "stageStats" appear in the query plan or in the "query response stats"?
How can I make sure that I really use your branch?
Thanks
Ralf

@gortiz
Contributor

gortiz commented May 3, 2024

I think I did everything right and the result is better. The query with the "where" condition returns fewer records than without.

That looks strange, AFAIK we didn't change that part of the code since 1.1.0. But good to know, maybe we can catch where the problem was fixed thanks to that.

But I can't see any "stageStats" which makes me suspicious.
Should "stageStats" appear in the query plan or in the "query response stats"?

It should be in the returned JSON. If you execute the query using the Pinot UI (aka the controller web interface) you should be able to read it by clicking on Show JSON format. The table below will then be replaced by a (probably very large) JSON document that contains the data of your response and other metadata like stageStats. See this image:

image

BTW, I've just merged my branch, so the next published Docker image tagged latest will contain my changes. IIRC they are published every day (or maybe even more than once a day).

@RalfJL
Author

RalfJL commented May 7, 2024

Very strange! I can't reproduce it, neither with the current stable 1.1.0 nor with the 1.2.0 repository from gortiz.
So it might be in the data, which I unfortunately did not save from the first try.

I suggest closing this thread with the reason "not reproducible". Since I am not yet done with the application we are trying to build, I will keep working on it for the next 3 or 4 weeks. If I find the problem again, I can reopen this thread.
Do you agree?

By the way, I can confirm that the execution plans are almost the same. The differences do not look relevant to me.
e.g.
LogicalProject(ts=[$18], acctstatustype=[$12], ts0=[$37], acctstatustype0=[$31], acctuniquesessionid=[$14], Zeit=[/(-(CAST($37):BIGINT NOT NULL, CAST($18):BIGINT NOT NULL), 1000)])
becomes in 1.2.0:
LogicalProject(ts=[$2], acctstatustype=[$0], ts0=[$4], acctstatustype0=[$3], acctuniquesessionid=[$1], Zeit=[/(-(CAST($4):BIGINT NOT NULL, CAST($2):BIGINT NOT NULL), 1000)])
So only the column numbers changed.
And
LogicalTableScan(table=[[radius_start]])
becomes
LogicalTableScan(table=[[default, radius_start]])

@gortiz
Contributor

gortiz commented May 7, 2024

Were you able to use the stageStats field? Was it useful?

@RalfJL
Author

RalfJL commented May 7, 2024

I found the stageStats JSON element, but currently I do not really know what to look for or pay attention to.
Is there a document or a guideline?
In the end I will have to run NiFi to issue time-triggered curl commands against Pinot. The result has to be checked for errors, and the resulting records have to be split and pushed to a MariaDB that holds further asset data needed for computation and analysis.
Speaking of which, it would be helpful to have a guide on how to interpret the stageStats.
And, in real life, I will have to handle over 300,000,000 records a week from around 2,000,000 devices. It would be very important to see whether any limits are hit or any resources are exhausted.

@RalfJL
Author

RalfJL commented May 8, 2024

Different result sets on successive calls of the same SQL. This might have the same root cause as above.
I have a complicated query:

select b.ts, a.ts, a.callingstationid, b.callingstationid, cast(a.ts as bigint)-cast(b.ts as bigint) as Zeit, a.ts as Stop, b.ts as Start, a.acctuniquesessionid, b.acctuniquesessionid from radius_stop a
inner join radius_start b
on a.acctuniquesessionid = b.acctuniquesessionid and a.callingstationid = b.callingstationid and b.ts < a.ts
where a._3gppimsi = 'null' and
not exists ( select 1 from radius_start c where c.acctuniquesessionid = a.acctuniquesessionid and cast(c.ts as BIGINT) between cast(b.ts as BIGINT) +1 and cast(a.ts as BIGINT) -1) and
not exists ( select 1 from radius_stop c where c.acctuniquesessionid = b.acctuniquesessionid and cast(c.ts as BIGINT) between cast(b.ts as BIGINT) +1 and cast(a.ts as BIGINT) -1) and
a.callingstationid = '94-58-CB-C1-5E-D1'

and the number of rows returned varies between 10 and 24. Sometimes it returns 10 rows, immediately after that 17 rows, immediately after that 24 rows, then maybe 10 again or maybe 14, ...
But only a few distinct row counts occur: 10, 14, 17, 21, 24, and they are not equally distributed. 10 occurs most often; 21 or 24 is rare.
With every call to the same SQL, the number of rows returned may or may not change.
I have only 1 broker, 1 server, and 1 controller.
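
To put numbers on how often each row count occurs, the same query can be issued repeatedly and the result sizes tallied. This is only a rough sketch: `run_query` is a hypothetical callable that would wrap the actual call to the broker and return the result rows, and `make_fake_runner` is a made-up stand-in so the snippet runs without a cluster.

```python
from collections import Counter
from itertools import cycle
from typing import Callable, Sequence


def tally_row_counts(run_query: Callable[[], Sequence], attempts: int = 50) -> Counter:
    """Run the same query `attempts` times and count how often each result size occurs."""
    return Counter(len(run_query()) for _ in range(attempts))


def make_fake_runner(sizes):
    """Hypothetical stand-in for a real query call: cycles through fixed result sizes."""
    it = cycle(sizes)
    return lambda: [None] * next(it)


if __name__ == "__main__":
    fake = make_fake_runner([10, 17, 24, 10, 14])
    # Most frequent result sizes first.
    print(tally_row_counts(fake, attempts=10).most_common())
```

A deterministic query on static data should produce a tally with a single key.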
The stageStats are the same except the number of "emittedRows" and the time.
e.g.

    "children": [
      {
        "type": "MAILBOX_SEND",
        "executionTimeMs": 688,
        "emittedRows": 17,

compared to

   "children": [
      {
        "type": "MAILBOX_SEND",
        "executionTimeMs": 632,
        "emittedRows": 10,

The database is currently static. No rows added or deleted.
My first observation might be wrong: the problem may not be related to the where clause at all, but to the fact that successive calls return different result sets.
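
To narrow down where the row counts start to diverge, the stageStats of two runs could be compared operator by operator. The sketch below is an assumption-heavy illustration: it relies only on the `type`, `emittedRows` and `children` keys seen in the fragments above, assumes that children are the inputs of their parent, and uses two made-up trees (all operator types other than MAILBOX_SEND are invented for the example).

```python
from typing import Any, Dict, Iterator, List, Tuple

Stats = Dict[str, Any]
Path = Tuple[int, ...]


def walk(node: Stats, path: Path = ()) -> Iterator[Tuple[Path, Stats]]:
    """Yield (path, node) for every operator in a stageStats tree, parents first."""
    yield path, node
    for i, child in enumerate(node.get("children", [])):
        yield from walk(child, path + (i,))


def diff_emitted_rows(run_a: Stats, run_b: Stats) -> List[Tuple[Path, Any, Any, Any]]:
    """Return (path, type, rows_a, rows_b) for operators whose emittedRows differ."""
    nodes_b = dict(walk(run_b))
    diffs = []
    for path, node_a in walk(run_a):
        node_b = nodes_b.get(path)
        if node_b is None:
            continue  # the trees have different shapes here; skip
        rows_a, rows_b = node_a.get("emittedRows"), node_b.get("emittedRows")
        if rows_a != rows_b:
            diffs.append((path, node_a.get("type"), rows_a, rows_b))
    return diffs


def make_run(rows: int) -> Stats:
    """Build a made-up stats tree whose top three operators emit `rows` rows."""
    leaf = {"type": "LEAF", "emittedRows": 40}
    join = {"type": "HASH_JOIN", "emittedRows": rows, "children": [leaf]}
    send = {"type": "MAILBOX_SEND", "emittedRows": rows, "children": [join]}
    return {"type": "MAILBOX_RECEIVE", "emittedRows": rows, "children": [send]}


RUN_A = make_run(17)
RUN_B = make_run(10)

if __name__ == "__main__":
    diffs = diff_emitted_rows(RUN_A, RUN_B)
    # Under the assumption above, the deepest differing operator is the first
    # one whose output diverges while its inputs still agree.
    print(max(diffs, key=lambda d: len(d[0])))
```

If the assumption about children holds, the deepest operator in the diff is the place to start looking.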
