Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nik012003 committed May 28, 2023
1 parent 85c81e2 commit e73bed7
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 54 deletions.
50 changes: 25 additions & 25 deletions src/client_message_handler.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use mpvipc::Mpv;
use mpvipc::*;
use std::vec;
use std::{net::SocketAddr, sync::Arc};
use tokio::{net::TcpStream, sync::Mutex};
use url::Url;

use crate::{proto::*, Peer, Shared};
use crate::{proto::*, Peer, Settings, Shared};

pub async fn handle_connection(
mut mpv: Mpv,
addr: SocketAddr,
stream: TcpStream,
state: Arc<Mutex<Shared>>,
is_serving: bool,
username: &str,
accept_source: bool,
standalone: bool,
settings: Settings,
) {
println!("accepted connection");
let (rx, tx) = stream.into_split();
Expand All @@ -29,8 +25,8 @@ pub async fn handle_connection(
},
);

if !is_serving {
if accept_source {
if !settings.is_serving {
if settings.accept_source {
state
.lock()
.await
Expand All @@ -40,7 +36,10 @@ pub async fn handle_connection(
state
.lock()
.await
.send(addr, VoyeursCommand::NewConnection(username.to_string()))
.send(
addr,
VoyeursCommand::NewConnection(settings.username.to_string()),
)
.await
}
}
Expand All @@ -51,12 +50,12 @@ pub async fn handle_connection(
match packet.command {
VoyeursCommand::Ready(p) => {
let mut s = state.lock().await;
if standalone {
if settings.standalone {
if mpv.get_property::<bool>("pause").unwrap() == p {
s.ignore_next = true;
mpv.set_property("pause", !p).unwrap();
}
if is_serving {
if settings.is_serving {
s.broadcast(VoyeursCommand::Ready(p)).await;
}
} else {
Expand All @@ -67,21 +66,19 @@ pub async fn handle_connection(
s.ignore_next = true;
mpv.set_property("pause", true).unwrap();
}
if is_serving {
if settings.is_serving {
s.broadcast_excluding(VoyeursCommand::Ready(false), addr)
.await;
}
}
true => {
if dbg!(s.is_ready)
&& dbg!(s.peers.values().into_iter().all(|r| r.ready))
{
if dbg!(s.is_ready) && dbg!(s.peers.values().all(|r| r.ready)) {
if mpv.get_property::<bool>("pause").unwrap() {
s.ignore_next = true;
mpv.set_property("pause", false).unwrap();
}

if is_serving {
if settings.is_serving {
s.broadcast(VoyeursCommand::Ready(true)).await;
}
}
Expand All @@ -99,7 +96,7 @@ pub async fn handle_connection(
// If the file isn't loaded yet, the seek will fail
while mpv.seek(t, SeekOptions::Absolute).is_err() {}

if is_serving {
if settings.is_serving {
s.broadcast_excluding(VoyeursCommand::Seek(t), addr).await;
}
}
Expand All @@ -112,7 +109,7 @@ pub async fn handle_connection(
mpv.pause().unwrap();
mpv.run_command_raw(
"show-text",
&vec![format!("{username}: connected").as_str(), "2000"],
&[format!("{username}: connected").as_str(), "2000"],
)
.unwrap();

Expand All @@ -128,7 +125,7 @@ pub async fn handle_connection(
s.send(addr, VoyeursCommand::Ready(!pause)).await;
}
VoyeursCommand::GetStreamName => {
if is_serving {
if settings.is_serving {
// Check if path is a valid URL
// TODO: the correct way to check this is by using stream-open-filename and parsing its data

Expand All @@ -142,7 +139,7 @@ pub async fn handle_connection(
}
}
VoyeursCommand::StreamName(stream) => {
if accept_source {
if settings.accept_source {
if stream.is_empty() {
println!("Server is not streaming from a valid url")
}
Expand All @@ -153,15 +150,18 @@ pub async fn handle_connection(
.unwrap();
while !matches!(mpv.event_listen().unwrap(), Event::FileLoaded) {}
let mut s = state.lock().await;
s.send(addr, VoyeursCommand::NewConnection(username.to_string()))
.await
s.send(
addr,
VoyeursCommand::NewConnection(settings.username.to_string()),
)
.await
}
}
VoyeursCommand::Filename(f) => {
if f != mpv.get_property::<String>("filename").unwrap_or_default() {
mpv.run_command_raw(
"show-text",
&vec!["filename does not match with server's filename", "2000"],
&["filename does not match with server's filename", "2000"],
)
.unwrap();
}
Expand All @@ -170,7 +170,7 @@ pub async fn handle_connection(
if t != mpv.get_property::<f64>("duration").unwrap_or_default() {
mpv.run_command_raw(
"show-text",
&vec!["duration does not match with server's duration", "2000"],
&["duration does not match with server's duration", "2000"],
)
.unwrap();
}
Expand All @@ -182,7 +182,7 @@ pub async fn handle_connection(
let peer = s.peers.remove(&addr).unwrap();
mpv.run_command_raw(
"show-text",
&vec![format!("{} : disconnected", peer.username).as_str(), "2000"],
&[format!("{} : disconnected", peer.username).as_str(), "2000"],
)
.unwrap();
peer.tx.forget();
Expand Down
48 changes: 21 additions & 27 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ impl Shared {
}
}
}
#[derive(Clone, Debug)]
pub struct Settings {
is_serving: bool,
username: String,
accept_source: bool,
standalone: bool,
}

#[tokio::main]
async fn main() {
Expand All @@ -109,12 +116,17 @@ async fn main() {

let cloned_state = Arc::clone(&state);
let mpv_socket = start_mpv(args.accept_source, args.mpv_args);

let settings = Settings {
is_serving: args.serve,
username: args.username,
accept_source: args.accept_source,
standalone: args.standalone,
};
// Handle server
if args.serve {
let listener = TcpListener::bind(&args.address)
.await
.expect(&format!("Couldn't bind address to {}", args.address));
.expect("Couldn't bind address");
println!("Starting server on {}", args.address);
let mpv = Mpv::connect(mpv_socket.as_str()).expect("Task coudln't attach to mpv socket");
tokio::task::spawn_blocking(move || handle_mpv_event(mpv, cloned_state, args.standalone));
Expand All @@ -129,18 +141,9 @@ async fn main() {
Mpv::connect(mpv_socket.as_str()).expect("Task coudln't attach to mpv socket");

// Spawn our handler to be run asynchronously.
let cloned_settings = settings.clone();
tokio::spawn(async move {
handle_connection(
mpv,
addr,
stream,
state,
true,
"server",
false,
args.standalone,
)
.await
handle_connection(mpv, addr, stream, state, cloned_settings).await
});
}
}
Expand All @@ -157,19 +160,10 @@ async fn main() {
.await
.expect("Could not connect to server");
let mpv = Mpv::connect(mpv_socket.as_str()).expect("Task coudln't attach to mpv socket");
let communication_task = tokio::spawn(async move {
handle_connection(
mpv,
addr,
stream,
state,
false,
&args.username,
args.accept_source,
args.standalone,
)
.await
});
let communication_task =
tokio::spawn(
async move { handle_connection(mpv, addr, stream, state, settings).await },
);
let mpv = Mpv::connect(mpv_socket.as_str()).expect("Task coudln't attach to mpv socket");
tokio::task::spawn_blocking(move || handle_mpv_event(mpv, cloned_state, args.standalone));
let _ = tokio::join!(communication_task);
Expand Down Expand Up @@ -207,7 +201,7 @@ fn start_mpv(accept_source: bool, mpv_args: Vec<String>) -> String {
let mpv = mpv.unwrap();
mpv.pause().unwrap();

mpv.run_command_raw("show-text", &vec!["Connected to voyeurs", "5000"])
mpv.run_command_raw("show-text", &["Connected to voyeurs", "5000"])
.unwrap();

mpv_socket
Expand Down
4 changes: 2 additions & 2 deletions src/mpv_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ pub fn handle_mpv_event(mut mpv: Mpv, state: Arc<Mutex<Shared>>, standalone: boo
handle.block_on(s.broadcast(VoyeursCommand::Ready(false)));
}
true => {
if s.peers.values().into_iter().any(|r| !r.ready) {
if s.peers.values().any(|r| !r.ready) {
mpv.run_command_raw(
"show-text",
&vec!["Somebody isn't ready", "2000"],
&["Somebody isn't ready", "2000"],
)
.unwrap();
s.ignore_next = true;
Expand Down

0 comments on commit e73bed7

Please sign in to comment.