No connection timeout #38

Open
nick-adjust opened this issue Sep 18, 2019 · 3 comments
@nick-adjust

If there is no connection to the Kafka broker, an attempt to establish one hangs indefinitely and doesn't check for interrupts.

Unfortunately I don't have a full backtrace, but here's the librdkafka part of it:

#0  0x00007f127898d96a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f1279e61e59 in cnd_timedwait (cond=cond@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, ts=ts@entry=0x7ffe3d7ca300) at tinycthread.c:462
#2  0x00007f1279e62273 in cnd_timedwait_abs (cnd=cnd@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, tspec=tspec@entry=0x7ffe3d7ca300) at tinycthread_extra.c:100
#3  0x00007f1279e2bdff in rd_kafka_q_serve (rkq=rkq@entry=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x7f1279df67f0 <rd_kafka_consume_cb>, opaque=opaque@entry=0x7ffe3d7ca410) at rdkafka_queue.h:475
#4  0x00007f1279df6f51 in rd_kafka_consume_callback0 (rkq=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=<optimized out>, consume_cb=<optimized out>, opaque=<optimized out>)
    at rdkafka.c:2617
@zilder
Contributor

zilder commented Sep 23, 2019

The full stacktrace:

#0  0x00007f018c4b196a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f018b10ee59 in cnd_timedwait (cond=cond@entry=0x55d599512918, 
    mtx=mtx@entry=0x55d5995128f0, ts=ts@entry=0x7fffd3c3ea50) at tinycthread.c:462
#2  0x00007f018b10f273 in cnd_timedwait_abs (cnd=cnd@entry=0x55d599512918, 
    mtx=mtx@entry=0x55d5995128f0, tspec=tspec@entry=0x7fffd3c3ea50)
    at tinycthread_extra.c:100
#3  0x00007f018b0d8dff in rd_kafka_q_serve (rkq=rkq@entry=0x55d5995128f0, 
    timeout_ms=timeout_ms@entry=100, max_cnt=max_cnt@entry=0, 
    cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, 
    callback=callback@entry=0x7f018b0a7090 <rd_kafka_poll_cb>, opaque=opaque@entry=0x0)
    at rdkafka_queue.h:475
#4  0x00007f018b0a637e in rd_kafka_query_watermark_offsets (rk=<optimized out>, 
    topic=<optimized out>, partition=partition@entry=0, low=low@entry=0x7fffd3c3ecc0, 
    high=high@entry=0x7fffd3c3ecc8, timeout_ms=timeout_ms@entry=1000) at rdkafka.c:3102
#5  0x00007f018c7d00ff in kafka_get_watermarks (fcinfo=0x55d5995c6310) at src/utils.c:72
#6  0x000055d597c0ef1f in ExecMakeFunctionResultSet (fcache=0x55d5995c62a0, 
    econtext=econtext@entry=0x55d5995c5860, argContext=<optimized out>, 
    isNull=<optimized out>, isDone=isDone@entry=0x55d5995c6288) at execSRF.c:604
#7  0x000055d597c2a7ba in ExecProjectSRF (node=node@entry=0x55d5995c5750, 
    continuing=continuing@entry=false) at nodeProjectSet.c:175
#8  0x000055d597c2a875 in ExecProjectSet (pstate=0x55d5995c5750) at nodeProjectSet.c:105
#9  0x000055d597c0570b in ExecProcNode (node=0x55d5995c5750)
    at ../../../src/include/executor/executor.h:247
#10 ExecutePlan (execute_once=<optimized out>, dest=0x55d5995d3a60, 
    direction=<optimized out>, numberTuples=0, sendTuples=<optimized out>, 
    operation=CMD_SELECT, use_parallel_mode=<optimized out>, planstate=0x55d5995c5750, 
    estate=0x55d5995c5540) at execMain.c:1723
#11 standard_ExecutorRun (queryDesc=0x55d5995bf7e0, direction=<optimized out>, count=0, 
    execute_once=<optimized out>) at execMain.c:364
#12 0x00007f018c8e0fc5 in pgss_ExecutorRun (queryDesc=0x55d5995bf7e0, 
    direction=ForwardScanDirection, count=0, execute_once=<optimized out>)
    at pg_stat_statements.c:892
#13 0x000055d597d5026b in PortalRunSelect (portal=portal@entry=0x55d599587fe0, 
    forward=forward@entry=true, count=0, count@entry=9223372036854775807, 
    dest=dest@entry=0x55d5995d3a60) at pquery.c:932
#14 0x000055d597d5171e in PortalRun (portal=portal@entry=0x55d599587fe0, 
    count=count@entry=9223372036854775807, isTopLevel=isTopLevel@entry=true, 
    run_once=run_once@entry=true, dest=dest@entry=0x55d5995d3a60, 
    altdest=altdest@entry=0x55d5995d3a60, completionTag=0x7fffd3c3f340 "") at pquery.c:773
#15 0x000055d597d4d5e7 in exec_simple_query (
    query_string=0x55d5994eb150 "select kafka_get_watermarks('kafka_7_cpc'::regclass);")
    at postgres.c:1145
#16 0x000055d597d4ee5e in PostgresMain (argc=<optimized out>, 
    argv=argv@entry=0x55d599520df8, dbname=<optimized out>, username=<optimized out>)
    at postgres.c:4182
#17 0x000055d597cdd036 in BackendRun (port=0x55d59951d7a0) at postmaster.c:4358
#18 BackendStartup (port=0x55d59951d7a0) at postmaster.c:4030
#19 ServerLoop () at postmaster.c:1707
#20 0x000055d597cddf07 in PostmasterMain (argc=5, argv=0x55d5994e5910) at postmaster.c:1380
#21 0x000055d597a6f636 in main (argc=5, argv=0x55d5994e5910) at main.c:228

@zilder
Contributor

zilder commented Sep 23, 2019

I think this might be a bug in librdkafka, but I can't yet say so with full confidence. From my perspective, it looks like rd_kafka_query_watermark_offsets() does not interpret the result of rd_kafka_q_serve() correctly. rd_kafka_q_serve() should return the number of messages that have been processed, but the result is treated as if it were an rd_kafka_op_res_t (the result value of the specified callback function, rd_kafka_poll_cb). Either that doesn't make sense or I'm missing something.

In our case rd_kafka_q_serve() returns 0 because it times out. The caller compares the result with RD_KAFKA_OP_RES_YIELD (which equals 3 when converted to an integer), finds them unequal, and proceeds to the next iteration. This repeats again and again, and execution gets stuck in an infinite loop. I prepared a rough draft of a patch:

-        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
-               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                                rd_kafka_poll_cb, NULL) !=
-               RD_KAFKA_OP_RES_YIELD)
-                ;
+        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+        {
+                if (rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL) == 0)
+                {
+                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+                }
+        }

It worked in our case. I'll double-check it soon and create a PR to librdkafka.
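
To make the loop behaviour easier to see in isolation, here is a small simulation. It is only a sketch and does not use librdkafka at all: `fake_state`, `fake_q_serve`, `original_style_wait` and `deadline_wait` are hypothetical stand-ins, and the clock is simulated. The first loop mirrors the original condition (a served-op count compared against the value 3), the second tracks an overall deadline so that the caller's timeout is honoured. The deadline variant is an assumption about one possible fix; it is neither the draft above (which gives up after the first empty 100 ms poll) nor necessarily what upstream does.

```c
#include <assert.h>

/* Hypothetical stand-ins for illustration only; not librdkafka identifiers. */
#define ERR_NONE         0
#define ERR_IN_PROGRESS (-1)
#define ERR_TIMED_OUT   (-2)
#define OP_RES_YIELD     3   /* integer value mentioned in the comment above */

typedef struct {
    int elapsed_ms;   /* simulated clock */
    int reply_at_ms;  /* time at which the broker reply arrives; negative = never */
    int err;          /* ERR_IN_PROGRESS until a reply or a timeout */
} fake_state;

/* Waits up to wait_ms on the simulated clock.
 * Returns the number of ops served: 1 if the reply arrived, 0 on timeout. */
static int fake_q_serve(fake_state *st, int wait_ms)
{
    if (st->reply_at_ms >= 0 && st->reply_at_ms <= st->elapsed_ms + wait_ms) {
        if (st->reply_at_ms > st->elapsed_ms)
            st->elapsed_ms = st->reply_at_ms;
        st->err = ERR_NONE;
        return 1;
    }
    st->elapsed_ms += wait_ms;
    return 0;
}

/* Same shape as the original condition: the count (0 or 1) never equals 3,
 * so only the artificial max_iters cap stops the loop when no reply comes. */
static int original_style_wait(fake_state *st, int max_iters)
{
    int iters = 0;
    while (iters < max_iters &&
           st->err == ERR_IN_PROGRESS &&
           fake_q_serve(st, 100) != OP_RES_YIELD)
        iters++;
    return iters;
}

/* Deadline-based variant: poll in slices of at most 100 ms and give up
 * once the caller's overall timeout_ms has elapsed. */
static int deadline_wait(fake_state *st, int timeout_ms)
{
    assert(timeout_ms >= 0);
    int deadline = st->elapsed_ms + timeout_ms;
    while (st->err == ERR_IN_PROGRESS) {
        int remaining = deadline - st->elapsed_ms;
        if (remaining <= 0) {
            st->err = ERR_TIMED_OUT;
            break;
        }
        fake_q_serve(st, remaining < 100 ? remaining : 100);
    }
    return st->err;
}
```

With a state whose reply never arrives (`reply_at_ms = -1`), `original_style_wait` runs until the iteration cap, while `deadline_wait(&st, 1000)` stops once the simulated clock reaches 1000 ms and reports `ERR_TIMED_OUT`.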

@nick-adjust
Author

Posting it here for visibility:
confluentinc/librdkafka#2535
