Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new participant mutex to VideoRoom #3361

Merged
merged 1 commit into from May 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 42 additions & 4 deletions src/plugins/janus_videoroom.c
Expand Up @@ -2149,6 +2149,7 @@ typedef struct janus_videoroom_publisher {
int udp_sock; /* The udp socket on which to forward rtp packets */
gboolean kicked; /* Whether this participant has been kicked */
gboolean e2ee; /* If media from this publisher is end-to-end encrypted */
janus_mutex mutex; /* Mutex to lock this instance */
volatile gint destroyed;
janus_refcount ref;
} janus_videoroom_publisher;
Expand Down Expand Up @@ -2500,6 +2501,7 @@ static void janus_videoroom_publisher_free(const janus_refcount *p_ref) {

janus_mutex_destroy(&p->subscribers_mutex);
janus_mutex_destroy(&p->rtp_forwarders_mutex);
janus_mutex_destroy(&p->mutex);

/* If this is a dummy publisher, get rid of the session too */
if(p->dummy && p->session)
Expand Down Expand Up @@ -2820,6 +2822,7 @@ static void janus_videoroom_create_dummy_publisher(janus_videoroom *room, GHashT
publisher->rtp_forwarders = g_hash_table_new(NULL, NULL);
publisher->udp_sock = -1;
g_atomic_int_set(&publisher->destroyed, 0);
janus_mutex_init(&publisher->mutex);
janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);
/* Now we create a separate publisher stream for each supported codec in the room */
janus_videoroom_publisher_stream *ps = NULL;
Expand Down Expand Up @@ -4141,7 +4144,9 @@ static void janus_videoroom_leave_or_unpublish(janus_videoroom_publisher *partic
g_hash_table_remove(participant->room->participants,
string_ids ? (gpointer)participant->user_id_str : (gpointer)&participant->user_id);
g_hash_table_remove(participant->room->private_ids, GUINT_TO_POINTER(participant->pvt_id));
janus_mutex_lock(&participant->mutex);
g_clear_pointer(&participant->room, janus_videoroom_room_dereference);
janus_mutex_unlock(&participant->mutex);
}
janus_mutex_unlock(&room->mutex);
janus_refcount_decrease(&room->ref);
Expand Down Expand Up @@ -5198,7 +5203,9 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
while (g_hash_table_iter_next(&iter, NULL, &value)) {
janus_videoroom_publisher *p = value;
if(p && !g_atomic_int_get(&p->destroyed) && p->session && p->room) {
janus_mutex_lock(&p->mutex);
g_clear_pointer(&p->room, janus_videoroom_room_dereference);
janus_mutex_unlock(&p->mutex);
/* Notify the user we're going to destroy the room... */
int ret = gateway->push_event(p->session->handle, &janus_videoroom_plugin, NULL, destroyed, NULL);
JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
Expand Down Expand Up @@ -7410,6 +7417,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi
publisher->rtp_forwarders = g_hash_table_new(NULL, NULL);
publisher->udp_sock = -1;
g_atomic_int_set(&publisher->destroyed, 0);
janus_mutex_init(&publisher->mutex);
janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);
/* Create publisher streams for all the things that the remote publisher is sending */
janus_videoroom_publisher_stream *ps = NULL;
Expand Down Expand Up @@ -8123,11 +8131,19 @@ void janus_videoroom_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp
janus_videoroom_incoming_rtp_internal(session, participant, pkt);
}
static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *session, janus_videoroom_publisher *participant, janus_plugin_rtp *pkt) {
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) {
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams) {
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_mutex_lock(&participant->mutex);
janus_videoroom *videoroom = participant->room;
if(videoroom == NULL) {
janus_mutex_unlock(&participant->mutex);
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_refcount_increase_nodebug(&videoroom->ref);
janus_mutex_unlock(&participant->mutex);

/* Find the stream this packet belongs to */
janus_mutex_lock(&participant->streams_mutex);
Expand All @@ -8140,6 +8156,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
if(ps != NULL)
janus_refcount_decrease_nodebug(&ps->ref);
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}

Expand Down Expand Up @@ -8284,6 +8301,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
char *payload = janus_rtp_payload(buf, len, &plen);
if(payload == NULL) {
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}
if(ps->vcodec == JANUS_VIDEOCODEC_VP9) {
Expand Down Expand Up @@ -8354,6 +8372,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
char *payload = janus_rtp_payload(buf, len, &plen);
if(payload == NULL) {
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}
if(ps->vcodec == JANUS_VIDEOCODEC_VP8) {
Expand Down Expand Up @@ -8381,6 +8400,7 @@ static void janus_videoroom_incoming_rtp_internal(janus_videoroom_session *sessi
}
janus_refcount_decrease_nodebug(&ps->ref);
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
}

void janus_videoroom_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet) {
Expand Down Expand Up @@ -8453,12 +8473,22 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams || participant->room == NULL) {
if(g_atomic_int_get(&participant->destroyed) || participant->kicked || !participant->streams) {
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_mutex_lock(&participant->mutex);
janus_videoroom *videoroom = participant->room;
if(videoroom == NULL) {
janus_mutex_unlock(&participant->mutex);
janus_videoroom_publisher_dereference_nodebug(participant);
return;
}
janus_refcount_increase_nodebug(&videoroom->ref);
janus_mutex_unlock(&participant->mutex);
if(g_atomic_int_get(&participant->destroyed) || participant->data_mindex < 0 || !participant->streams || participant->kicked) {
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
return;
}
char *buf = packet->buffer;
Expand Down Expand Up @@ -8530,14 +8560,15 @@ static void janus_videoroom_incoming_data_internal(janus_videoroom_session *sess
pkt.is_rtp = FALSE;
pkt.textdata = !packet->binary;
janus_mutex_lock_nodebug(&ps->subscribers_mutex);
if(participant->room->helper_threads > 0) {
g_list_foreach(participant->room->threads, janus_videoroom_helper_rtpdata_packet, &pkt);
if(videoroom->helper_threads > 0) {
g_list_foreach(videoroom->threads, janus_videoroom_helper_rtpdata_packet, &pkt);
} else {
g_slist_foreach(ps->subscribers, janus_videoroom_relay_data_packet, &pkt);
}
janus_mutex_unlock_nodebug(&ps->subscribers_mutex);
janus_refcount_decrease_nodebug(&ps->ref);
janus_videoroom_publisher_dereference_nodebug(participant);
janus_refcount_decrease_nodebug(&videoroom->ref);
}

void janus_videoroom_data_ready(janus_plugin_session *handle) {
Expand Down Expand Up @@ -8794,7 +8825,11 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) {
g_list_free_full(mappings, (GDestroyNotify)g_free);
}
/* Any subscriber session to update? */
janus_mutex_lock(&participant->mutex);
janus_videoroom *room = participant->room;
if(room)
janus_refcount_increase_nodebug(&room->ref);
janus_mutex_unlock(&participant->mutex);
if(subscribers != NULL) {
temp = subscribers;
while(temp) {
Expand Down Expand Up @@ -8858,6 +8893,8 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) {
janus_mutex_unlock(&participant->streams_mutex);
janus_videoroom_leave_or_unpublish(participant, FALSE, FALSE);
janus_refcount_decrease(&participant->ref);
if(room)
janus_refcount_decrease_nodebug(&room->ref);
} else if(session->participant_type == janus_videoroom_p_type_subscriber) {
/* Get rid of subscriber */
janus_videoroom_subscriber *subscriber = janus_videoroom_session_get_subscriber(session);
Expand Down Expand Up @@ -9192,6 +9229,7 @@ static void *janus_videoroom_handler(void *data) {
}
}
g_atomic_int_set(&publisher->destroyed, 0);
janus_mutex_init(&publisher->mutex);
janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);
/* In case we also wanted to configure */
if(audiocodec && json_string_value(json_object_get(msg->jsep, "sdp")) != NULL) {
Expand Down