Skip to content

Commit

Permalink
fixed sockin keep-alive mode to use a single output pid - cf #2783
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanlf committed Apr 9, 2024
1 parent 3cb4dd8 commit b17e6e4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 33 deletions.
24 changes: 16 additions & 8 deletions share/doc/man/gpac-filters.1
Expand Up @@ -1416,6 +1416,14 @@ When ports are specified in the URL and the default option separators are used (
- use gpac separator, e.g. udp://localhost:1234[:gpac:opts]
.br

.br
When the socket is listening in keep-alive .I ka mode:
.br
- a single connection is allowed and a single output PID will be produced
.br
- each connection close event will triger a pipeline flush
.br

.br
On OSX with VM packet replay you will need to force multicast routing, e.g. route add -net 239.255.1.4/32 -interface vboxnet0
.br
Expand Down Expand Up @@ -8040,7 +8048,7 @@ cicp_idx (uint, default: 2): target output layout CICP index (see gpac -h layo
.br
Description: FFMPEG demultiplexer
.br
Version: Lavf60.18.100
Version: Lavf61.3.100
.br

.br
Expand Down Expand Up @@ -8078,7 +8086,7 @@ strbuf_min (uint, default: 1MB): internal buffer size when demuxing from GPAC's
.br
Description: FFMPEG decoder
.br
Version: Lavc60.35.100
Version: Lavc61.5.101
.br

.br
Expand Down Expand Up @@ -8137,7 +8145,7 @@ c (str): codec name (GPAC or ffmpeg), only used to query p
.br
Description: FFMPEG AV Capture
.br
Version: Lavd60.4.100
Version: Lavd61.2.100
.br

.br
Expand Down Expand Up @@ -8230,7 +8238,7 @@ probes (uint, default: 10, minmax: 0-100): probe a given number of video frames
.br
Description: FFMPEG video rescaler
.br
Version: SwS7.6.100
Version: SwS8.2.100
.br

.br
Expand Down Expand Up @@ -8348,7 +8356,7 @@ osar (frac, default: 0/1): force output pixel aspect ratio
.br
Description: FFMPEG encoder
.br
Version: Lavc60.35.100
Version: Lavc61.5.101
.br

.br
Expand Down Expand Up @@ -8426,7 +8434,7 @@ rld (bool, default: false, updatable): force reloading of encoder when arguments
.br
Description: FFMPEG multiplexer
.br
Version: Lavf60.18.100
Version: Lavf61.3.100
.br

.br
Expand Down Expand Up @@ -8490,7 +8498,7 @@ keepts (bool, default: true): do not shift input timeline back to 0
.br
Description: FFMPEG AVFilter
.br
Version: Lavfi9.14.100
Version: Lavfi10.2.101
.br

.br
Expand Down Expand Up @@ -8633,7 +8641,7 @@ dump (bool, default: false, updatable): dump graph as log media@info or stderr i
.br
Description: FFMPEG BitStream filter
.br
Version: Lavu58.34.100
Version: Lavu59.13.100
.br

.br
Expand Down
2 changes: 1 addition & 1 deletion share/doc/man/gpac.1
Expand Up @@ -3956,7 +3956,7 @@ Time shift in number of segments for HAS streams, only set by dashin and dasher
.TP
.B IsManifest (PHSM,uint,D )
.br
PID is a HAS manifest
PID is a HAS manifest (MSB=1 if live)
.br
* 0: not a manifest
.br
Expand Down
79 changes: 55 additions & 24 deletions src/filters/in_sock.c
Expand Up @@ -2,7 +2,7 @@
* GPAC - Multimedia Framework C SDK
*
* Authors: Jean Le Feuvre
* Copyright (c) Telecom ParisTech 2017-2023
* Copyright (c) Telecom ParisTech 2017-2024
* All rights reserved
*
* This file is part of GPAC / generic TCP/UDP input filter
Expand Down Expand Up @@ -47,10 +47,11 @@ typedef struct
#endif
char address[GF_MAX_IP_NAME_LEN];

u64 start_time, last_stats_time;
u32 init_time;
Bool done, first_pck;
//stats
u64 nb_bytes;
Bool done;
u64 start_time, last_stats_time;
} GF_SockInClient;

typedef struct
Expand Down Expand Up @@ -123,6 +124,7 @@ static GF_Err sockin_initialize(GF_Filter *filter)
GF_LOG(GF_LOG_ERROR, GF_LOG_NETWORK, ("[SockIn] Failed to open socket for %s\n", ctx->src));
return GF_IO_ERR;
}
ctx->sock_c.first_pck = GF_TRUE;

