Skip to content

Commit

Permalink
fixed potential lag pushin byte ranges from active file in http serve…
Browse files Browse the repository at this point in the history
…r - cf #2700
  • Loading branch information
jeanlf committed Dec 13, 2023
1 parent e78d31d commit 1729b01
Showing 1 changed file with 60 additions and 20 deletions.
80 changes: 60 additions & 20 deletions src/filters/out_http.c
Expand Up @@ -836,20 +836,39 @@ static Bool httpout_sess_parse_range(GF_HTTPOutSession *sess, char *range)
}
if (!request_ok) return GF_FALSE;

known_file_size = 0;
if (sess->in_source && !sess->resource) {
//cannot fetch end of file it is not yet known !
if (has_file_end) return GF_FALSE;
known_file_size = sess->in_source->nb_write;
} else {
known_file_size = sess->file_size;
//HTTP does not allow for "range: X-" to resolve in "content-range: X-/*"
//we support this in GPAC and allow other clients too if they set "X-Allow-Open-Partial-Range" (GPAC specific)
Bool allow_partial_unknown_size = GF_FALSE;
const char *str = gf_dm_sess_get_header(sess->http_sess, "User-Agent");
if (str && !strncmp(str, "GPAC/", 5))
allow_partial_unknown_size = GF_TRUE;
str = gf_dm_sess_get_header(sess->http_sess, "X-Allow-Open-Partial-Range");
if (str && (!strcmp(str, "1") || !strcmp(str, "true") || !strcmp(str, "yes")))
allow_partial_unknown_size = GF_TRUE;

if (!allow_partial_unknown_size || !sess->file_in_progress) {
known_file_size = sess->file_size;
}
}
sess->bytes_in_req = 0;
for (i=0; i<sess->nb_ranges; i++) {
if (sess->ranges[i].start>=0) {
//if start, end is a pos in bytes in size (0-based)
if (sess->ranges[i].end==-1) {
if (known_file_size==0) continue;

sess->ranges[i].end = known_file_size-1;
}
if (known_file_size==0) {
request_ok = GF_FALSE;
break;
}

if (sess->ranges[i].end >= (s64) known_file_size) {
request_ok = GF_FALSE;
Expand All @@ -860,6 +879,10 @@ static Bool httpout_sess_parse_range(GF_HTTPOutSession *sess, char *range)
break;
}
} else {
if (known_file_size==0) {
request_ok = GF_FALSE;
break;
}
//no start, end is a file size
if (sess->ranges[i].end >= (s64) known_file_size) {
request_ok = GF_FALSE;
Expand Down Expand Up @@ -1977,7 +2000,9 @@ static void httpout_sess_io(void *usr_cbk, GF_NETIO_Parameter *parameter)
char *ranges = NULL;
gf_dynstrcat(&ranges, "bytes=", NULL);
for (i=0; i<sess->nb_ranges; i++) {
if (sess->in_source || !sess->file_size) {
if (sess->ranges[i].end==-1) {
sprintf(szFmt, LLD"-/*", sess->ranges[i].start);
} else if (sess->in_source || !sess->file_size) {
sprintf(szFmt, LLD"-"LLD"/*", sess->ranges[i].start, sess->ranges[i].end);
} else {
sprintf(szFmt, LLD"-"LLD"/"LLU, sess->ranges[i].start, sess->ranges[i].end, sess->file_size);
Expand Down Expand Up @@ -3114,7 +3139,11 @@ static GF_Err httpout_sess_data_upload(GF_HTTPOutSession *sess, const u8 *data,
}
remain = size;
while (remain) {
to_write = (u32) (sess->ranges[sess->range_idx].end + 1 - sess->file_pos);
if (sess->ranges[sess->range_idx].end<0)
to_write = remain;
else
to_write = (u32) (sess->ranges[sess->range_idx].end + 1 - sess->file_pos);

if (to_write>=remain) {
write = (u32) gf_fwrite(data, remain, sess->resource);
if (write != remain) {
Expand Down Expand Up @@ -3166,17 +3195,6 @@ static void log_request_done(GF_HTTPOutSession *sess)
unit = "kbps";
bps/=1000;
}
#if 0
assert(sess->nb_bytes);
if (sess->nb_ranges) {
u32 i;
u64 tot_bytes = 0;
for (i=0; i<sess->nb_ranges; i++) {
tot_bytes += sess->ranges[i].end - sess->ranges[i].start + 1;
}
assert(tot_bytes==sess->nb_bytes);
}
#endif
GF_LOG(GF_LOG_INFO, GF_LOG_ALL, ("[HTTPOut] %sREQ#"LLU" %s done: reply %d - "LLU" bytes in %d ms at %g %s\n", sprefix, sess->req_id, get_method_name(sess->method_type), sess->reply_code, sess->nb_bytes, (u32) (diff_us/1000), bps, unit));
}
}
Expand All @@ -3186,6 +3204,7 @@ static void httpout_process_session(GF_Filter *filter, GF_HTTPOutCtx *ctx, GF_HT
u32 read;
u64 to_read=0;
GF_Err e = GF_OK;
Bool file_in_progress, last_range;
Bool close_session = ctx->close;

if (sess->force_destroy) {
Expand Down Expand Up @@ -3460,23 +3479,40 @@ static void httpout_process_session(GF_Filter *filter, GF_HTTPOutCtx *ctx, GF_HT
}

resend:

last_range=GF_FALSE;
file_in_progress = sess->file_in_progress;
to_read=0;
//we have ranges
if (sess->nb_ranges) {
Bool range_done=GF_FALSE;
//current range is done
if ((s64) sess->file_pos >= sess->ranges[sess->range_idx].end) {
if ((sess->ranges[sess->range_idx].end>0)
&& ((s64) sess->file_pos >= sess->ranges[sess->range_idx].end)
) {
//load next range, seeking file
if (sess->range_idx+1<sess->nb_ranges) {
sess->range_idx++;
sess->file_pos = (u64) sess->ranges[sess->range_idx].start;
gf_fseek(sess->resource, sess->file_pos, SEEK_SET);
} else {
range_done = GF_TRUE;
file_in_progress = GF_FALSE;
}
}
if (sess->range_idx<sess->nb_ranges) {
to_read = sess->ranges[sess->range_idx].end + 1 - sess->file_pos;
if (!range_done && (sess->range_idx<sess->nb_ranges)) {
if (sess->ranges[sess->range_idx].end>0) {
to_read = sess->ranges[sess->range_idx].end + 1 - sess->file_pos;
//we have an explicit closed range request, stop even if file is still being produced
file_in_progress = GF_FALSE;
if (sess->range_idx+1 == sess->nb_ranges)
last_range = GF_TRUE;
} else if (sess->file_size > sess->file_pos) {
to_read = sess->file_size - sess->file_pos;
}
}
} else if (sess->file_pos < sess->file_size) {
to_read = sess->file_size - sess->file_pos;
last_range = GF_TRUE;
}
#ifdef GPAC_HAS_QJS
else if (sess->cbk_read) {
Expand All @@ -3493,6 +3529,7 @@ static void httpout_process_session(GF_Filter *filter, GF_HTTPOutCtx *ctx, GF_HT
#endif

if (to_read) {
u64 remain = to_read;
//rescedule asap while we send
ctx->next_wake_us = 1;

Expand Down Expand Up @@ -3540,14 +3577,18 @@ static void httpout_process_session(GF_Filter *filter, GF_HTTPOutCtx *ctx, GF_HT
} else {
GF_LOG(GF_LOG_DEBUG, GF_LOG_HTTP, ("[HTTPOut] sending data to %s for %s: "LLU"/"LLU" bytes\n", sess->peer_address, sess->path, sess->nb_bytes, sess->bytes_in_req));

//not in progress and we are done, notify (for chunk-transfer or h2) right away
if (!file_in_progress && last_range && (remain==read))
goto session_done;

if (gf_dm_sess_flush_async(sess->http_sess, GF_FALSE)==GF_OK) {
goto resend;
}
}
return;
}
//file not done yet ...
if (sess->file_in_progress || (sess->put_in_progress==1)) {
if (file_in_progress || (sess->put_in_progress==1)) {
sess->last_active_time = gf_sys_clock_high_res();
return;
}
Expand Down Expand Up @@ -4601,7 +4642,6 @@ static void httpout_process_inputs(GF_HTTPOutCtx *ctx)
}
in->nb_write += nb_write;
}
GF_LOG(GF_LOG_DEBUG, GF_LOG_HTTP, ("[HTTPOut] Pushed %d bytes\n", pck_size));
} else if (hwf) {
u32 w, h, stride, stride_uv, pf;
u32 nb_planes, uv_height;
Expand Down

0 comments on commit 1729b01

Please sign in to comment.