Skip to content

Commit

Permalink
split data and status processing into separate threads
Browse files Browse the repository at this point in the history
  • Loading branch information
ka9q committed May 4, 2024
1 parent bb39989 commit da03f94
Showing 1 changed file with 82 additions and 79 deletions.
161 changes: 82 additions & 79 deletions monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ static int close_session(struct session **);
static int pa_callback(void const *,void *,unsigned long,PaStreamCallbackTimeInfo const *,PaStreamCallbackFlags,void *);
static void *decode_task(void *x);
static void *dataproc(void *arg);
static void *statproc(void *arg);
static void *repeater_ctl(void *arg);
static char const *lookupid(uint32_t ssrc);
static float make_position(int);
Expand Down Expand Up @@ -486,10 +487,12 @@ int main(int argc,char * const argv[]){
// Spawn one thread per address
// All have to succeed in resolving their targets or we'll exit
// This allows a restart when started automatically from systemd before avahi is fully running
pthread_t sockthreads[Nfds];
for(int i=0; i<Nfds; i++)
pthread_create(&sockthreads[i],NULL,dataproc,Mcast_address_text[i]);

pthread_t datathreads[Nfds];
pthread_t statthreads[Nfds];
for(int i=0; i<Nfds; i++){
pthread_create(&datathreads[i],NULL,dataproc,Mcast_address_text[i]);
pthread_create(&statthreads[i],NULL,statproc,Mcast_address_text[i]);
}
Last_error_time = gps_time_ns();

// Spawn the display thread so it isn't charged with everybody's CPU time
Expand All @@ -505,7 +508,73 @@ int main(int argc,char * const argv[]){
exit(EX_OK); // calls cleanup() to clean up Portaudio and ncurses. Can't happen...
}

// Receive from data and status multicast streams, update local states, multiplex to decoder threads
// Receive status multicasts on output multicast groups, update local states
static void *statproc(void *arg){
char const *mcast_address_text = (char *)arg;
{
char name[100];
snprintf(name,sizeof(name),"stat %s",mcast_address_text);
pthread_setname(name);
}

int status_fd;
{
char iface[1024];
struct sockaddr sock;
resolve_mcast(mcast_address_text,&sock,DEFAULT_STAT_PORT,iface,sizeof(iface));
status_fd = listen_mcast(&sock,iface);
}
if(status_fd == -1)
pthread_exit(NULL);

// Main loop begins here - does not need to be realtime?
while(!Terminate){
uint8_t buffer[PKTSIZE];
struct sockaddr_storage sender;
socklen_t socksize = sizeof(sender);
int length = recvfrom(status_fd,buffer,sizeof(buffer),0,(struct sockaddr *)&sender,&socksize);
if(buffer[0] != STATUS) // not status, ignore
continue;

// Extract just the SSRC to see if the session exists
// NB! Assumes same IP address *and port* for status and data
// This is only true for recent versions of radiod, after the switch to unconnected output sockets
// But older versions don't send status on the output channel anyway, so no problem
uint32_t ssrc = get_ssrc(buffer+1,length-1);
pthread_mutex_lock(&Sess_mutex); // Protect Nsessions
struct session *sp = lookup_session(&sender,ssrc);
if(!sp){
// Status arrived before first RTP; create and init session
sp = create_session();
if(!sp){
pthread_mutex_unlock(&Sess_mutex);
fprintf(stderr,"No room!!\n");
continue;
}
sp->chan.inuse = true; // Not sent in status update
sp->ssrc = ssrc;
}
// Always decode directly into local copy, as not every parameter is updated in every status message
// Decoding into a temp copy and then memcpy would write zeroes into unsent parameters
decode_radio_status(&sp->frontend,&sp->chan,buffer+1,length-1);
// Update SNR calculation (not sent explicitly)
float const noise_bandwidth = fabsf(sp->chan.filter.max_IF - sp->chan.filter.min_IF);
float sig_power = sp->chan.sig.bb_power - noise_bandwidth * sp->chan.sig.n0;
if(sig_power < 0)
sig_power = 0; // Avoid log(-x) = nan
float const sn0 = sig_power/sp->chan.sig.n0;
float const snr = power2dB(sn0/noise_bandwidth);
sp->snr = sp->now_active ? snr : -INFINITY;
memcpy(&sp->sender,&sender,sizeof(sp->sender));
sp->last_active = gps_time_ns(); // Keep active time calc from blowing up before data packet arrives
sp->samprate = sp->chan.output.samprate;
pthread_mutex_unlock(&Sess_mutex);
}
return NULL;
}


// Receive from data multicast streams, multiplex to decoder threads
static void *dataproc(void *arg){
char const *mcast_address_text = (char *)arg;
{
Expand All @@ -521,84 +590,14 @@ static void *dataproc(void *arg){
resolve_mcast(mcast_address_text,&sock,DEFAULT_RTP_PORT,iface,sizeof(iface));
input_fd = listen_mcast(&sock,iface);
}

if(input_fd == -1)
pthread_exit(NULL);

int status_fd;
{
char iface[1024];
struct sockaddr sock;
// Could just change the port number from the RTP resolve above since the IP addresses are the same
resolve_mcast(mcast_address_text,&sock,DEFAULT_STAT_PORT,iface,sizeof(iface));
status_fd = listen_mcast(&sock,iface);
}
struct packet *pkt = NULL;

realtime();
// Main loop begins here
while(!Terminate){
// poll input status and data UDP sockets on same IP multicast group
int const nfds = 2;
struct pollfd fds[nfds];
fds[0].fd = input_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
fds[1].fd = status_fd;
fds[1].events = POLLIN;
fds[1].revents = 0;
int n = poll(fds,nfds,100); // timeout to poll terminate flag
if(n < 0)
perror("poll");
if(n <= 0)
continue;

if(fds[1].revents & (POLLIN|POLLPRI)){
// Got a status packet
uint8_t buffer[PKTSIZE];
struct sockaddr_storage sender;
socklen_t socksize = sizeof(sender);
int length = recvfrom(status_fd,buffer,sizeof(buffer),0,(struct sockaddr *)&sender,&socksize);
if(buffer[0] != STATUS) // not status, ignore
continue;

// Extract just the SSRC to see if the session exists
// NB! Assumes same IP address *and port* for status and data
// This is only true for recent versions of radiod, after the switch to unconnected output sockets
// But older versions don't send status on the output channel anyway, so no problem
uint32_t ssrc = get_ssrc(buffer+1,length-1);
pthread_mutex_lock(&Sess_mutex); // Protect Nsessions
struct session *sp = lookup_session(&sender,ssrc);
if(!sp){
// Status arrived before first RTP; create and init session
sp = create_session();
if(!sp){
pthread_mutex_unlock(&Sess_mutex);
fprintf(stderr,"No room!!\n");
continue;
}
sp->chan.inuse = true; // Not sent in status update
sp->ssrc = ssrc;
}
// Always decode directly into local copy, as not every parameter is updated in every status message
// Decoding into a temp copy and then memcpy would write zeroes into unsent parameters
decode_radio_status(&sp->frontend,&sp->chan,buffer+1,length-1);
// Update SNR calculation (not sent explicitly)
float const noise_bandwidth = fabsf(sp->chan.filter.max_IF - sp->chan.filter.min_IF);
float sig_power = sp->chan.sig.bb_power - noise_bandwidth * sp->chan.sig.n0;
if(sig_power < 0)
sig_power = 0; // Avoid log(-x) = nan
float const sn0 = sig_power/sp->chan.sig.n0;
sp->snr = power2dB(sn0/noise_bandwidth);
sp->snr = sp->now_active ? sp->snr : -INFINITY;
memcpy(&sp->sender,&sender,sizeof(sp->sender));
sp->last_active = gps_time_ns(); // Keep active time calc from blowing up before data packet arrives
sp->samprate = sp->chan.output.samprate;
pthread_mutex_unlock(&Sess_mutex);
continue; // next packet
}
if(!(fds[0].revents & (POLLIN|POLLPRI)))
continue; // Not ready to receive a data packet
// Need a new packet buffer?
if(!pkt)
pkt = malloc(sizeof(*pkt));
Expand Down Expand Up @@ -645,8 +644,6 @@ static void *dataproc(void *arg){
continue;
}
// Keep the lock while we initialize
pthread_cond_init(&sp->qcond,NULL);
pthread_mutex_init(&sp->qmutex,NULL);
sp->ssrc = pkt->rtp.ssrc;
memcpy(&sp->sender,&sender,sizeof(sender)); // Bind to specific host and sending port
sp->type = pkt->rtp.type;
Expand Down Expand Up @@ -959,14 +956,16 @@ static void *decode_task(void *arg){
Extreme gain differences can make the source sound like it's inside an ear
This can be uncomfortable in good headphones with extreme panning
-6dB for each channel in the center
when full to one side or the other, that channel is +6 dB and the other is -inf dB */
when full to one side or the other, that channel is +6 dB and the other is -inf dB
*/
float const left_gain = sp->gain * (1 - sp->pan)/2;
float const right_gain = sp->gain * (1 + sp->pan)/2;
/* Delay less favored channel 0 - 1.5 ms max (determined
empirically) This is really what drives source localization
in humans. The effect is so dramatic even with equal levels
you have to remove one earphone to convince yourself that the
levels really are the same! */
levels really are the same!
*/
int const left_delay = (sp->pan > 0) ? round(sp->pan * .0015 * DAC_samprate) : 0; // Delay left channel
int const right_delay = (sp->pan < 0) ? round(-sp->pan * .0015 * DAC_samprate) : 0; // Delay right channel

Expand Down Expand Up @@ -1669,6 +1668,10 @@ static struct session *create_session(void){

// Put at end of list
Sessions[Nsessions++] = sp;
sp->chan.inuse = true;
pthread_cond_init(&sp->qcond,NULL);
pthread_mutex_init(&sp->qmutex,NULL);

return sp;
}

Expand Down

0 comments on commit da03f94

Please sign in to comment.