
federation: parallel sending per instance #4623

Draft
wants to merge 9 commits into main
Conversation

phiresky (Collaborator)

Currently, with the implementation of the federation queue from #3605 that was enabled in 0.19, the federation is parallel across instances but sequential per receiving instance.

This means that the maximum throughput of activities is limited by the network latency between instances as well as the internal latency for processing a single activity.

There is an extensive discussion here: #4529 (comment), though the issue itself is only about one sub-problem.

This PR changes the federation sending component to send activities in parallel, with a configurable maximum concurrency of 8. The implementation is more complex than I expected: we need to track last_successful_id (which must be the highest activity id below which every single activity has been successfully sent), and we need to track failure counts without immediately jumping to hour-long retry delays when 8 concurrent sends fail simultaneously.

The implementation roughly works as follows (a rough sketch follows the list):

  1. We have a main loop that waits for new activities and spawns send tasks for them when the conditions are right
  2. We create a multi-producer, single-consumer channel; each activity send reports its result through this channel to the main loop:
    • If successful, the activity id is added to an in-memory priority queue; whenever the lowest id in the queue is last_successful_id + 1, we advance the last_successful_id counter
    • If failing, the fail_count and last retry time are updated. We completely ignore the activity id here and always take the maximum (not the sum) of the fail_count reported by any in-flight activity send
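
For illustration, here is a heavily simplified sketch of this loop. It is not the PR's actual worker.rs: DB reads and writes, retry timing and the real SendActivityResult/SendSuccessInfo types are omitted, and next_activity, SendResult and record are hypothetical stand-ins.

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use tokio::sync::mpsc::unbounded_channel;

type ActivityId = i64; // placeholder for the real newtype

enum SendResult {
  Success(ActivityId),
  Failure { fail_count: i32 },
}

const MAX_CONCURRENT_SENDS: usize = 8;

async fn instance_worker_loop(mut next_activity: impl FnMut() -> Option<ActivityId>) {
  let (report, mut results) = unbounded_channel::<SendResult>();
  // min-heap (via Reverse) of successfully sent ids not yet contiguous with last_successful_id
  let mut successfuls: BinaryHeap<Reverse<ActivityId>> = BinaryHeap::new();
  let mut last_successful_id: ActivityId = 0;
  let mut fail_count: i32 = 0;
  let mut in_flight: usize = 0;

  loop {
    // 1. spawn send tasks while we are below the concurrency limit
    while in_flight < MAX_CONCURRENT_SENDS {
      let Some(id) = next_activity() else { break };
      let report = report.clone();
      in_flight += 1;
      tokio::spawn(async move {
        // the real code runs the whole send-retry loop for one activity here;
        // a failed send would instead report SendResult::Failure { fail_count }
        let _ = report.send(SendResult::Success(id));
      });
    }
    if in_flight == 0 {
      break; // nothing queued and nothing in flight (the real loop waits for new DB rows)
    }
    // 2. block until at least one send task reports back, then drain anything else queued
    let Some(first) = results.recv().await else { break };
    record(first, &mut in_flight, &mut successfuls, &mut last_successful_id, &mut fail_count);
    while let Ok(next) = results.try_recv() {
      record(next, &mut in_flight, &mut successfuls, &mut last_successful_id, &mut fail_count);
    }
  }
}

fn record(
  result: SendResult,
  in_flight: &mut usize,
  successfuls: &mut BinaryHeap<Reverse<ActivityId>>,
  last_successful_id: &mut ActivityId,
  fail_count: &mut i32,
) {
  *in_flight -= 1;
  match result {
    SendResult::Success(id) => {
      successfuls.push(Reverse(id));
      // advance only while the successful ids are contiguous, so last_successful_id is
      // always the highest id below which every activity has been sent
      while successfuls.peek() == Some(&Reverse(*last_successful_id + 1)) {
        successfuls.pop();
        *last_successful_id += 1;
      }
    }
    // take the maximum (not the sum) of fail counts reported by concurrent sends
    SendResult::Failure { fail_count: f } => *fail_count = (*fail_count).max(f),
  }
}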

In order for the results of this to still be correct, fixes need to be applied to make all activities commutative (as discussed above).

It should also be possible to enable the concurrency only when necessary, since for most instance-to-instance connections it is not needed; that would reduce the ordering issue. This is not implemented here though.

Currently, this PR fails the federation tests. I think this is due both to a bug somewhere and to the ordering problem.

let domain = self.instance.domain.clone();
tokio::spawn(async move {
  let mut report = report;
  if let Err(e) = InstanceWorker::send_retry_loop(
Member

Store this in a variable first to make it more readable.

) -> Result<()> {
let inbox_urls = self
Member

debug_assert!(!inbox_urls.is_empty())

activity: &SentActivity,
object: &SharedInboxActivities,
inbox_urls: Vec<Url>,
report: &mut UnboundedSender<SendActivityResult>,
initial_fail_count: i32,
Member

Would make more sense to use a single shared AtomicI32 for fail count, instead of passing it back and forth like this.

Collaborator Author

It's not passed back and forth, it's only passed one way, from the worker to the send task. The issue with not passing it in explicitly is that the send_retry_loop function would then have to access self, and thus the whole InstanceWorker struct would have to be Send + Sync, which means wrapping everything in locks.

That's why I create those local variables before the closure that calls this.

Member

Then you can share around an Arc<AtomicI32> instead of self.

Collaborator Author

The point is that we don't need any atomics, so adding atomics without any real reason is bad. This is a read-only, immutable variable that is passed only one way, from the main thread to each send task, so that the send task knows how long to sleep initially if the request fails.

Collaborator Author (phiresky, Apr 16, 2024)

Just realizing that you're referring to the channel send when you talk about "passing it back".

I see that might be an option, but I don't like it. The send task is cleanly separated and has no direct interaction with the main thread. It's nicely compartmentalized: you can look purely at the send.rs file to understand how the send-retry loop of one activity works. It only sends notifications upwards; it never receives updates downwards after it is started.

The failure event needs to be transferred to the main task in any case so it can update the database. The main fail_count should only be updated under certain conditions, and those conditions belong in the main task, since the logic that updates the database needs the same information.

In addition, if we changed the failure path to an atomic, then we should also change the "success" sends to direct writes into a RwLock instead of writing to a channel; otherwise there's a weird mix where some data is sent via the channel and some via atomics. But changing that would mean again having to figure out how to signal the main task when to continue, and it would again make all interactions more complex to understand since they could happen with arbitrary concurrency.

Member

Alright, it seems a bit weird at first but makes sense with your explanation. It would be good if you could copy that into a code comment on SendActivityResult.
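
For illustration, such a comment might look roughly like this (a sketch only, not the PR's actual definitions; the real SendActivityResult and SendSuccessInfo carry more information, and ActivityId stands in for the real newtype):

use chrono::{DateTime, Utc};

type ActivityId = i64; // placeholder for the real newtype

/// Result of one activity send, reported from a send task back to the instance worker's
/// main task over the mpsc channel. Communication is deliberately one-way: send tasks only
/// report results upwards and never receive anything after they are started. That keeps the
/// main task the single place that decides how last_successful_id, fail_count and the
/// database are updated, and means the InstanceWorker itself never has to be Send + Sync
/// or wrapped in locks.
enum SendActivityResult {
  Success(SendSuccessInfo),
  // hypothetical shape: the real failure variant carries whatever the main task needs to
  // update fail_count and the retry state in the database
  Failure { fail_count: i32 },
}

struct SendSuccessInfo {
  activity_id: ActivityId,
  published: Option<DateTime<Utc>>,
  was_skipped: bool,
}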

let Some(actor_apub_id) = &activity.actor_apub_id else {
  return Ok(()); // activity was inserted before persistent queue was activated
  return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
};
Member

This column shows as not null for me in SQL; it seems like we can also make it non-optional in Rust and remove this check.

\d sent_activity
Table "public.sent_activity"
Column            |           Type           | Collation | Nullable |                  Default
-----------------------------+--------------------------+-----------+----------+-------------------------------------------
id                          | bigint                   |           | not null | nextval('sent_activity_id_seq'::regclass)
ap_id                       | text                     |           | not null |
data                        | json                     |           | not null |
sensitive                   | boolean                  |           | not null |
published                   | timestamp with time zone |           | not null | now()
send_inboxes                | text[]                   |           | not null |
send_community_followers_of | integer                  |           |          |
send_all_instances          | boolean                  |           | not null |
actor_type                  | actor_type_enum          |           | not null |
actor_apub_id               | text                     |           | not null |

Collaborator Author

Huh, that's weird. The schema file in the repo definitely still has it nullable:
[screenshot of the schema file showing the column as nullable]

Is the database you're looking at clean?

Member

Apparently not; on lemmy.ml it is nullable. And after wiping the local db and setting it up again, it's also nullable now. Anyway, you can add a migration to make it not null with this PR.

Member

It's nullable for me also; you might just need to run diesel migration run.

crates/federate/src/worker.rs (outdated review thread, resolved)
// which can only happen by an event sent into the channel
self
  .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
  .await?;
Member

I would launch a separate background task for this instead of manually checking here. Can also potentially be moved into a separate struct/file.

Collaborator Author

Maybe; that's what I originally wanted to do, but the problem then is that you need some way for the "read activities" loop to pause and unpause depending on the state of the result-handling task. That would require either another arbitrary sleep() or another channel for sending values around.

I intentionally put that inline here because then the "read activities" loop knows exactly when to continue and nothing needs any thread synchronization. Nothing in this struct needs atomics or locks because there's only a single task accessing anything; the only thing running in parallel is the actual sends.

I agree that passing &mut references to local variables is pretty... weird. I originally just had this part inlined; I moved it out so it's easier to grok the whole loop.

Member

recv_many blocks automatically if there are no items. Then you could also sleep for WORK_FINISHED_RECHECK_DELAY if there are fewer than 4 items or so. Anyway, UnboundedReceiver works across threads, so I don't see why it would need any locks.

You should also be able to handle print_stats() from the same receiver; I don't see any reason why it should need a separate channel. Additionally, it should be possible to use a single receiver worker to write data for all instances, with no need to do it separately per instance.

Some of this may be too much effort to be worth implementing now, but at least mention the possibilities in a comment for future work.

Collaborator Author (phiresky, Apr 16, 2024)

> recv_many blocks automatically if there are no items

Yes, but the problem is that you're saying you want the recv to be in a separate thread from the thread that queues new sends. So how would the send-queuer know when it's supposed to send new items? It's inline here exactly because it blocks if there are no items and thus pauses the send loop for exactly the right amount of time until a new item should be sent. The alternatives are hacky and would require more arbitrarily chosen delays. And this event is expected to happen 10+ times per second per instance, so waiting would basically be a busy loop (sleep 10 ms, recheck, sleep 10 ms, recheck, ...).

I don't really see how this would be a possibility; moving this to a separate thread just seems all-around worse to me.
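
For illustration, the inline handling being described could look roughly like this (a sketch, not the PR's handle_send_results; apply is a hypothetical callback for the bookkeeping, and MAX_CONCURRENT_SENDS is the configurable limit from the description):

use tokio::sync::mpsc::UnboundedReceiver;

const MAX_CONCURRENT_SENDS: usize = 8;

async fn handle_send_results<T>(
  receive_send_result: &mut UnboundedReceiver<T>,
  in_flight: &mut usize,
  mut apply: impl FnMut(T),
) {
  let mut buf = Vec::new();
  if *in_flight >= MAX_CONCURRENT_SENDS {
    // At the concurrency limit: recv_many blocks until at least one send task reports
    // back, which pauses the send loop for exactly as long as necessary.
    receive_send_result.recv_many(&mut buf, MAX_CONCURRENT_SENDS).await;
  } else {
    // Below the limit: only drain results that have already arrived, without waiting.
    while let Ok(result) = receive_send_result.try_recv() {
      buf.push(result);
    }
  }
  for result in buf {
    *in_flight -= 1;
    apply(result); // update successfuls / fail_count / last_successful_id in the caller
  }
}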

Member

Makes sense. What about using only a single unbounded_channel for activity send completion and print_stats()?

crates/federate/src/worker.rs (outdated review thread, resolved)
crates/federate/src/worker.rs (outdated review thread, resolved)
tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
report.send(SendActivityResult::Success(SendSuccessInfo {
  activity_id,
  published: Some(activity.published),
Member

I think this should be None, as it didn't actually connect to the instance.

Collaborator Author

I'm not sure; this value is shown in the status page I wrote (https://phiresky.github.io/lemmy-federation-state/) as well as in one of those Grafana charts: https://grafana.lem.rocks/d/edf3gjrxxnocgd/federation-health-activities-behind?orgId=1&var-instance=All&var-remote_instance=lemdro.id&var-remote_software=All&from=now-24h&to=now

The behaviour you want is how it currently works, but it can be a bit misleading: if you look at or chart the published time for federation from a large instance to a small instance that's only subscribed to a few small communities, it will show the last published time as days ago even though federation is up to date.

I don't have a strong opinion one way or another though, so let me know.

Member

Yeah, I'm not sure either. Maybe just leave a comment that it may be changed to None.

crates/federate/src/worker.rs (outdated review thread, resolved)
phiresky (Collaborator Author)

I've split the instance worker into three separate files:

  • worker.rs - main code
  • inboxes.rs - code that makes sure the community lists are up to date and calculates the inboxes
  • send.rs - a single send-retry-loop task that runs concurrently

@@ -0,0 +1,149 @@
use crate::util::LEMMY_TEST_FAST_FEDERATION;
Collaborator Author

As a note, I've not made any changes to this code, just moved it into a separate struct.

/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
/// (see https://github.com/LemmyNet/lemmy/issues/3958)
Member

Try to stay within 100 columns ;)

Collaborator Author

It would be great if we had an auto-formatter for doc comments; I always struggle with this.

Member

You can set comment_width = 100 in .rustfmt.toml

  last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
  let new_last_fetch =
    Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
Member

Again keep the line shorter, put the comment separately.

.state
.last_successful_id
.map(|e| e.0)
.expect("set above");
Member

Where is this set? Better to store it in a local var; we don't want to crash the process even if it's very unlikely.

Nutomic (Member) commented Apr 16, 2024

As mentioned in the dev chat it would be very useful to have some unit tests here, to ensure it works as expected.

phiresky (Collaborator Author)

> As mentioned in the dev chat it would be very useful to have some unit tests here, to ensure it works as expected.

Any ideas on how best to do that? The only clean way I can think of would be abstracting all DB and HTTP interactions (I guess it would be like 5-10 functions?) into a trait so the whole federate crate code is pure, and then mocking the DB interactions and HTTP interactions with data from memory.

Nutomic (Member) commented Apr 17, 2024

Mocking would be too complicated and could introduce problems of its own. Look at how other tests are implemented, e.g. for db views. Basically, write some test data to the db, then call functions and see if they behave as expected. You can start a local server with an inbox route to check that activities are received (with an instance like localhost:8123). For start_stop_federation_workers() there should be some way to check the content of the workers variable, e.g. by moving it into a struct and moving the function into an impl block.
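
As a purely illustrative sketch of the "local server with an inbox route" idea (hypothetical helper, not existing Lemmy test code; a real test would parse the request path and body and verify signatures), a counting fake inbox could look roughly like this:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

// Fake "remote instance" on localhost:8123 that counts every request it receives, so a
// test can assert how many activities were delivered.
async fn spawn_fake_inbox(received: Arc<AtomicUsize>) -> std::io::Result<()> {
  let listener = TcpListener::bind("127.0.0.1:8123").await?;
  tokio::spawn(async move {
    loop {
      let Ok((mut socket, _)) = listener.accept().await else { break };
      let received = received.clone();
      tokio::spawn(async move {
        let mut buf = vec![0u8; 64 * 1024];
        // Read (part of) the request; a proper harness would parse the HTTP request and
        // check that it is a POST to /inbox containing a valid activity.
        if socket.read(&mut buf).await.is_ok() {
          received.fetch_add(1, Ordering::SeqCst);
          // Answer 200 so the sender counts the delivery as successful.
          let _ = socket
            .write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
            .await;
        }
      });
    }
  });
  Ok(())
}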

  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    other.activity_id.cmp(&self.activity_id)
  }
}
Collaborator

These impls cause backwards ordering because of other being put on the left. If it's intentional, then add a comment that explains this.

Also, something that may or may not be a problem: the impls are technically incorrect because they ignore fields other than activity_id, which does not match the derived PartialEq impl. For example, this requirement from PartialOrd docs is not followed:

a == b if and only if partial_cmp(a, b) == Some(Equal).

@dessalines dessalines added this to the 0.19.5 milestone Apr 29, 2024
pub published: Option<DateTime<Utc>>,
pub was_skipped: bool,
}
// need to be able to order them for the binary heap in the worker
Collaborator

Suggested change:
- // need to be able to order them for the binary heap in the worker
+ // order backwards by activity_id for the binary heap in the worker

impl PartialEq for SendSuccessInfo {
  fn eq(&self, other: &Self) -> bool {
    self.activity_id == other.activity_id
  }
}

This impl matches the PartialOrd impl by ignoring the other fields, which might make BinaryHeap work more correctly.
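
For illustration, here is a self-contained version where all four impls are keyed on activity_id alone, so that "a == b if and only if partial_cmp(a, b) == Some(Equal)" holds while the reversed order still makes BinaryHeap yield the lowest id first (a sketch over a simplified struct, not a suggested patch for the PR):

use std::cmp::Ordering;
use std::collections::BinaryHeap;

type ActivityId = i64; // placeholder for the real newtype

struct SendSuccessInfo {
  activity_id: ActivityId,
  // published / was_skipped omitted in this sketch
}

// All four impls compare only activity_id, and Ord/PartialOrd reverse the order on purpose
// so that BinaryHeap (a max-heap) yields the lowest id first.
impl PartialEq for SendSuccessInfo {
  fn eq(&self, other: &Self) -> bool {
    self.activity_id == other.activity_id
  }
}
impl Eq for SendSuccessInfo {}
impl PartialOrd for SendSuccessInfo {
  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    Some(self.cmp(other))
  }
}
impl Ord for SendSuccessInfo {
  fn cmp(&self, other: &Self) -> Ordering {
    other.activity_id.cmp(&self.activity_id) // reversed: smaller id compares as greater
  }
}

fn main() {
  let mut heap = BinaryHeap::new();
  for id in [3, 1, 2] {
    heap.push(SendSuccessInfo { activity_id: id });
  }
  // Pops in ascending id order thanks to the reversed Ord impl.
  assert_eq!(heap.pop().map(|s| s.activity_id), Some(1));
  assert_eq!(heap.pop().map(|s| s.activity_id), Some(2));
  assert_eq!(heap.pop().map(|s| s.activity_id), Some(3));
}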
