Skip to content

Commit

Permalink
Perform audio-level detection in AudioBridge participant thread (#3312)
Browse files Browse the repository at this point in the history
  • Loading branch information
lminiero committed Jan 9, 2024
1 parent 26e1f62 commit e78232e
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 83 deletions.
173 changes: 91 additions & 82 deletions src/plugins/janus_audiobridge.c
Expand Up @@ -1729,30 +1729,28 @@ typedef struct janus_audiobridge_rtp_relay_packet {
/* Buffered audio/video packet */
typedef struct janus_audiobridge_buffer_packet {
/* Pointer to the packet data, if RTP */
char *buffer;
/* Size of the packet */
int len;
/* Whether the packet contains silence, according to the RTP extension */
gboolean silence;
janus_plugin_rtp *rtp;
/* Monotonic insert time */
int64_t inserted;
} janus_audiobridge_buffer_packet;
static janus_audiobridge_buffer_packet *janus_audiobridge_buffer_packet_create(char *buffer, int len, gboolean silence) {
static janus_audiobridge_buffer_packet *janus_audiobridge_buffer_packet_create(janus_plugin_rtp *rtp) {
janus_audiobridge_buffer_packet *pkt = g_malloc(sizeof(janus_audiobridge_buffer_packet));
pkt->buffer = g_malloc(len);
pkt->len = len;
pkt->silence = silence;
memcpy(pkt->buffer, buffer, len);
pkt->rtp = janus_plugin_rtp_duplicate(rtp);
pkt->inserted = janus_get_monotonic_time();
return pkt;
}
static void janus_audiobridge_buffer_packet_destroy(janus_audiobridge_buffer_packet *pkt) {
if(!pkt)
return;
g_free(pkt->buffer);
if(pkt->rtp)
g_free(pkt->rtp->buffer);
g_free(pkt->rtp);
g_free(pkt);
}

static void janus_audiobridge_participant_istalking(janus_audiobridge_session *session,
janus_audiobridge_participant *participant, janus_plugin_rtp *packet, gboolean *silence);

static void janus_audiobridge_participant_clear_jitter_buffer(janus_audiobridge_participant *participant) {
if(participant->jitter) {
jitter_buffer_reset(participant->jitter);
Expand Down Expand Up @@ -5891,74 +5889,9 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r
rtp->type, participant->codec == JANUS_AUDIOCODEC_PCMA ? 8 : 0);
return;
}
/* Check the audio levels, in case we need to notify participants about who's talking */
gboolean silence = FALSE;
if(participant->extmap_id > 0) {
/* Check the audio levels, in case we need to notify participants about who's talking */
int level = packet->extensions.audio_level;
if(level != -1) {
/* Is this silence? */
silence = (level == 127);
if(participant->room && participant->room->audiolevel_event) {
/* We also need to detect who's talking: update our monitoring stuff */
int audio_active_packets = participant->room ? participant->room->audio_active_packets : 100;
int audio_level_average = participant->room ? participant->room->audio_level_average : 25;
/* Check if we need to override those with user specific properties */
if(participant->user_audio_active_packets > 0)
audio_active_packets = participant->user_audio_active_packets;
if(participant->user_audio_level_average > 0)
audio_level_average = participant->user_audio_level_average;
participant->audio_dBov_sum += level;
participant->audio_active_packets++;
participant->dBov_level = level;
if(participant->audio_active_packets > 0 && participant->audio_active_packets == audio_active_packets) {
gboolean notify_talk_event = FALSE;
if((float) participant->audio_dBov_sum / (float) participant->audio_active_packets < audio_level_average) {
/* Participant talking, should we notify all participants? */
if(!participant->talking)
notify_talk_event = TRUE;
participant->talking = TRUE;
} else {
/* Participant not talking anymore, should we notify all participants? */
if(participant->talking)
notify_talk_event = TRUE;
participant->talking = FALSE;
}
participant->audio_active_packets = 0;
participant->audio_dBov_sum = 0;
/* Only notify in case of state changes */
if(participant->room && notify_talk_event) {
janus_mutex_lock(&participant->room->mutex);
json_t *event = json_object();
json_object_set_new(event, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking"));
json_object_set_new(event, "room",
string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) :
json_integer(participant->room ? participant->room->room_id : 0));
json_object_set_new(event, "id",
string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id));
/* Notify the speaker this event is related to as well */
janus_audiobridge_notify_participants(participant, event, TRUE);
json_decref(event);
janus_mutex_unlock(&participant->room->mutex);
/* Also notify event handlers */
if(notify_events && gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking"));
json_object_set_new(info, "room",
string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) :
json_integer(participant->room ? participant->room->room_id : 0));
json_object_set_new(info, "id",
string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id));
gateway->notify_event(&janus_audiobridge_plugin, session->handle, info);
}
}
}
}
}
}
/* Queue the audio packet in the jitter buffer (we won't decode now, there might be buffering involved) */
if(participant->jitter) {
janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(buf, len, silence);
janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(packet);
janus_mutex_lock(&participant->qmutex);
/* Limit the size of the jitter buffer */
gint64 now = janus_get_monotonic_time();
Expand Down Expand Up @@ -7955,6 +7888,7 @@ static void *janus_audiobridge_mixer_thread(void *data) {
janus_audiobridge_room *audiobridge = (janus_audiobridge_room *)data;
if(!audiobridge) {
JANUS_LOG(LOG_ERR, "Invalid room!\n");
g_thread_unref(g_thread_self());
return NULL;
}
JANUS_LOG(LOG_VERB, "Thread is for mixing room %s (%s) at rate %"SCNu32"...\n",
Expand Down Expand Up @@ -8578,6 +8512,7 @@ static void *janus_audiobridge_mixer_thread(void *data) {

janus_refcount_decrease(&audiobridge->ref);

g_thread_unref(g_thread_self());
return NULL;
}

Expand Down Expand Up @@ -8635,7 +8570,11 @@ static void *janus_audiobridge_participant_thread(void *data) {
if(participant->jitter) {
ret = jitter_buffer_get(participant->jitter, &jbp, participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160, NULL);
jitter_buffer_tick(participant->jitter);
if(ret == JITTER_BUFFER_OK) {
if(ret != JITTER_BUFFER_OK) {
/* No packet in the jitter buffer? Move on the talking detection, if needed */
janus_audiobridge_participant_istalking(session, participant, NULL, NULL);
} else {
/* Decode the audio packet */
bpkt = (janus_audiobridge_buffer_packet *)jbp.data;
janus_mutex_unlock(&participant->qmutex);
locked = FALSE;
Expand All @@ -8645,16 +8584,18 @@ static void *janus_audiobridge_participant_thread(void *data) {
break;
}
/* Access the payload */
char *buffer = bpkt->rtp ? bpkt->rtp->buffer : NULL;
uint16_t len = bpkt->rtp ? bpkt->rtp->length : 0;
int plen = 0;
const unsigned char *payload = (const unsigned char *)janus_rtp_payload(bpkt->buffer, bpkt->len, &plen);
const unsigned char *payload = (const unsigned char *)janus_rtp_payload(buffer, len, &plen);
if(!payload) {
g_atomic_int_set(&participant->decoding, 0);
JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n",
participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711");
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
rtp = (janus_rtp_header *)bpkt->buffer;
rtp = (janus_rtp_header *)buffer;
/* If this is Opus, check if there's a packet gap we should fix with FEC */
use_fec = FALSE;
if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) {
Expand Down Expand Up @@ -8689,8 +8630,9 @@ static void *janus_audiobridge_participant_thread(void *data) {
pkt->ssrc = 0;
pkt->timestamp = ntohl(rtp->timestamp);
pkt->seq_number = ntohs(rtp->seq_number);
/* We might check the audio level extension to see if this is silence */
pkt->silence = bpkt->silence;
/* Check the audio level extension to see if this is silence */
pkt->silence = FALSE;
janus_audiobridge_participant_istalking(session, participant, bpkt->rtp, &pkt->silence);
pkt->length = 0;
if(participant->codec == JANUS_AUDIOCODEC_OPUS) {
/* Opus */
Expand Down Expand Up @@ -9080,3 +9022,70 @@ static void *janus_audiobridge_plainrtp_relay_thread(void *data) {
g_thread_unref(g_thread_self());
return NULL;
}

static void janus_audiobridge_participant_istalking(janus_audiobridge_session *session,
janus_audiobridge_participant *participant, janus_plugin_rtp *packet, gboolean *silence) {
/* Check the audio levels, in case we need to notify participants about who's talking */
if(participant == NULL || participant->extmap_id < 1)
return;
int level = packet ? packet->extensions.audio_level : 127;
if(level == -1)
return;
if(level == 127 && silence)
*silence = TRUE;
if(participant->room && participant->room->audiolevel_event) {
/* We need to detect who's talking: update our monitoring stuff */
int audio_active_packets = participant->room ? participant->room->audio_active_packets : 100;
int audio_level_average = participant->room ? participant->room->audio_level_average : 25;
/* Check if we need to override those with user specific properties */
if(participant->user_audio_active_packets > 0)
audio_active_packets = participant->user_audio_active_packets;
if(participant->user_audio_level_average > 0)
audio_level_average = participant->user_audio_level_average;
participant->audio_dBov_sum += level;
participant->audio_active_packets++;
participant->dBov_level = level;
if(participant->audio_active_packets > 0 && participant->audio_active_packets == audio_active_packets) {
gboolean notify_talk_event = FALSE;
if((float) participant->audio_dBov_sum / (float) participant->audio_active_packets < audio_level_average) {
/* Participant talking, should we notify all participants? */
if(!participant->talking)
notify_talk_event = TRUE;
participant->talking = TRUE;
} else {
/* Participant not talking anymore, should we notify all participants? */
if(participant->talking)
notify_talk_event = TRUE;
participant->talking = FALSE;
}
participant->audio_active_packets = 0;
participant->audio_dBov_sum = 0;
/* Only notify in case of state changes */
if(participant->room && notify_talk_event) {
janus_mutex_lock(&participant->room->mutex);
json_t *event = json_object();
json_object_set_new(event, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking"));
json_object_set_new(event, "room",
string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) :
json_integer(participant->room ? participant->room->room_id : 0));
json_object_set_new(event, "id",
string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id));
/* Notify the speaker this event is related to as well */
janus_audiobridge_notify_participants(participant, event, TRUE);
json_decref(event);
janus_mutex_unlock(&participant->room->mutex);
/* Also notify event handlers */
if(notify_events && gateway->events_is_enabled()) {
json_t *info = json_object();
json_object_set_new(info, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking"));
json_object_set_new(info, "room",
string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) :
json_integer(participant->room ? participant->room->room_id : 0));
json_object_set_new(info, "id",
string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id));
gateway->notify_event(&janus_audiobridge_plugin, session->handle, info);
}
}
}
}
}
18 changes: 18 additions & 0 deletions src/plugins/plugin.c
Expand Up @@ -57,6 +57,24 @@ void janus_plugin_rtp_reset(janus_plugin_rtp *packet) {
janus_plugin_rtp_extensions_reset(&packet->extensions);
}
}
janus_plugin_rtp *janus_plugin_rtp_duplicate(janus_plugin_rtp *packet) {
janus_plugin_rtp *p = NULL;
if(packet) {
p = g_malloc(sizeof(janus_plugin_rtp));
p->mindex = packet->mindex;
p->video = packet->video;
if(packet->buffer == NULL || packet->length == 0) {
p->buffer = NULL;
p->length = 0;
} else {
p->buffer = g_malloc(packet->length);
memcpy(p->buffer, packet->buffer, packet->length);
p->length = packet->length;
}
p->extensions = packet->extensions;
}
return p;
}
void janus_plugin_rtcp_reset(janus_plugin_rtcp *packet) {
if(packet) {
memset(packet, 0, sizeof(janus_plugin_rtcp));
Expand Down
9 changes: 8 additions & 1 deletion src/plugins/plugin.h
Expand Up @@ -171,7 +171,7 @@ janus_plugin *create(void) {
* Janus instance or it will crash.
*
*/
#define JANUS_PLUGIN_API_VERSION 103
#define JANUS_PLUGIN_API_VERSION 104

/*! \brief Initialization of all plugin properties to NULL
*
Expand Down Expand Up @@ -610,6 +610,13 @@ struct janus_plugin_rtp {
* @param[in] packet Pointer to the janus_plugin_rtp packet to reset
*/
void janus_plugin_rtp_reset(janus_plugin_rtp *packet);
/*! \brief Helper method to duplicate the RTP packet and its buffer
* @note The core will always pass non-allocated packets to plugins, which
* means they may have to duplicate them in case they need them for more time.
* @param[in] packet Pointer to the janus_plugin_rtp packet to duplicate
* @returns A pointer to the new janus_plugin_rtp, if successful, or NULL otherwise
*/
janus_plugin_rtp *janus_plugin_rtp_duplicate(janus_plugin_rtp *packet);

/*! \brief Janus plugin RTCP packet */
struct janus_plugin_rtcp {
Expand Down

0 comments on commit e78232e

Please sign in to comment.