
Avoidance of head-of-line blocking by means of portals suspension? #652

Open
evgenyvsmirnov opened this issue May 3, 2024 · 2 comments
Labels: for: external-project For an external project and not something we can fix

@evgenyvsmirnov

Greetings!

Please consider the following code. It's a conceptual model of a reactive chain which reads a stream of DDD aggregates (in a real app I would be better off doing the same via Spring CRUD repositories). The first select retrieves the aggregate roots (users); the selects in the downstream retrieve some related entities (accounts):

Flux.defer {
    databaseClient.sql("select userId from users where userId >= $1 and userId <= $2")
        .bind("$1", 1)
        .bind("$2", 256)
        .flatMap { r -> r.map { row, _ -> row.get(0).toString().toInt() } }
        .flatMap { userId ->
            databaseClient.sql("select login from accounts where userId=$1 limit 1")
                .bind("$1", userId!!)
                .flatMap { r -> r.map { row, meta -> row.get(0, kotlin.String::class.java) } }
        }
}.`as`(transactionalOperator::transactional)

Problem

Such a chain can read at most 255 users. If the number exceeds that, an exception ("Cannot exchange messages because the request queue limit is exceeded") is thrown, which is frustrating, especially for users of Spring Data CRUD repositories. Apparently, if one wants to read a stream of (DDD) aggregates via multiple CRUD repositories, one has to use either pagination (and carefully calculate all the requests the downstream might emit) or joins; the former has nothing to do with the reactive approach (we want streams, not pages).

The reason for the aforementioned exception is head-of-line blocking: the conversation for "select userId from users" can't be removed from the conversations queue (io.r2dbc.postgresql.client.ReactorNettyClient) until Postgres's "Command completion" is received and the portal is closed thereafter; meanwhile, conversations for the requests from the downstream queue up until the limit is reached and the exception is thrown.

Workaround

At the same time, the task is doable. The following code (employing a cursor) is capable of reading an infinite stream of users and emitting additional requests for each of them:

@Transactional(isolation = Isolation.REPEATABLE_READ, readOnly = true)
open fun browseUserIds(transformer: (Int) -> Mono<UserEntity>): Flux<UserEntity> =
    Flux.usingWhen(
        Mono.deferContextual {
            Mono.just(it.get("cursorId") as String)
        }.flatMap { cursor ->
            client.sql("DECLARE $cursor CURSOR FOR SELECT userId FROM users;")
                .flatMap { r -> Flux.just(cursor) }
                .last()
        },

        { cursor ->
            Flux.generate { sink -> sink.next(1) }
                .concatMap {
                    client.sql("FETCH FORWARD $it FROM $cursor;")
                        .flatMap { r -> r.map { row, _ -> row.get("userId")!!.toString().toInt() } }
                        .switchIfEmpty(Mono.just(POISON_PILL))
                }
                // Stop before the poison pill so the sentinel never reaches the transformer.
                .takeWhile { it != POISON_PILL }
                // Emit an arbitrary number of selects for the next userId via crud-repositories/templates.
                .concatMap { transformer.invoke(it) }
        },

        { cursor -> client.sql("CLOSE $cursor;").then() }
    ).contextWrite(
        Context.of("cursorId", "uc_${UUID.randomUUID().toString().replace("-", "")}")
    )

The key difference is that every cursor-related instruction is treated by the driver as a separate conversation, and suspension is intrinsic to portals created via DECLARE CURSOR; together these make it possible to send additional (conventional) selects inside "transformer.invoke(it)".
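For illustration, a possible invocation might look as follows (the per-user select and the UserEntity(row) mapping are placeholders for whatever the transformer actually does):

browseUserIds { userId ->
    // Conventional selects can run here while the cursor's portal stays suspended
    // on the same connection and inside the same transaction.
    client.sql("SELECT * FROM users WHERE userId = $1")
        .bind("$1", userId)
        .flatMap { r -> r.map { row, _ -> UserEntity(row) } } // placeholder mapping
        .last()
}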

Possible solution

At the same time, according to this answer on pgsql-hackers (https://www.postgresql.org/message-id/1915c800-2c49-4039-a840-7cafc0654fe4%40iki.fi), there is no difference (for the backend) between a portal created via DECLARE CURSOR and one created via Bind.

So could the following approach pay off? If Postgres's "Portal suspended" is received for the Nth conversation (a fetchSize has to be defined for the corresponding query, which is already possible), the driver goes on to execute subsequent conversations (generated by a downstream, if any) and resumes the Nth conversation (via a new "Execute" command) only after all subsequent conversations are done. Thereby the following simple code, which a lot of developers tend to write, could work (now it can't, owing to the aforementioned reasons):

@Transactional(isolation = Isolation.REPEATABLE_READ, readOnly = true)
override fun browseUsers(): Flow<User> =
    // Not necessarily all but some considerable amount 
    crudUserEntityRepository.findAll().map {
        User(
            it.userId,
            it.userName,
            it.email,
            crudAccountEntityRepository.findAllByUserId(it.userId).map { Account(it.login, it.password) }.toSet(),
            crudPhoneEntityRepository.findAllByUserId(it.userId).map { Phone(it.number) }.toSet()
        )
    }
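For reference, the per-query fetchSize this relies on can already be declared today; what is missing is only the suspend-and-interleave behaviour. A minimal sketch, assuming the same databaseClient as in the first snippet:

databaseClient.sql("select userId from users")
    // io.r2dbc.spi.Statement.fetchSize: ask the driver to execute the portal a few rows at a time
    .filter { statement -> statement.fetchSize(1) }
    .flatMap { r -> r.map { row, _ -> row.get(0).toString().toInt() } }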

Motivation

In my eyes it would be a step change for Spring Data users, because it seems that the only way to read a stream of complex objects nowadays is either the aforementioned approach (employing a cursor) or joins. The former has undue complexity; the latter has a lot of disadvantages:

  • joining several tables may be quite a task in itself,
  • tons of boilerplate code are inherent in dealing with joins,
  • the more tables one joins, the higher the chance that PostgreSQL ends up with an inefficient plan.

Thanks!

@evgenyvsmirnov evgenyvsmirnov added the type: enhancement A general enhancement label May 3, 2024
@mp911de
Collaborator

mp911de commented May 3, 2024

This is a known issue. R2DBC is a bit more honest than JDBC in that regard, as JDBC fetches the entire cursor into memory when running another query while the cursor is open. That is also the reason why reading an entire chunk into a List makes sense, as nested queries require some sort of chunking to avoid exhausting all memory.
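A minimal sketch of such chunking, assuming the databaseClient from the snippets above (keyset pagination and the page size are arbitrary illustrative choices):

// Each page's conversation completes (collectList drains it) before the nested
// per-user selects are issued, so nothing piles up behind an open portal.
fun userIdsFrom(afterId: Int, pageSize: Int = 100): Flux<Int> =
    databaseClient.sql("select userId from users where userId > $1 order by userId limit $2")
        .bind("$1", afterId)
        .bind("$2", pageSize)
        .flatMap { r -> r.map { row, _ -> row.get(0).toString().toInt() } }
        .collectList()
        .flatMapMany { ids ->
            if (ids.isEmpty()) Flux.empty<Int>()
            else Flux.fromIterable(ids).concatWith(Flux.defer { userIdsFrom(ids.last(), pageSize) })
        }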

I would ask you to file the ticket in https://github.com/spring-projects/spring-data-relational/issues, as the driver itself only passes SQL through. The issue could also be solved by utilizing multiple connections; that would, however, force data to escape its transactional closure.

Your example is doing exactly that: allocating cursors on the server and progressively consuming them.

@mp911de mp911de added for: external-project For an external project and not something we can fix and removed type: enhancement A general enhancement labels May 3, 2024
@evgenyvsmirnov
Author

Thank you for the swift reply!

I agree it could be solved in spring-data-relational, but it would be an ad-hoc solution, whereas this issue is intrinsic to basic driver scenarios (it's quite a mundane thing to have a downstream which does something with the data from the upstream).
Probably I wasn't quite clear about the idea. The following traffic is produced by the aforementioned code which employs a cursor. As can be seen, it doesn't fetch the entire cursor data into memory.

113	5.348749	64682	5432	PGSQL	110	>Q -------> BEGIN ISOLATION LEVEL READ COMMITTED, READ WRITE
114	5.349200	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=1 Ack=55 Win=5510 Len=0 TSval=1616254453 TSecr=1260703638
121	5.369258	5432	64682	PGSQL	73	<C/Z
122	5.369310	64682	5432	TCP	56	64682 → 5432 [ACK] Seq=55 Ack=18 Win=7487 Len=0 TSval=1260703658 TSecr=1616254473
123	5.371822	64682	5432	PGSQL	149	>Q -------> DECLARE uc_b5eda82269194db080bc38b7d49ad82b CURSOR FOR SELECT userId FROM users;
124	5.371942	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=18 Ack=148 Win=5509 Len=0 TSval=1616254476 TSecr=1260703661
129	5.381385	5432	64682	PGSQL	82	<C/Z
130	5.381426	64682	5432	TCP	56	64682 → 5432 [ACK] Seq=148 Ack=44 Win=7487 Len=0 TSval=1260703670 TSecr=1616254485
131	5.382232	64682	5432	PGSQL	119	>Q -------> FETCH FORWARD 1 FROM uc_b5eda82269194db080bc38b7d49ad82b;
132	5.382290	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=44 Ack=211 Win=5508 Len=0 TSval=1616254486 TSecr=1260703671
137	5.387157	5432	64682	PGSQL	123	<T/D/C/Z -------> FETCH 1 (backend returns one more userId)
138	5.387192	64682	5432	TCP	56	64682 → 5432 [ACK] Seq=211 Ack=111 Win=7486 Len=0 TSval=1260703676 TSecr=1616254491
139	5.390694	64682	5432	PGSQL	202	>Q -------> SELECT users.* FROM users WHERE users.userId = 1001
140	5.390799	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=111 Ack=357 Win=5505 Len=0 TSval=1616254495 TSecr=1260703680
145	5.402451	5432	64682	PGSQL	220	<1/2/T/D/C/3/Z
146	5.402490	64682	5432	TCP	56	64682 → 5432 [ACK] Seq=357 Ack=275 Win=7483 Len=0 TSval=1260703691 TSecr=1616254506
147	5.406728	64682	5432	PGSQL	119	>Q -------> SELECT accounts.* FROM accounts WHERE accounts.userId = 1001 
…
10380	10.210070	64682	5432	PGSQL	119	>Q -------> FETCH FORWARD 1 FROM uc_b5eda82269194db080bc38b7d49ad82b;
10381	10.210126	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=378573 Ack=180099 Win=6780 Len=0 TSval=1616259314 TSecr=1260708499
10390	10.212295	5432	64682	PGSQL	107	<T/C/Z -------> FETCH 0
10391	10.212424	64682	5432	TCP	56	64682 → 5432 [ACK] Seq=180306 Ack=378624 Win=5654 Len=0 TSval=1260708501 TSecr=1616259316
10414	10.227658	64682	5432	PGSQL	104	>Q -------> CLOSE uc_b5eda82269194db080bc38b7d49ad82b;
10415	10.227717	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=379888 Ack=180677 Win=6771 Len=0 TSval=1616259331 TSecr=1260708516
10420	10.229530	5432	64682	PGSQL	80	<C/Z
10421	10.229615	64682	5432	TCP	56	64682 → 5432 [ACK] Seq=180677 Ack=379912 Win=5634 Len=0 TSval=1260708519 TSecr=1616259334
10422	10.230124	64682	5432	PGSQL	68	>Q -------> COMMIT
10423	10.230200	5432	64682	TCP	56	5432 → 64682 [ACK] Seq=379912 Ack=180689 Win=6771 Len=0 TSval=1616259334 TSecr=1260708519
10428	10.233159	5432	64682	PGSQL	74	<C/Z

For the sake of conciseness let us write down what happens in the channel:

>BEGIN ISOLATION LEVEL REPEATABLE READ, READ WRITE
>DECLARE uc_b5eda82269194db080bc38b7d49ad82b CURSOR FOR SELECT userId FROM users;


>FETCH FORWARD 1 FROM uc_b5eda82269194db080bc38b7d49ad82b;
<FETCH 1 (DB returns 1st user id)
>Q (SELECT users.* FROM users WHERE users.userId = 1001)
< …
>Q (SELECT accounts.* FROM accounts WHERE accounts.userId = 1001)
< …


>FETCH FORWARD 1 FROM uc_b5eda82269194db080bc38b7d49ad82b;
<FETCH 1 (DB returns 2nd user id)
>Q (SELECT users.* FROM users WHERE users.userId = 1002)
< …
>Q (SELECT accounts.* FROM accounts WHERE accounts.userId = 1002)
< …


… After an umpteenth repetition …


>FETCH FORWARD 1 FROM uc_b5eda82269194db080bc38b7d49ad82b;
<FETCH 0 (No userIds left in the cursor)


>CLOSE uc_b5eda82269194db080bc38b7d49ad82b;
>COMMIT

I want to point out that all this communication happens inside the same transaction (hence on the same connection), and according to https://www.postgresql.org/message-id/1915c800-2c49-4039-a840-7cafc0654fe4%40iki.fi the driver could get the same result without a cursor, by means of a row limit in Execute and a more sophisticated queue-processing algorithm (it wouldn't actually be a queue):

>BEGIN ISOLATION LEVEL REPEATABLE READ, READ WRITE


>P/B/D/E/H (PARSE; BIND B_0 (SELECT userId FROM users); DESCRIBE; EXECUTE B_0 (fetchSize 1); FLUSH)
<1/2/T/D/s (…; DATA ROW; PORTAL SUSPENDED)
Here B_0 is suspended and arbitrary selects can be executed (according to that pgsql-hackers thread)
>Q (SELECT users.* FROM users WHERE users.userId = 1001)
< …
>Q (SELECT accounts.* FROM accounts WHERE accounts.userId = 1001)
< …


>E (EXECUTE B_0 (fetchSize 1))
>H (FLUSH)
<D/s (DATA ROW; PORTAL SUSPENDED)
Here B_0 is again suspended and arbitrary selects can be executed
>Q (SELECT users.* FROM users WHERE users.userId = 1002)
< …
>Q (SELECT accounts.* FROM accounts WHERE accounts.userId = 1002)
< …


…Many more repetitions…


>E (EXECUTE B_0 (fetchSize 1))
>H (FLUSH)
<C (Command completion)
>C (CLOSE)
>S (SYNC)
<3/Z (CLOSE COMPLETE; READY FOR QUERY)

>COMMIT

If the driver produced such traffic, the very first chain would work for any number of users:

Flux.defer {
    databaseClient.sql("select userId from users where userId >= $1 and userId <= $2")
        .bind("$1", 1)
        .bind("$2", 100_000)
        .flatMap { r -> r.map { row, _ -> row.get(0).toString().toInt() } }
        .flatMap { userId ->
            databaseClient.sql("select login from accounts where userId=$1 limit 1")
                .bind("$1", userId!!)
                .flatMap { r -> r.map { row, meta -> row.get(0, kotlin.String::class.java) } }
        }
}.`as`(transactionalOperator::transactional)
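To make the intended queue handling concrete, here is a toy model of the scheduling (illustrative stand-ins only, not the driver's actual internals):

import java.util.ArrayDeque

class Conversation(val portal: String) {
    fun complete() { /* signal completion to the subscriber */ }
}

// Toy scheduler: waiting conversations always win over parked (suspended) portals;
// a parked portal is resumed with another Execute only when nothing else is pending.
class ConversationScheduler(private val resume: (portal: String, fetchSize: Int) -> Unit) {
    private val waiting = ArrayDeque<Conversation>() // conversations emitted by the downstream
    private val parked = ArrayDeque<Conversation>()  // portals left open after "Portal suspended"
    private var current: Conversation? = null

    fun submit(conversation: Conversation) {
        waiting.add(conversation)
        pump()
    }

    fun onPortalSuspended() {
        // Instead of blocking the queue head until "Command completion",
        // park the current conversation and free the connection.
        parked.add(current!!)
        current = null
        pump()
    }

    fun onCommandComplete() {
        current?.complete()
        current = null
        pump()
    }

    private fun pump() {
        if (current != null) return
        current = waiting.poll()
            ?: parked.poll()?.also { resume(it.portal, 1) } // >E (EXECUTE, fetchSize 1); >H (FLUSH)
    }
}

The point is only the ordering: pending downstream conversations always run first, and a suspended portal is resumed afterwards, which is exactly the >E/>H resumption shown in the trace above.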

Am I missing something? Thanks.
