federation: parallel sending per instance #4623
base: main
Conversation
crates/federate/src/worker.rs
```rust
let domain = self.instance.domain.clone();
tokio::spawn(async move {
  let mut report = report;
  if let Err(e) = InstanceWorker::send_retry_loop(
```
Store this in a variable first to make it more readable.
```rust
) -> Result<()> {
  let inbox_urls = self
```
`debug_assert!(!inbox_urls.is_empty())`
crates/federate/src/worker.rs
```rust
activity: &SentActivity,
object: &SharedInboxActivities,
inbox_urls: Vec<Url>,
report: &mut UnboundedSender<SendActivityResult>,
initial_fail_count: i32,
```
Would make more sense to use a single shared `AtomicI32` for the fail count, instead of passing it back and forth like this.
It's not passed back and forth; it's only passed one way, from the worker to the send task. The issue with not passing it in explicitly is that `send_retry_loop` would then access `self`, and thus the whole `InstanceWorker` struct would have to be `Send + Sync`, which means wrapping everything in locks. That's why I create those local variables before the lambda calling this.
Then you can share around an `Arc<AtomicI32>` instead of `self`.
The point is that we don't need any atomics, so adding atomics without any real reason is bad. This is a read-only, immutable variable that is passed only one way, from the main thread to each send task, so the task knows how much to sleep initially if the request fails.
Just realizing you're referring to the channel send when you talk about "passing it back".

I see that might be an option, but I don't like it. The send task is cleanly separated and has no direct interaction with the main thread. It's nicely compartmentalized: you can now look purely at the send.rs file to understand how the send-retry loop of one activity works. It only sends notifications upwards; it never receives updates downwards after it is started.

The failure event needs to be transferred to the main task in any case so it can update the database. The main fail_count should only be updated under certain conditions, and those conditions belong in the main task, since the logic that updates the database needs the same information.

In addition, if we changed this to an atomic for the failures, then we should also change the "success" sends to direct writes into a `RwLock` instead of writing to a channel; otherwise there's a weird mix where some data is sent via channels and some via atomics. But changing that would mean you would again have to figure out how to signal the main task when to continue, and again make all interactions harder to understand, since they could happen with arbitrary concurrency.
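The one-way design argued for here can be sketched as follows (illustrative names, with plain std channels standing in for tokio's unbounded channel): send tasks only push results upward, and the single main task is the only place that mutates the fail count, so no atomics or locks are needed.

```rust
// Sketch only: the real code uses tokio tasks and tokio::sync::mpsc;
// std threads/channels are used here so the example is self-contained.
use std::sync::mpsc::{channel, Sender};
use std::thread;

enum SendActivityResult {
    Success { activity_id: i64 },
    Failure { activity_id: i64 },
}

fn send_task(activity_id: i64, report: Sender<SendActivityResult>) {
    // pretend the HTTP send fails for odd ids
    let res = if activity_id % 2 == 0 {
        SendActivityResult::Success { activity_id }
    } else {
        SendActivityResult::Failure { activity_id }
    };
    // a closed receiver just means the worker is shutting down
    let _ = report.send(res);
}

fn run() -> i32 {
    let (tx, rx) = channel();
    let handles: Vec<_> = (0..4)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || send_task(id, tx))
        })
        .collect();
    drop(tx); // main keeps no sender, so recv ends once all tasks finish

    // plain local state: only this single task ever touches it
    let mut fail_count = 0;
    while let Ok(result) = rx.recv() {
        if let SendActivityResult::Failure { .. } = result {
            fail_count += 1;
        }
    }
    for h in handles {
        h.join().unwrap();
    }
    fail_count
}

fn main() {
    assert_eq!(run(), 2); // ids 1 and 3 "failed"
}
```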
Alright, it seems a bit weird at first but makes sense with your explanation. It would be good if you could copy that into a code comment on `SendActivityResult`.
crates/federate/src/worker.rs
```diff
 let Some(actor_apub_id) = &activity.actor_apub_id else {
-  return Ok(()); // activity was inserted before persistent queue was activated
+  return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
 };
```
This column shows for me as not null in SQL; seems like we can also make it non-optional in Rust and remove this check.
```
\d sent_activity
                            Table "public.sent_activity"
           Column            |           Type           | Collation | Nullable |                  Default
-----------------------------+--------------------------+-----------+----------+-------------------------------------------
 id                          | bigint                   |           | not null | nextval('sent_activity_id_seq'::regclass)
 ap_id                       | text                     |           | not null |
 data                        | json                     |           | not null |
 sensitive                   | boolean                  |           | not null |
 published                   | timestamp with time zone |           | not null | now()
 send_inboxes                | text[]                   |           | not null |
 send_community_followers_of | integer                  |           |          |
 send_all_instances          | boolean                  |           | not null |
 actor_type                  | actor_type_enum          |           | not null |
 actor_apub_id               | text                     |           | not null |
```
Apparently not: on lemmy.ml it is nullable. And after wiping the local db and setting it up again, it's also nullable now. Anyway, you can add a migration to make it not null with this PR.
It's nullable for me too; you might just need to run `diesel migration run`.
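A minimal sketch of such a migration's `up.sql` (assuming any pre-0.19 rows with a NULL `actor_apub_id` are deleted or backfilled first, since `SET NOT NULL` fails otherwise; a matching `down.sql` would drop the constraint):

```sql
-- up.sql (sketch): requires no remaining NULL rows in actor_apub_id
ALTER TABLE sent_activity
  ALTER COLUMN actor_apub_id SET NOT NULL;
```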
```rust
// which can only happen by an event sent into the channel
self
  .handle_send_results(&mut receive_send_result, &mut successfuls, &mut in_flight)
  .await?;
```
I would launch a separate background task for this instead of manually checking here. Can also potentially be moved into a separate struct/file.
Maybe; that's what I originally wanted to do, but the problem then is that you need some way for the "read activities" loop to pause and unpause depending on the state of the results-handling task. That would require either another arbitrary sleep() or another channel for sending values around.

I intentionally put this inline here because then the "read activities" loop knows exactly when to continue, and nothing needs any thread synchronization. Nothing in this struct needs atomics or locks because there's only a single task accessing anything; the only thing running in parallel is the actual sends.

I agree that passing muts to local variables is pretty weird. I originally had this part inlined; I moved it out so it's easier to grok the whole loop.
`recv_many` blocks automatically if there are no items. Then you can also sleep for `WORK_FINISHED_RECHECK_DELAY` if there are fewer than 4 items or so. Anyway, `UnboundedReceiver` works across threads, so I don't see why it would need any locks.

You should also be able to handle `print_stats()` from the same receiver; I don't see any reason why it should need a separate channel. Additionally, it should be possible to use a single receiver worker to write data for all instances, with no need to do it separately per instance.

Some of this may be too much effort to be worth implementing now, but at least mention the possibilities in a comment for future work.
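To illustrate the batching idea (a self-contained sketch with std channels; the suggestion above refers to tokio's `recv_many` on an `UnboundedReceiver`): block for the first result, then drain whatever else is already queued, up to a limit.

```rust
// Sketch of recv_many-style batching using std channels: one blocking
// wait, then non-blocking draining. Names are illustrative.
use std::sync::mpsc::{channel, Receiver};

fn recv_batch<T>(rx: &Receiver<T>, max: usize) -> Vec<T> {
    let mut batch = Vec::new();
    // blocking wait for at least one item; Err means all senders are gone
    if let Ok(first) = rx.recv() {
        batch.push(first);
    }
    // opportunistically grab more without blocking again
    while batch.len() < max {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            Err(_) => break,
        }
    }
    batch
}

fn demo() -> Vec<i32> {
    let (tx, rx) = channel();
    for id in 0..5 {
        tx.send(id).unwrap(); // queue five results before receiving
    }
    recv_batch(&rx, 4) // only the first four are taken in this batch
}

fn main() {
    assert_eq!(demo(), vec![0, 1, 2, 3]);
}
```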
> recv_many blocks automatically if there are no items

Yes, but the problem is that you want the recv to be in a separate thread from the thread that queues new sends. So how would the sending queuer know when it's supposed to send new items? It's inline here exactly because it blocks if there are no items, and thus pauses the send loop for exactly the right amount of time until a new item should be sent. The alternatives are hacky and would require more arbitrarily chosen delays. And this event is expected to happen 10+ times per second per instance, so waiting would basically be a busy loop (sleep 10 ms, recheck, sleep 10 ms, recheck, ...).

I don't really see how this would be a possibility; it just seems all-around worse to me to move this to a separate thread.
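The inline pattern being defended here can be sketched with plain std channels (hypothetical names, heavily simplified from the actual worker): the queuing loop itself blocks on the result channel whenever the concurrency limit is reached, so it resumes at exactly the moment a slot frees up, with no polling delay and no extra synchronization.

```rust
// Sketch: inline backpressure in the queuing loop. The real code is async
// (tokio) and tracks much more state; this shows only the control flow.
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

const MAX_CONCURRENT: usize = 2;

fn run() -> usize {
    let (done_tx, done_rx) = channel();
    let mut in_flight = 0usize;
    let mut completed = 0usize;

    for activity_id in 0..6 {
        // backpressure: block here until a result arrives, instead of polling
        while in_flight >= MAX_CONCURRENT {
            done_rx.recv().unwrap();
            in_flight -= 1;
            completed += 1;
        }
        let tx = done_tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(5)); // simulated HTTP send
            let _ = tx.send(activity_id);
        });
        in_flight += 1;
    }
    // drain the remaining in-flight sends
    while in_flight > 0 {
        done_rx.recv().unwrap();
        in_flight -= 1;
        completed += 1;
    }
    completed
}

fn main() {
    assert_eq!(run(), 6);
}
```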
Makes sense. What about using only a single `unbounded_channel` for activity send completion and `print_stats()`?
```rust
tracing::debug!("{}: {:?} no inboxes", self.instance.domain, activity.id);
report.send(SendActivityResult::Success(SendSuccessInfo {
  activity_id,
  published: Some(activity.published),
```
I think this should be None, as it didn't actually connect to the instance.
I'm not sure; this value is shown in that status page I wrote ( https://phiresky.github.io/lemmy-federation-state/ ) as well as in one of those Grafana charts: https://grafana.lem.rocks/d/edf3gjrxxnocgd/federation-health-activities-behind?orgId=1&var-instance=All&var-remote_instance=lemdro.id&var-remote_software=All&from=now-24h&to=now

The behaviour you want is how it currently works, but it can be a bit misleading: if you look at or chart the published time for federation from a large instance to a small one that's only subscribed to a few small communities, it will show the last published time as days ago even though federation is up to date.

I don't have a strong opinion one way or the other, so let me know.
Yeah, I'm not sure either. Maybe just leave a comment that it may be changed to None.
Force-pushed from 936469a to 5e986ef.
I've split the instance worker into three separate files:
```rust
use crate::util::LEMMY_TEST_FAST_FEDERATION;
```
As a note, I've not made any changes to this code, just moved it into a separate struct.
```rust
/// The first time some user on an instance follows a specific remote community (or, more precisely: the first time a (followed_community_id, follower_inbox_url) tuple appears),
/// this delay limits the maximum time until the follow actually results in activities from that community id being sent to that inbox url.
/// This delay currently needs to not be too small because the DB load is currently fairly high because of the current structure of storing inboxes for every person, not having a separate list of shared_inboxes, and the architecture of having every instance queue be fully separate.
/// (see https://github.com/LemmyNet/lemmy/issues/3958)
```
Try to stay within 100 columns ;)
It would be great if we had an auto-formatter for doc comments; I always struggle with this.
You can set `comment_width = 100` in `.rustfmt.toml`.
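For reference, a `.rustfmt.toml` along these lines would make rustfmt rewrap comments to that width; note that both options are unstable and currently require nightly rustfmt:

```toml
# both options are unstable and need nightly rustfmt;
# comment_width only takes effect when wrap_comments is enabled
wrap_comments = true
comment_width = 100
```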
```rust
last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
  let new_last_fetch =
    Utc::now() - chrono::TimeDelta::try_seconds(10).expect("TimeDelta out of bounds"); // update to time before fetch to ensure overlap. subtract 10s to ensure overlap even if published date is not exact
```
Again keep the line shorter, put the comment separately.
```rust
.state
.last_successful_id
.map(|e| e.0)
.expect("set above");
```
Where is this set? Better to store it in a local var; we don't want to crash the process even if it's very unlikely.
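A sketch of that suggestion (illustrative types, not the actual worker fields): bind the value once and return an error instead of panicking if the invariant is ever violated.

```rust
// Sketch: replace `.map(|e| e.0).expect("set above")` with an explicit
// fallible accessor so a broken invariant surfaces as an Err, not a crash.
struct State {
    last_successful_id: Option<i64>,
}

fn last_id(state: &State) -> Result<i64, String> {
    state
        .last_successful_id
        .ok_or_else(|| "last_successful_id not initialized".to_string())
}

fn main() {
    let state = State { last_successful_id: Some(5) };
    assert_eq!(last_id(&state), Ok(5));

    let empty = State { last_successful_id: None };
    assert!(last_id(&empty).is_err()); // error, not a process abort
}
```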
As mentioned in the dev chat, it would be very useful to have some unit tests here to ensure it works as expected.

Any ideas on how best to do that? The only clean way I can think of would be abstracting all DB and HTTP interactions (I guess it would be like 5-10 functions?) into a trait so the whole federate crate code is pure, and then mocking the DB and HTTP interactions with data from memory.

Mocking would be too complicated and could introduce problems of its own. Look at how other tests are implemented, e.g. for db views. Basically, write some test data to the db, then call functions and see if they behave as expected. You can start a local server with an inbox route to check that activities are received (with an instance like ...).
```rust
  fn cmp(&self, other: &Self) -> std::cmp::Ordering {
    other.activity_id.cmp(&self.activity_id)
  }
}
```
These impls cause backwards ordering because of `other` being put on the left. If it's intentional, then add a comment that explains this.

Also, something that may or may not be a problem: the impls are technically incorrect because they ignore fields other than activity_id, which does not match the derived PartialEq impl. For example, this requirement from the PartialOrd docs is not followed:

> `a == b` if and only if `partial_cmp(a, b) == Some(Equal)`
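A reduced, self-contained version of what these impls do (hypothetical struct with only the `activity_id` field): with `other` on the left, Rust's max-heap `BinaryHeap` pops the smallest id first, and making `PartialEq` also ignore the other fields keeps it consistent with the ordering impls.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Illustrative stand-in for SendSuccessInfo, reduced to one field.
struct SendSuccessInfo {
    activity_id: i64,
}

impl PartialEq for SendSuccessInfo {
    // compare only activity_id, matching the ordering impls below
    fn eq(&self, other: &Self) -> bool {
        self.activity_id == other.activity_id
    }
}
impl Eq for SendSuccessInfo {}
impl PartialOrd for SendSuccessInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for SendSuccessInfo {
    // reversed on purpose (other on the left): BinaryHeap is a max-heap,
    // so this makes it pop the SMALLEST activity_id first
    fn cmp(&self, other: &Self) -> Ordering {
        other.activity_id.cmp(&self.activity_id)
    }
}

fn pop_order(ids: &[i64]) -> Vec<i64> {
    let mut heap: BinaryHeap<SendSuccessInfo> = ids
        .iter()
        .map(|&activity_id| SendSuccessInfo { activity_id })
        .collect();
    let mut out = Vec::new();
    while let Some(item) = heap.pop() {
        out.push(item.activity_id);
    }
    out
}

fn main() {
    assert_eq!(pop_order(&[3, 1, 2]), vec![1, 2, 3]); // min-heap behaviour
}
```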
```rust
pub published: Option<DateTime<Utc>>,
pub was_skipped: bool,
}
// need to be able to order them for the binary heap in the worker
```
Suggested change:

```diff
-// need to be able to order them for the binary heap in the worker
+// order backwards by activity_id for the binary heap in the worker
+impl PartialEq for SendSuccessInfo {
+  fn eq(&self, other: &Self) -> bool {
+    self.activity_id == other.activity_id
+  }
+}
```
This impl matches the PartialOrd impl by ignoring the other fields, which might make BinaryHeap work more correctly
Currently, with the implementation of the federation queue from #3605 that was enabled in 0.19, the federation is parallel across instances but sequential per receiving instance.
This means that the maximum throughput of activities is limited by the network latency between instances as well as the internal latency for processing a single activity.
There is an extensive discussion here: #4529 (comment), though that issue itself is only about one sub-problem.

This PR changes the federation sending component to send activities in parallel, with a configurable maximum concurrency of 8. The implementation is more complex than I expected, since we need to keep track of the last_successful_id (which must be the highest activity id such that every single lower activity has been sent successfully), and we need to track failure counts without immediately jumping to hour-long retry delays when 8 concurrent sends fail simultaneously.
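The last_successful_id bookkeeping can be sketched like this (illustrative, not the PR's actual code): completions arrive out of order, and the id may only advance across a contiguous run of successes, so nothing past a gap is committed until the missing send finishes.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Advance the frontier while the smallest finished id is exactly
// frontier + 1; anything beyond a gap must wait for the missing sends.
fn advance(last_successful_id: &mut i64, successfuls: &mut BinaryHeap<Reverse<i64>>) {
    while let Some(&Reverse(smallest)) = successfuls.peek() {
        if smallest == *last_successful_id + 1 {
            successfuls.pop();
            *last_successful_id = smallest;
        } else {
            break;
        }
    }
}

// Returns (frontier after ids 2 and 3 finish, frontier after id 1 finishes).
fn demo() -> (i64, i64) {
    let mut last = 0i64;
    let mut heap = BinaryHeap::new();
    heap.push(Reverse(2)); // id 2 finished before id 1
    heap.push(Reverse(3));
    advance(&mut last, &mut heap);
    let stalled = last; // still 0: id 1 has not succeeded yet
    heap.push(Reverse(1));
    advance(&mut last, &mut heap); // now 1, 2, 3 commit as one contiguous run
    (stalled, last)
}

fn main() {
    assert_eq!(demo(), (0, 3));
}
```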
The implementation roughly works as follows:
In order for the results of this to still be correct, fixes need to be applied to make all activities commutative (as discussed above).
It should be possible to also make the concurrency only happen when necessary since for most instance-instance connections it is not, which would reduce the ordering issue. This is not implemented here though.
Currently, this PR fails the federation tests. I think this is due both to a bug somewhere and to the ordering problem.