Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some tips after studying your code #121

Open
shiqifeng2000 opened this issue Apr 18, 2024 · 4 comments
Open

Some tips after studying your code #121

shiqifeng2000 opened this issue Apr 18, 2024 · 4 comments
Labels
enhancement New feature or request

Comments

@shiqifeng2000
Copy link

shiqifeng2000 commented Apr 18, 2024

Hi

I've recently studying your code, rtmp part, it seems your are using a "stream hub" as the media distributor and some stream_handler to handle meta data. I see the StreamHubEvent::Publish event did the job to register the stream to hub, but why bother sending a "sender" and receive another "sender"? here's the code

 pub async fn publish_to_channels(
        &mut self,
        app_name: String,
        stream_name: String,
        pub_id: Uuid,
        gop_num: usize,
    ) -> Result<(), SessionError> {
        self.stream_handler
            .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num))
            .await;

        let (event_result_sender, event_result_receiver) = oneshot::channel();
        let publish_event = StreamHubEvent::Publish {
            identifier: StreamIdentifier::Rtmp {
                app_name,
                stream_name,
            },
            info: self.get_publisher_info(pub_id),
            stream_handler: self.stream_handler.clone(),
            result_sender: event_result_sender,
        };

        if self.event_producer.send(publish_event).is_err() {
            return Err(SessionError {
                value: SessionErrorValue::StreamHubEventSendErr,
            });
        }

        let result = event_result_receiver.await??;
        self.data_sender = result.0.unwrap();
        Ok(())
    }

I guess there's many alternatives, such as a tokio::sync::broadcast channel, sending the tokio::sync::mpsc::Receiver, or maybe a Arc<Rwlock/Mutex>, could you share some idea why it is designed like this?

@harlanc
Copy link
Owner

harlanc commented Apr 18, 2024

One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.

I want to use as few locks as possible in the project which can be achieved by using channels..

Why use tokio::sync::broadcast ,you mean replacing the tokio::sync::mpsc? Could you explain the reasons?

@shiqifeng2000
Copy link
Author

> One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.

I guess an actix-actor message will well suit that

> I want to use as few locks as possible in the project which can be achieved by using channels..

I understand, locker can be blocking

> Why use tokio::sync::broadcast ,you mean replacing the tokio::sync::mpsc? Could you explain the reasons?

A broadcast sender impl clone trait and can create another subscriber, but would drop items when it is full or lagged. For your case, unless you want to keep ALL the media data, you could simple create a broadcast channel and share/clone the sender to the stream hub. When a subscriber comes, send him the sender so that he could create a receiver.

@harlanc
Copy link
Owner

harlanc commented Apr 22, 2024

One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.

I guess an actix-actor message will well suit that

The implementation of the codes about publish/subscribe is indeed not clear enough, I was wondering if it could be replaced by a simple RPC framework. I will consider your suggestions. Thanks.

Why use tokio::sync::broadcast ,you mean replacing the tokio::sync::mpsc? Could you explain the reasons?

A broadcast sender impl clone trait and can create another subscriber, but would drop items when it is full or lagged. For your case, unless you want to keep ALL the media data, you could simple create a broadcast channel and share/clone the sender to the stream hub. When a subscriber comes, send him the sender so that he could create a receiver.

The RTMP protocol is a little special: you should not send the publisher's real-time data immediately after the client's subscribe. Instead, before that ,some cached data (avc/aac sequence header) needs to be sent to client . So if we subscribe a broadcast channel, can we insert them into the channel firstly?

@harlanc harlanc added the enhancement New feature or request label Apr 26, 2024
@shiqifeng2000
Copy link
Author

One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.

I guess an actix-actor message will well suit that

The implementation of the codes about publish/subscribe is indeed not clear enough, I was wondering if it could be replaced by a simple RPC framework. I will consider your suggestions. Thanks.

Why use tokio::sync::broadcast ,you mean replacing the tokio::sync::mpsc? Could you explain the reasons?

A broadcast sender impl clone trait and can create another subscriber, but would drop items when it is full or lagged. For your case, unless you want to keep ALL the media data, you could simple create a broadcast channel and share/clone the sender to the stream hub. When a subscriber comes, send him the sender so that he could create a receiver.

The RTMP protocol is a little special: you should not send the publisher's real-time data immediately after the client's subscribe. Instead, before that ,some cached data (avc/aac sequence header) needs to be sent to client . So if we subscribe a broadcast channel, can we insert them into the channel firstly?

I think it's quit possible, by not just relying to the channel way. when you publish the stream, you could just send the side data/meta/sequence header to the hub, when subscribing, you could pull all these down, invoke a send_channel_data before setting up the subscribing could do the job.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants