Skip to content

Commit

Permalink
added latency calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
nik012003 committed Jun 2, 2023
1 parent fbe9f53 commit 3415e87
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 34 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"rust-analyzer.linkedProjects": [
"./Cargo.toml"
]
}
29 changes: 22 additions & 7 deletions src/client_message_handler.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use mpvipc::Mpv;
use mpvipc::*;
use std::{net::SocketAddr, sync::Arc};
use std::{collections::VecDeque, net::SocketAddr, sync::Arc};
use tokio::{net::TcpStream, sync::Mutex};
use url::Url;

use crate::{proto::*, Peer, Settings, Shared};
use crate::{
proto::*,
time::{get_timestamp, get_weighted_latency, MAX_QUEUE_LATENCY},
Peer, Settings, Shared,
};

pub async fn handle_connection(
mut mpv: Mpv,
Expand All @@ -22,6 +26,7 @@ pub async fn handle_connection(
tx,
username: Default::default(),
ready: false,
latency: VecDeque::with_capacity(MAX_QUEUE_LATENCY),
},
);

Expand All @@ -47,9 +52,23 @@ pub async fn handle_connection(
loop {
match reader.read_packet().await {
Ok(packet) => {
let mut s = state.lock().await;

let t_delta = get_timestamp() - packet.timestamp;
let latency_vec = &mut s.peers.get_mut(&addr).unwrap().latency;
if latency_vec.len() == MAX_QUEUE_LATENCY {
latency_vec.pop_back();
}
latency_vec.push_front(t_delta);

println!(
"Avg Latency : {}ms , Current Latency: {}",
get_weighted_latency(latency_vec),
t_delta
);

match packet.command {
VoyeursCommand::Ready(p) => {
let mut s = state.lock().await;
if settings.standalone {
if mpv.get_property::<bool>("pause").unwrap() == p {
s.ignore_next = true;
Expand Down Expand Up @@ -89,7 +108,6 @@ pub async fn handle_connection(
VoyeursCommand::Seek(t) => {
let current_time: f64 =
mpv.get_property("playback-time").unwrap_or_default();
let mut s = state.lock().await;
if t != current_time {
s.ignore_next = true;

Expand Down Expand Up @@ -117,7 +135,6 @@ pub async fn handle_connection(
let duration = mpv.get_property("duration").unwrap_or_default();
let pause: bool = mpv.get_property("pause").unwrap_or_default();
let current_time = mpv.get_property("playback-time").unwrap_or_default();
let mut s = state.lock().await;
s.peers.get_mut(&addr).unwrap().username = username;
s.send(addr, VoyeursCommand::Filename(filename)).await;
s.send(addr, VoyeursCommand::Duration(duration)).await;
Expand All @@ -134,7 +151,6 @@ pub async fn handle_connection(
if Url::parse(&streamname).is_err() {
streamname = "".to_owned();
}
let mut s = state.lock().await;
s.send(addr, VoyeursCommand::StreamName(streamname)).await;
}
}
Expand All @@ -149,7 +165,6 @@ pub async fn handle_connection(
})
.unwrap();
while !matches!(mpv.event_listen().unwrap(), Event::FileLoaded) {}
let mut s = state.lock().await;
s.send(
addr,
VoyeursCommand::NewConnection(settings.username.to_string()),
Expand Down
18 changes: 5 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
mod client_message_handler;
mod mpv_event_handler;
mod proto;
mod time;

use clap::Parser;
use client_message_handler::*;
use mpv_event_handler::*;
use mpvipc::*;
use proto::*;
use rsntp::SntpClient;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::time::{SystemTime, UNIX_EPOCH};
use std::vec;
use std::{collections::HashMap, process::Command, sync::Arc};
use tempfile::tempdir;
use time::set_time_delta;
use tokio::{
io::AsyncWriteExt,
net::{lookup_host, tcp::OwnedWriteHalf, TcpListener, TcpStream},
Expand Down Expand Up @@ -63,6 +64,7 @@ pub struct Peer {
tx: OwnedWriteHalf,
username: String,
ready: bool,
latency: VecDeque<u64>,
}

pub struct Shared {
Expand Down Expand Up @@ -127,17 +129,7 @@ async fn main() {
let args = Cli::parse();

if !args.trust_system_time {
let client = SntpClient::new();
let result = client
.synchronize(args.ntp_server)
.expect("Coudn't syncronize time with ntp server");
let delta: i64 = result.datetime().unix_timestamp().expect("msg").as_millis() as i64
- SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Couldn't get system time")
.as_millis() as i64;
println!("Clock skew: {} ms", delta);
TIME_DELTA.store(delta, std::sync::atomic::Ordering::SeqCst);
set_time_delta(args.ntp_server);
}

let state = Arc::new(Mutex::new(Shared::new()));
Expand Down
18 changes: 4 additions & 14 deletions src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use lazy_static::lazy_static;
use std::mem::size_of;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::vec;
use std::{error::Error, fmt};
use tokio::net::tcp::OwnedReadHalf;

use crate::time::get_timestamp;

const PROTOCOL_VERSION: u16 = 1;

// Packet structure
Expand All @@ -21,10 +19,6 @@ pub type TsSize = u64;
pub type CmdSize = u8;
pub type LenSize = u16;

lazy_static! {
pub static ref TIME_DELTA: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
}

pub struct PacketReader {
pub inner: OwnedReadHalf,
}
Expand Down Expand Up @@ -198,14 +192,10 @@ impl VoyeursCommand {
}

pub fn craft_packet(self) -> Packet {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Couldn't get system time")
.as_millis() as i64
+ TIME_DELTA.load(std::sync::atomic::Ordering::SeqCst);
let timestamp = get_timestamp();

Packet {
timestamp: timestamp.try_into().unwrap(),
timestamp,
command: self,
}
}
Expand Down
45 changes: 45 additions & 0 deletions src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use lazy_static::lazy_static;
use rsntp::SntpClient;
use std::collections::VecDeque;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::proto::TsSize;

lazy_static! {
pub static ref TIME_DELTA: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
}

pub fn get_timestamp() -> TsSize {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Couldn't get system time")
.as_millis() as TsSize
+ TIME_DELTA.load(std::sync::atomic::Ordering::SeqCst) as TsSize
}

pub fn set_time_delta(ntp_server: String) {
let client = SntpClient::new();
let result = client
.synchronize(ntp_server)
.expect("Coudn't syncronize time with ntp server");
let delta: i64 = result.datetime().unix_timestamp().expect("msg").as_millis() as i64
- SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Couldn't get system time")
.as_millis() as i64;
println!("Clock skew: {} ms", delta);
TIME_DELTA.store(delta, std::sync::atomic::Ordering::SeqCst);
}

pub static MAX_QUEUE_LATENCY: usize = 10;

pub fn get_weighted_latency(latency: &VecDeque<u64>) -> u64 {
latency
.iter()
.enumerate()
.map(|(i, l)| l * (MAX_QUEUE_LATENCY as u64 / (i + 1) as u64))
.sum::<u64>()
/ latency.len() as u64
}

0 comments on commit 3415e87

Please sign in to comment.