Skip to content

Commit

Permalink
Support very large channel membership lists (#11939)
Browse files Browse the repository at this point in the history
Fixes the channel membership dialogue for the zed channel by not
downloading all 111k people in one go.

Release Notes:

- N/A
  • Loading branch information
ConradIrwin committed May 17, 2024
1 parent df3bd40 commit 57b5bff
Show file tree
Hide file tree
Showing 13 changed files with 270 additions and 261 deletions.
39 changes: 20 additions & 19 deletions crates/channel/src/channel_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl Channel {
}
}

#[derive(Debug)]
pub struct ChannelMembership {
pub user: Arc<User>,
pub kind: proto::channel_member::Kind,
Expand Down Expand Up @@ -815,9 +816,11 @@ impl ChannelStore {
Ok(())
})
}
pub fn get_channel_member_details(
pub fn fuzzy_search_members(
&self,
channel_id: ChannelId,
query: String,
limit: u16,
cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<ChannelMembership>>> {
let client = self.client.clone();
Expand All @@ -826,26 +829,24 @@ impl ChannelStore {
let response = client
.request(proto::GetChannelMembers {
channel_id: channel_id.0,
query,
limit: limit as u64,
})
.await?;

let user_ids = response.members.iter().map(|m| m.user_id).collect();
let user_store = user_store
.upgrade()
.ok_or_else(|| anyhow!("user store dropped"))?;
let users = user_store
.update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
.await?;

Ok(users
.into_iter()
.zip(response.members)
.map(|(user, member)| ChannelMembership {
user,
role: member.role(),
kind: member.kind(),
})
.collect())
user_store.update(&mut cx, |user_store, _| {
user_store.insert(response.users);
response
.members
.into_iter()
.filter_map(|member| {
Some(ChannelMembership {
user: user_store.get_cached_user(member.user_id)?,
role: member.role(),
kind: member.kind(),
})
})
.collect()
})
})
}

Expand Down
2 changes: 1 addition & 1 deletion crates/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ lazy_static! {
}

pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(100);
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(20);

actions!(client, [SignIn, SignOut, Reconnect]);

Expand Down
33 changes: 18 additions & 15 deletions crates/client/src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,28 +670,31 @@ impl UserStore {
cx.spawn(|this, mut cx| async move {
if let Some(rpc) = client.upgrade() {
let response = rpc.request(request).await.context("error loading users")?;
let users = response
.users
.into_iter()
.map(User::new)
.collect::<Vec<_>>();

this.update(&mut cx, |this, _| {
for user in &users {
this.users.insert(user.id, user.clone());
this.by_github_login
.insert(user.github_login.clone(), user.id);
}
})
.ok();
let users = response.users;

Ok(users)
this.update(&mut cx, |this, _| this.insert(users))
} else {
Ok(Vec::new())
}
})
}

pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
let mut ret = Vec::with_capacity(users.len());
for user in users {
let user = User::new(user);
if let Some(old) = self.users.insert(user.id, user.clone()) {
if old.github_login != user.github_login {
self.by_github_login.remove(&old.github_login);
}
}
self.by_github_login
.insert(user.github_login.clone(), user.id);
ret.push(user)
}
ret
}

