
Prevent misbehaving destinations from affecting healthy ones #2

Open

bretthoerner opened this issue Dec 20, 2023 · 6 comments

bretthoerner commented Dec 20, 2023

We have 2 unimplemented features on the v1 requirements list:

  • "First tries prioritized over retries of failures (separate queues)"
  • "Single slow webhook [sic: destination?] doesn't block everything"

These feel pretty related, so I figured we could discuss them in one place. See prior discussion in Slack, but I plan to summarize/clarify here so you should be able to skip that unless you're really curious.


Separate Queues

This should be relatively easy and makes a lot of sense.

  • Have a standard/default queue, say webhooks. When submitting a job for retry, also change its queue column (say, webhooks-degraded).
  • Run another consumer that just operates on webhooks-degraded, probably with a lower number of max PG connections.
  • The janitor can clean both queues (at the same time), because nothing about deleting rows or sending app_metrics has to do with the queue.

This prevents retries from blocking events that haven't been tried at all.
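
As a rough sketch (not a final implementation), the retry path could do the move in a single statement. The job_queue table and the queue column come from the discussion above; the status and scheduled_at columns and the 30-second backoff are just assumptions for illustration:

-- Hypothetical requeue of a failed job onto the degraded queue.
-- 'status', 'scheduled_at', and the backoff interval are illustrative, not confirmed schema.
UPDATE job_queue
SET queue = 'webhooks-degraded',
    status = 'available',
    scheduled_at = now() + interval '30 seconds'
WHERE id = $1;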


Slow destination doesn't block up everything else

This one is more difficult, and in my opinion the primary goal of this project. If we didn't implement this, we may as well have used Kafka and pushed failures to a second topic (which would push failures back onto itself to retry).

To give a concrete example (and why "Separate Queues" above doesn't solve it alone):

  • Say a popular Webhook destination like Slack begins to take 30 seconds to respond.
  • Separate Queues above doesn't prevent us from trying every Webhook at least once in the main queue.
    • Sure, we have a 5 second (or whatever) timeout, but that still means 5 seconds per attempt if we are naive.
  • Remember, it's a popular destination! Enough of these taking 5 seconds for the first attempt will completely clog up the Webhook pipeline.
  • It's a popular destination, and it will definitely come back eventually, so we don't want to actually disable the senders.
    • We could in theory, if we had a system to safely auto-re-enable them. That sounds complicated, and it would still require a system to detect which backends are bad enough and to disable them. I think the proposal below is simpler and safer.

(In the Slack thread linked above, we discussed some ideas around the janitor service collecting metrics about timeouts/connection-errors/etc and flagging certain destinations as degraded. The producer (and consumer?) would then need to get that information (via PG, either pub/sub or by polling a table). None of this is impossible but it's a lot of moving parts.)

I propose a consumer-only solution that I think will be easy to implement, cheap to use, and solve the problem above:

  • Each consumer process periodically runs a query roughly like:
SELECT target, count(*)
FROM job_queue
WHERE attempt > 1
AND <timeout error or connection error or http 5XX in errors>
-- edit: I don't think we even need to check errors? Having an attempt > 1 means it hit a retryable error!
GROUP BY target
ORDER BY count(*) DESC;
  • This can select from either queue or both of them at once.
  • Note that it does not use FOR UPDATE SKIP LOCKED and it does not hold open a transaction for any other work.
  • It caches the results of that query in memory.
  • The dequeue query is adjusted like so:
ORDER BY
  CASE
    WHEN target IN ('cached_bad_target_1', 'cached_bad_target_2') THEN 1
    ELSE 0
  END,
  attempt,
  id
  • This could be adjusted to have more CASEs (up to a reasonable limit) and assign a higher number to targets that have observed more errors, effectively pushing them back in the order.

What this does is cheaply (AFAIK) prevent a slow destination from hurting the pipeline even on the first attempt for any given Webhook payload.

The producer could even use the same query/cache and proactively put jobs into the degraded queue (they'd have attempt=0 and so they'd be tried first in the degraded queue).
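
To make the ordering concrete, here is a rough sketch of a full dequeue query combining FOR UPDATE SKIP LOCKED with the de-prioritization CASE. The status column, the exact claim/bump mechanics, and the LIMIT are assumptions for illustration, not the actual implementation:

-- Hypothetical dequeue: claim one job, preferring targets not in the cached bad list.
WITH next_job AS (
    SELECT id
    FROM job_queue
    WHERE queue = 'webhooks'
      AND status = 'available'          -- assumed status column
    ORDER BY
      CASE
        WHEN target IN ('cached_bad_target_1', 'cached_bad_target_2') THEN 1
        ELSE 0
      END,
      attempt,
      id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE job_queue
SET status = 'running',
    attempt = attempt + 1               -- attempt is bumped at dequeue time
FROM next_job
WHERE job_queue.id = next_job.id
RETURNING job_queue.*;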

Downsides:

  1. It's working on stale information.
    • I propose that it doesn't matter whether completed/failed rows are being cleaned up by the janitor, or whether we have metrics per destination going back in time. A single failure of a destination puts it behind others in the queue. The queue should still have capacity to service those; this just prefers healthy destinations when there is contention on resources (PG connections, primarily).
    • The informational query should be pretty cheap; we can run it every 30 or 60 seconds, with jitter.
    • IMO we don't care if the pipeline chokes up momentarily on a bad destination, so long as everything starts improving within ~30 seconds. What we want to prevent is a 5-hour delay to destination A just because B is having issues. I don't think we care about the slightly increased p99 as things are detected.
  2. What if the number of bad targets gets huge?
    • Is this a real risk? We should put in a realistic LIMIT to prevent the query from breaking, but I think that's enough (see the sketch after this list). We select the top 10, 25, or 50 worst backends, and by eliminating them we make the queue a lot healthier.
    • I don't think we should have more than a small handful of destinations with retryable errors at any given time anyway?
    • v2 calls for disabling bad destinations automatically, which would help this.
    • We should report the count of bad destinations as a Prometheus metric and monitor this as we roll it out.
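
As a sketch of that bounded version (the LIMIT value and the retryable_failures alias are arbitrary):

-- Informational query capped to the N worst targets so the in-memory cache stays small.
SELECT target, count(*) AS retryable_failures
FROM job_queue
WHERE attempt > 1
GROUP BY target
ORDER BY count(*) DESC
LIMIT 25;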

Upsides:

  • Don't have to get special information from the janitor task to all of the other processes.
  • Don't have to track any metrics per destination over time; just ORDERing the queue based on the information already available is exactly what we want.
  • Queries used should be cheap with the proper indexes.
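
Purely as an illustration of "proper indexes" (names and exact column choices are assumptions, and the right shape depends on the final queries):

-- Hypothetical index to serve the queue/attempt/id ordering on dequeue.
CREATE INDEX IF NOT EXISTS idx_job_queue_queue_attempt_id ON job_queue (queue, attempt, id);
-- Hypothetical partial index to keep the 'bad targets' aggregation cheap.
CREATE INDEX IF NOT EXISTS idx_job_queue_target_retrying ON job_queue (target) WHERE attempt > 1;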

tomasfarias commented Dec 29, 2023

So we define bad targets as targets with more than one attempt (we can ignore errors as, like you said, if you have more than 1 attempt, you failed with a retryable error):

SELECT target, count(*)
FROM job_queue
WHERE attempt > 1
AND <timeout error or connection error or http 5XX in errors>
-- edit: I don't think we even need to check errors? Having an attempt > 1 means it hit a retryable error!
GROUP BY target
ORDER BY count(*) DESC;

Once we know who the bad targets are, we pass them to dequeue:

ORDER BY
  CASE
    WHEN target IN ('cached_bad_target_1', 'cached_bad_target_2') THEN 1
    ELSE 0
  END,
  attempt,
  id

Why don't we just sort on attempt, which is what defines a bad target?

ORDER BY
  attempt,
  id

Given that bad targets are a function of attempt, can't we inline the function and sort ASC on attempt?

I agree with the general approach, just wondering if we can skip the first query entirely (as we have the number of attempts in the job table already).

The downside of this simplification is that we don't know what the bad targets are, so the v2 goal would need some extra work, but I would argue we can worry about that when we work on v2 (maybe we track failures on a separate table to make it more permanent?).

@tomasfarias

One more thing to consider: A job's attempt is bumped when the job is dequeued, not before, so we should probably look at jobs with attempt > 0.
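
In query form that adjustment might look like this (a sketch only, using the same placeholder schema as the query above):

-- attempt is incremented at dequeue time, so a job waiting for its first retry
-- already has attempt = 1; filter on attempt > 0 instead of attempt > 1.
SELECT target, count(*)
FROM job_queue
WHERE attempt > 0
GROUP BY target
ORDER BY count(*) DESC;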

@tomasfarias

For 1: I implemented support for retrying jobs to a separate queue here: PostHog/rusty-hook#20


tiina303 commented Jan 2, 2024

To make sure I understand the world correctly, here are my assumptions:

  • if a target (e.g. Slack) is failing for a single specific endpoint, it might work fine for other endpoints, so we can decrease the priority for that single endpoint, but shouldn't impact others.
  • if a target (e.g. Slack) is slow for a single specific endpoint, we can assume it's slow for other endpoints too. Note that if a specific endpoint fails fast, it shouldn't impact other endpoints. Think of a popular destination - we wouldn't want bad actors using that target to cause others to be slowed down, as long as the target itself is good at returning fast on bad entries. Furthermore, it doesn't matter whether the target succeeds or fails: if it is slow, we want to put it later in the queue. If a single fast endpoint continuously fails, we're still billing for it, but we can also bump its priority lower.

Based on that I'd propose we process things in this order:

  1. 1st attempt for fast targets (optionally, 1a: successful endpoints, 1b: failing endpoints)
  2. 1st attempt for slow targets (optionally, 2a: successful endpoints, 2b: failing endpoints)
  3. 2nd attempt
  4. 3rd attempt
  5. ...

We can have a background process (janitor) add slow targets to a cache / table.

ORDER BY
  CASE
    WHEN attempt > 1 THEN attempt
    WHEN target IN ('cached_slow_targets') THEN 1
    ELSE 0
  END,

Rollout considerations: we can initially roll this out with nothing populating 'cached_slow_targets', so we'll only prioritise based on attempts. This is already better than the existing system, where we just don't have retries at all, so we can start rolling this out on the plugin server side while in parallel building the 'cached_slow_targets' population part.
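
For the 'cached_slow_targets' population part, here is a sketch of what a janitor-maintained table could look like; the table name, columns, and any slowness metric are purely hypothetical, not something agreed on here:

-- Hypothetical table the janitor (or another background process) could upsert into.
CREATE TABLE IF NOT EXISTS webhook_slow_targets (
    target text PRIMARY KEY,
    observed_p95_ms integer NOT NULL,
    updated_at timestamptz NOT NULL DEFAULT now()
);

-- The dequeue CASE above could then read from this table instead of a hardcoded list:
--   WHEN target IN (SELECT target FROM webhook_slow_targets) THEN 1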


bretthoerner commented Jan 2, 2024

> So we define bad targets as targets with more than one attempt (we can ignore errors as, like you said, if you have more than 1 attempt, you failed with a retryable error):
>
> SELECT target, count(*)
> FROM job_queue
> WHERE attempt > 1
> AND <timeout error or connection error or http 5XX in errors>
> -- edit: I don't think we even need to check errors? Having an attempt > 1 means it hit a retryable error!
> GROUP BY target
> ORDER BY count(*) DESC;
>
> Once we know who the bad targets are, we pass them to dequeue:
>
> ORDER BY
>   CASE
>     WHEN target IN ('cached_bad_target_1', 'cached_bad_target_2') THEN 1
>     ELSE 0
>   END,
>   attempt,
>   id
>
> Why don't we just sort on attempt, which is what defines a bad target?
>
> ORDER BY
>   attempt,
>   id
>
> Given that bad targets are a function of attempt, can't we inline the function and sort ASC on attempt?
>
> I agree with the general approach, just wondering if we can skip the first query entirely (as we have the number of attempts in the job table already).
>
> The downside of this simplification is that we don't know what the bad targets are, so the v2 goal would need some extra work, but I would argue we can worry about that when we work on v2 (maybe we track failures on a separate table to make it more permanent?).

If you don't do the 1st aggregating query and you only dequeue jobs by sorting by attempt (on an individual row level), then a bad destination will still get eagerly tried once for every job, right?

That's the "what if a popular destination is slow" example I gave, which I think is what we need to protect against.


bretthoerner commented Jan 2, 2024

> To make sure I understand the world correctly, here are my assumptions:
>
>   • if a target (e.g. Slack) is failing for a single specific endpoint, it might work fine for other endpoints, so we can decrease the priority for that single endpoint, but shouldn't impact others.
>   • if a target (e.g. Slack) is slow for a single specific endpoint, we can assume it's slow for other endpoints too. Note that if a specific endpoint fails fast, it shouldn't impact other endpoints. Think of a popular destination - we wouldn't want bad actors using that target to cause others to be slowed down, as long as the target itself is good at returning fast on bad entries. Furthermore, it doesn't matter whether the target succeeds or fails: if it is slow, we want to put it later in the queue. If a single fast endpoint continuously fails, we're still billing for it, but we can also bump its priority lower.

Yeah, I mostly agree. Some small tweaks:

  • I think for v1 we are calling target the hostname in the URL; we can make it more elaborate later, but I think we do gain a lot from aggregating stats together. We don't want to only track errors for every single individual Slack endpoint (i.e. per user). And automagically figuring out what part of the URL is a special endpoint and what part is user-specific seems... hard.
  • An important piece of the above is that we only consider retryable errors as failures for what we're discussing here. A 4XX shouldn't affect anything, so bad actors are pretty limited here.

> Based on that I'd propose we process things in this order:
>
>   1. 1st attempt for fast targets (optionally, 1a: successful endpoints, 1b: failing endpoints)
>   2. 1st attempt for slow targets (optionally, 2a: successful endpoints, 2b: failing endpoints)
>   3. 2nd attempt
>   4. 3rd attempt
>   5. ...
>
> We can have a background process (janitor) add slow targets to a cache / table.
>
> ORDER BY
>   CASE
>     WHEN attempt > 1 THEN attempt
>     WHEN target IN ('cached_slow_targets') THEN 1
>     ELSE 0
>   END,

Having something smart in charge of telling others who to delay would definitely work. That said, having to store, update, and pass around centralized knowledge is what I was hoping to avoid with my simple/dumb "stateless" query.

> Rollout considerations: we can initially roll this out with nothing populating 'cached_slow_targets', so we'll only prioritise based on attempts. This is already better than the existing system, where we just don't have retries at all, so we can start rolling this out on the plugin server side while in parallel building the 'cached_slow_targets' population part.

Well, we can easily sort by attempts and ship this and work on this later, I agree. But as noted above I feel like only handling errors/retries on a per row/event basis makes us totally weak against the most common failure scenario, and the one we built this project to avoid (AFAIK). That is, one slow backend can choke up the whole pipeline just on 1st attempts alone. It feels like we're using PG in a way we could have used Kafka.

But, just to repeat: sorting by attempt and working on this later is totally fine with me.