/*setup port and src*/
port = ctx->port;
Expand Down Expand Up @@ -209,8 +211,10 @@ static GF_Err sockin_initialize(GF_Filter *filter)
static void sockin_client_reset(GF_SockInClient *sc)
{
if (sc->socket) gf_sk_del(sc->socket);
sc->socket = NULL;
#ifndef GPAC_DISABLE_STREAMING
if (sc->rtp_reorder) gf_rtp_reorderer_del(sc->rtp_reorder);
sc->rtp_reorder = NULL;
#endif
}

Expand Down Expand Up @@ -308,6 +312,10 @@ static GF_Err sockin_read_client(GF_Filter *filter, GF_SockInCtx *ctx, GF_SockIn
case GF_IP_CONNECTION_CLOSED:
if (!sock_c->done) {
sock_c->done = GF_TRUE;
if (ctx->ka) {
gf_filter_pid_send_flush(sock_c->pid);
return GF_IP_CONNECTION_CLOSED;
}
gf_filter_pid_set_eos(sock_c->pid);
}
return GF_EOS;
Expand All @@ -328,7 +336,7 @@ static GF_Err sockin_read_client(GF_Filter *filter, GF_SockInCtx *ctx, GF_SockIn
}
if (!nb_read) return GF_OK;

if (!sock_c->nb_bytes) {
if (sock_c->first_pck) {
GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Reception started after %u ms\n", gf_sys_clock() - sock_c->init_time));
}

Expand Down Expand Up @@ -390,11 +398,12 @@ static GF_Err sockin_read_client(GF_Filter *filter, GF_SockInCtx *ctx, GF_SockIn
if (pck) {
dst_pck = gf_filter_pck_new_shared(sock_c->pid, pck+12, nb_read-12, sockin_rtp_destructor);
if (dst_pck) {
gf_filter_pck_set_framing(dst_pck, GF_TRUE, GF_TRUE);
gf_filter_pck_set_framing(dst_pck, sock_c->first_pck, GF_FALSE);
gf_filter_pck_send(dst_pck);
sock_c->first_pck = GF_FALSE;
}
}
return GF_OK;
goto do_stats;
}
#else
if (sock_c->is_rtp) {
Expand All @@ -408,19 +417,22 @@ static GF_Err sockin_read_client(GF_Filter *filter, GF_SockInCtx *ctx, GF_SockIn

memcpy(out_data, in_data, nb_read);

gf_filter_pck_set_framing(dst_pck, (sock_c->nb_bytes == nb_read) ? GF_TRUE : GF_FALSE, GF_FALSE);
gf_filter_pck_set_framing(dst_pck, sock_c->first_pck, GF_FALSE);
gf_filter_pck_send(dst_pck);
sock_c->first_pck = GF_FALSE;

//send bitrate
do_stats:
//send bitrate ever half sec
now = gf_sys_clock_high_res();
if (now > sock_c->last_stats_time + 100000) {
if (now > sock_c->last_stats_time + 500000) {
sock_c->last_stats_time = now;
u64 bitrate = (now - sock_c->start_time );
if (bitrate) {
bitrate = (sock_c->nb_bytes * 8 * 1000000) / bitrate;
bitrate = (sock_c->nb_bytes * 8 * 500000) / bitrate;
gf_filter_pid_set_info(sock_c->pid, GF_PROP_PID_DOWN_RATE, &PROP_UINT((u32) bitrate) );
GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Receiving from %s at %d kbps\r", sock_c->address, (u32) (bitrate/10)));
GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Receiving from %s at %d kbps\r", sock_c->address, (u32) (bitrate/1000)));
}
sock_c->nb_bytes = 0;
}

return GF_OK;
Expand Down Expand Up @@ -448,7 +460,7 @@ static GF_Err sockin_check_eos(GF_Filter *filter, GF_SockInCtx *ctx)
if (ctx->sock_c.pid)
gf_filter_pid_set_eos(ctx->sock_c.pid);
ctx->sock_c.done = GF_TRUE;
if (ctx->sock_c.nb_bytes) {
if (!ctx->sock_c.first_pck) {
GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] No data received for %d ms, assuming end of stream\n", ctx->timeout));
} else {
GF_LOG(GF_LOG_WARNING, GF_LOG_NETWORK, ("[SockIn] No data received after %d ms, aborting\n", ctx->timeout));
Expand All @@ -473,7 +485,7 @@ static GF_Err sockin_process(GF_Filter *filter)
if (ctx->is_udp) {
e = sockin_check_eos(filter, ctx);
if (e) return e;
if (!ctx->sock_c.nb_bytes) {
if (ctx->sock_c.first_pck) {
gf_filter_ask_rt_reschedule(filter, 10000);
return GF_OK;
}
Expand Down Expand Up @@ -506,17 +518,29 @@ static GF_Err sockin_process(GF_Filter *filter)
if (gf_sk_group_sock_is_set(ctx->active_sockets, ctx->sock_c.socket, GF_SK_SELECT_READ)) {
e = gf_sk_accept(ctx->sock_c.socket, &new_conn);
if ((e==GF_OK) && new_conn) {
GF_SockInClient *sc;
GF_SAFEALLOC(sc, GF_SockInClient);
if (!sc) return GF_OUT_OF_MEM;

GF_SockInClient *sc=NULL;
if (ctx->ka) {
sc = gf_list_get(ctx->clients, 0);
if (sc && sc->socket) {
gf_sk_del(new_conn);
GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Rejecting connection since one client is already connected and keep-alive is enabled\n", sc->address));
return GF_OK;
}
}
if (!sc) {
GF_SAFEALLOC(sc, GF_SockInClient);
if (!sc) return GF_OUT_OF_MEM;
gf_list_add(ctx->clients, sc);
sc->first_pck = GF_TRUE;
}
sc->done = GF_FALSE;

sc->socket = new_conn;
strcpy(sc->address, "unknown");
gf_sk_get_remote_address(new_conn, sc->address);
gf_sk_set_block_mode(new_conn, !ctx->block);

GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Accepting new connection from %s\n", sc->address));
gf_list_add(ctx->clients, sc);
ctx->had_clients = GF_TRUE;
gf_sk_group_register(ctx->active_sockets, sc->socket);
sc->init_time = gf_sys_clock();
Expand All @@ -528,16 +552,18 @@ static GF_Err sockin_process(GF_Filter *filter)
count = gf_list_count(ctx->clients);
for (i=0; i<count; i++) {
GF_SockInClient *sc = gf_list_get(ctx->clients, i);
if (!sc->socket) continue;

if (!gf_sk_group_sock_is_set(ctx->active_sockets, sc->socket, GF_SK_SELECT_READ)) continue;

e = sockin_read_client(filter, ctx, sc);
if (e == GF_IP_CONNECTION_CLOSED) {
GF_LOG(GF_LOG_WARNING, GF_LOG_NETWORK, ("[SockIn] Connection to %s lost, removing input\n", sc->address));
GF_LOG(ctx->ka ? GF_LOG_INFO : GF_LOG_WARNING, GF_LOG_NETWORK, ("[SockIn] Connection to %s lost, %s\n", sc->address, ctx->ka ? "entering keepalive" : "removing input"));
if (sc->socket)
gf_sk_group_unregister(ctx->active_sockets, sc->socket);

sockin_client_reset(sc);
sockin_client_reset(sc);
if (ctx->ka) continue;
if (sc->pid) {
gf_filter_pid_set_eos(sc->pid);
gf_filter_pid_remove(sc->pid);
Expand Down Expand Up @@ -613,17 +639,22 @@ GF_FilterRegister SockInRegister = {
#ifdef GPAC_HAS_SOCK_UN
"- UDP unix domain sockets are used for source URLs formatted as `udpu://NAME`\n"
"- TCP unix domain sockets are used for source URLs formatted as `tcpu://NAME`\n"
#else
"Your platform does not supports unix domain sockets, udpu:// and tcpu:// schemes not supported."
#endif
"\n"
"When ports are specified in the URL and the default option separators are used (see `gpac -h doc`), the URL must either:\n"
"- have a trailing '/', e.g. `udp://localhost:1234/[:opts]`\n"
"- use `gpac` separator, e.g. `udp://localhost:1234[:gpac:opts]`\n"
"\n"
"When the socket is listening in keep-alive [-ka]() mode:\n"
"- a single connection is allowed and a single output PID will be produced\n"
"- each connection close event will triger a pipeline flush\n"
"\n"
#ifdef GPAC_CONFIG_DARWIN
"\nOn OSX with VM packet replay you will need to force multicast routing, e.g. `route add -net 239.255.1.4/32 -interface vboxnet0`"
"\nOn OSX with VM packet replay you will need to force multicast routing, e.g. `route add -net 239.255.1.4/32 -interface vboxnet0`"
#endif
""
#else
"Your platform does not supports unix domain sockets, udpu:// and tcpu:// schemes not supported."
#endif
,
#endif //GPAC_DISABLE_DOC
.private_size = sizeof(GF_SockInCtx),
Expand Down

0 comments on commit b17e6e4

Please sign in to comment.