pub fn set_participant_indices(
&mut self,
participant_indices: HashMap<u64, ParticipantIndex>,
Expand Down
3 changes: 1 addition & 2 deletions crates/collab/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,7 @@ pub type NotificationBatch = Vec<(UserId, proto::Notification)>;

pub struct CreatedChannelMessage {
pub message_id: MessageId,
pub participant_connection_ids: Vec<ConnectionId>,
pub channel_members: Vec<UserId>,
pub participant_connection_ids: HashSet<ConnectionId>,
pub notifications: NotificationBatch,
}

Expand Down
21 changes: 4 additions & 17 deletions crates/collab/src/db/queries/buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,7 @@ impl Database {
channel_id: ChannelId,
user: UserId,
operations: &[proto::Operation],
) -> Result<(
Vec<ConnectionId>,
Vec<UserId>,
i32,
Vec<proto::VectorClockEntry>,
)> {
) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
self.transaction(move |tx| async move {
let channel = self.get_channel_internal(channel_id, &tx).await?;

Expand Down Expand Up @@ -479,7 +474,6 @@ impl Database {
.filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
.collect::<Vec<_>>();

let mut channel_members;
let max_version;

if !operations.is_empty() {
Expand All @@ -504,12 +498,6 @@ impl Database {
)
.await?;

channel_members = self.get_channel_participants(&channel, &tx).await?;
let collaborators = self
.get_channel_buffer_collaborators_internal(channel_id, &tx)
.await?;
channel_members.retain(|member| !collaborators.contains(member));

buffer_operation::Entity::insert_many(operations)
.on_conflict(
OnConflict::columns([
Expand All @@ -524,11 +512,10 @@ impl Database {
.exec(&*tx)
.await?;
} else {
channel_members = Vec::new();
max_version = Vec::new();
}

let mut connections = Vec::new();
let mut connections = HashSet::default();
let mut rows = channel_buffer_collaborator::Entity::find()
.filter(
Condition::all()
Expand All @@ -538,13 +525,13 @@ impl Database {
.await?;
while let Some(row) = rows.next().await {
let row = row?;
connections.push(ConnectionId {
connections.insert(ConnectionId {
id: row.connection_id as u32,
owner_id: row.connection_server_id.0 as u32,
});
}

Ok((connections, channel_members, buffer.epoch, max_version))
Ok((connections, buffer.epoch, max_version))
})
.await
}
Expand Down
114 changes: 55 additions & 59 deletions crates/collab/src/db/queries/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use rpc::{
proto::{channel_member::Kind, ChannelBufferVersion, VectorClockEntry},
ErrorCode, ErrorCodeExt,
};
use sea_orm::TryGetableMany;
use sea_orm::{DbBackend, TryGetableMany};

impl Database {
#[cfg(test)]
Expand Down Expand Up @@ -700,77 +700,73 @@ impl Database {
pub async fn get_channel_participant_details(
&self,
channel_id: ChannelId,
filter: &str,
limit: u64,
user_id: UserId,
) -> Result<Vec<proto::ChannelMember>> {
let (role, members) = self
) -> Result<(Vec<proto::ChannelMember>, Vec<proto::User>)> {
let members = self
.transaction(move |tx| async move {
let channel = self.get_channel_internal(channel_id, &tx).await?;
let role = self
.check_user_is_channel_participant(&channel, user_id, &tx)
self.check_user_is_channel_participant(&channel, user_id, &tx)
.await?;
Ok((
role,
self.get_channel_participant_details_internal(&channel, &tx)
.await?,
))
let mut query = channel_member::Entity::find()
.find_also_related(user::Entity)
.filter(channel_member::Column::ChannelId.eq(channel.root_id()));

if cfg!(any(test, sqlite)) && self.pool.get_database_backend() == DbBackend::Sqlite {
query = query.filter(Expr::cust_with_values(
"UPPER(github_login) LIKE ?",
[Self::fuzzy_like_string(&filter.to_uppercase())],
))
} else {
query = query.filter(Expr::cust_with_values(
"github_login ILIKE $1",
[Self::fuzzy_like_string(filter)],
))
}
let members = query.order_by(
Expr::cust(
"not role = 'admin', not role = 'member', not role = 'guest', not accepted, github_login",
),
sea_orm::Order::Asc,
)
.limit(limit)
.all(&*tx)
.await?;

Ok(members)
})
.await?;

if role == ChannelRole::Admin {
Ok(members
.into_iter()
.map(|channel_member| proto::ChannelMember {
role: channel_member.role.into(),
user_id: channel_member.user_id.to_proto(),
kind: if channel_member.accepted {
let mut users: Vec<proto::User> = Vec::with_capacity(members.len());

let members = members
.into_iter()
.map(|(member, user)| {
if let Some(user) = user {
users.push(proto::User {
id: user.id.to_proto(),
avatar_url: format!(
"https://github.com/{}.png?size=128",
user.github_login
),
github_login: user.github_login,
})
}
proto::ChannelMember {
role: member.role.into(),
user_id: member.user_id.to_proto(),
kind: if member.accepted {
Kind::Member
} else {
Kind::Invitee
}
.into(),
})
.collect())
} else {
return Ok(members
.into_iter()
.filter_map(|member| {
if !member.accepted {
return None;
}
Some(proto::ChannelMember {
role: member.role.into(),
user_id: member.user_id.to_proto(),
kind: Kind::Member.into(),
})
})
.collect());
}
}

async fn get_channel_participant_details_internal(
&self,
channel: &channel::Model,
tx: &DatabaseTransaction,
) -> Result<Vec<channel_member::Model>> {
Ok(channel_member::Entity::find()
.filter(channel_member::Column::ChannelId.eq(channel.root_id()))
.all(tx)
.await?)
}
}
})
.collect();

/// Returns the participants in the given channel.
pub async fn get_channel_participants(
&self,
channel: &channel::Model,
tx: &DatabaseTransaction,
) -> Result<Vec<UserId>> {
let participants = self
.get_channel_participant_details_internal(channel, tx)
.await?;
Ok(participants
.into_iter()
.map(|member| member.user_id)
.collect())
Ok((members, users))
}

/// Returns whether the given user is an admin in the specified channel.
Expand Down
8 changes: 2 additions & 6 deletions crates/collab/src/db/queries/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,15 @@ impl Database {
.await?;

let mut is_participant = false;
let mut participant_connection_ids = Vec::new();
let mut participant_connection_ids = HashSet::default();
let mut participant_user_ids = Vec::new();
while let Some(row) = rows.next().await {
let row = row?;
if row.user_id == user_id {
is_participant = true;
}
participant_user_ids.push(row.user_id);
participant_connection_ids.push(row.connection());
participant_connection_ids.insert(row.connection());
}
drop(rows);

Expand Down Expand Up @@ -336,13 +336,9 @@ impl Database {
}
}

let mut channel_members = self.get_channel_participants(&channel, &tx).await?;
channel_members.retain(|member| !participant_user_ids.contains(member));

Ok(CreatedChannelMessage {
message_id,
participant_connection_ids,
channel_members,
notifications,
})
})
Expand Down

0 comments on commit 57b5bff

Please sign in to comment.