Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-thread bug when using usrsctp in N Worker threads in Rust #1352

Open
ibc opened this issue Mar 5, 2024 · 1 comment · May be fixed by #1353
Open

Multi-thread bug when using usrsctp in N Worker threads in Rust #1352

ibc opened this issue Mar 5, 2024 · 1 comment · May be fixed by #1353

Comments

@ibc
Copy link
Member

ibc commented Mar 5, 2024

After investigating it in depth we have found a bad design in DepUsrSCTP class that allows a thread A make calls directly in thread B. This explains the crash reported in issue #1320.

Overview of the usrsctp library integration in mediasoup

usrsctp was integrated in mediasoup once it merged the "Single thread" feature (PR sctplab/usrsctp#339). That PR added the usrsctp_init_nothreads() API to run usrsctp in mono thread mode, meaning that usrsctp no longer runs its own timer into an internal thread but instead exposes an API for the application to be aware of when it should "ping" usrsctp so it can send pending SCTP messages (basically SCTP retransmissions due to non received SCTP ACK from the remote endpoint).

The way we integrated it was by wrapping the usrsctp API within the DepUsrSCTP and SctpAssociation class in mediasoup.

It's important to notice that this was designed having mediasoup for Node usage, this is: each mediasoup worker runs in a separate process with a single thread. From now on, this overview section describes the behavior in mediasoup for Node:

  1. The application creates a Worker so a new process is created with a single libuv loop.
  2. During the Worker creation, DepUsrSCTP::ClassInit() is created which calls usrsctp_init_nothreads() by passing a static onSendSctpData() callback as argument, which will be later invoked by usrsctp when it needs to send SCTP retransmissions to remote endpoints.
  3. The the Worker C++ constructor calls to DepUsrSctp::CreateChecker() which creates a singleton instance of the DepUsrSCTP::Checker class which is stored as a static member in DepUsrSCTP. Such a Checker inherits from TimerHandler since it's intended to "ping" usrsctp every 10ms so usrsctp can send pending SCTP retransmissions.
  4. Of course, since mediasoup for Node launches a separate process for each mediasoup worker, there is no multi threading at all.
  5. When the first XxxxTransport with SCTP support is created in the worker, its SctpAssociation instance calls to DepUsrSCTP::RegisterSctpAssociation(this) to register itself in DepUsrSCTP. If it's the first SCTP association then the Checker is started, meaning that it runs a periodic timer every 10ms.
  6. So every 10ms usrsctp_handle_timers() is called and usrsctp may invoke the onSendSctpData callback to send pending SCTP retransmissions. In such a callback, the address of the associated SctpAssociation is given so we call sctpAssociation->OnUsrSctpSendSctpData(data, len) to send the SCTP data to the endpoint.

There is a single thread in the whole process (this is mediasoup for Node, remember) so everything is good. And it doesn't matter that the Node app creates many Workers since each Worker is a separate process with a single thread.

Problem in Rust with multi threaded Workers

When in mefiasoup for Rust, each Worker doesn't run in a separate process but in a separate native thread. However usrsctp has global state so problems start showing up. Here an example:

  1. The Rust app creates first Worker 1 which creates a new thread 1 with its own libuv loop 1.
  2. DepUsrSCTP::ClassInit() is called. Here a GlobalInstances static member was added which is incremented every time ClassInit() is called. If it was 0 then usrsctp_init_nothreads() is called by passing the static onSendSctpData callback as argument.
  3. Then when the first SctpAssociation is created, DepUsrSCTP::RegisterSctpAssociation() is called from it. It increments a static numSctpAssociations member and, if it was 0, then the Checker is started.
  4. Here the thing: The Checker::Start() method runs a periodic timer. In which thread? In the thread from where the method was called. In this case the thread 1 and its associated libuv loop 1.
  5. But later the Rust app may create more Workers. It creates Worker 2 which creates a new thread 2 with its own libuv loop 2.
  6. DepUsrSCTP::ClassInit() is called but since GlobalInstances static member was not 0 it won't call usrsctp_init_nothreads() again.
  7. Then the first SctpAssociation in Worker 2 is created, DepUsrSCTP::RegisterSctpAssociation() is called from it. It increments a static numSctpAssociations member and, since it was not 0, the Checker is not started again (it's already started).
  8. Now there are SCTP traffic in both Worker 1 and Worker 2. Let's imagine there is packet loss so some SCTP messages created by usrsctp didn't reach the remote endpoints.
  9. Eventually the periodic timer (**which is running in thread 1 under libuv loop 1 under Worker 1) fires, so usrsctp_handle_timers() is called.
  10. usrsctp_handle_timers() checks pending SCTP messages to be sent or resent and invokes the onSendSctpData() static callback. Again: such a callback is invoked within thread 1.
  11. Such a callback may be called multiple times and within its function body it checks the associated StcpAssociation to which the SCTP message should be sent.
  12. Problem here is that such a SctpAssociation could belong to a WebRtcTransport in Worker 2. In that case, it calls sctpAssociation->OnUsrSctpSendSctpData(data, len) from thread 1, but such a SctpAssociation belongs to thread 2 (which runs the libuv loop 2). This is a violation of libuv design since no other threads should directly invoke libuv methods on another thread. And it's also a problem in mediasoup C++ code since no we have thread 1 and thread 2 invoking methods in parallel in mediasoup instances in Worker 2. Memory corruption or whatever can happen in this case.

So this is the problem.

Solution

  1. When the first Worker is created (and hence its thread 2) DepUsrSCTP::ClassInit() its called and it must create a separate thread X, create a separate libuv loop X into it, call usrsctp_init_nothreads(onSendSctpData) from that thread X and create a Checker singleton in that thread X.
  2. Later more Workers could be created (so we have thread 2, 3, etc).
  3. When the first SctpAssociation is created in any of those Workers and it calls DepUsrSCTP::RegisterSctpAssociation(), such a method calls DepUsrSCTP::checker->Start() as usual, but it should use uv_async_send() to make such a Start() method run within the thread X (instead than running it within the callling thread 1 or 2 or whatever).
  4. Eventually the static callback onSendSctpData() will be called by usrsctp (from different threads X, 1, 2, etc).
  5. Within onSendSctpData() callback, we get the target SctpAssociation and from it we need to get its associated libuv loop, or perhaps create a uv_async_t handle within the previously called DepUsrSCTP::RegisterSctpAssociation(), and retrieve it among the SctpAssociation instance itself.
  6. And we should use uv_async_send() to call sctpAssociation->OnUsrSctpSendSctpData(data, len) within the thread associated to that uv_async_t handle. This way, each call to SctpAssociation::OnUsrSctpSendSctpData() is guaranteed to be called within the same thread in which such a SctpAssociation lives.
  7. When DepUsrSCTP::ClassDestroy() is called by the last remaining Worker (so GlobalInstances was 1), we must again use uv_async_send() to invoke usrsctp_finish() and call checker->Stop() and should also destroy/close/free the thread X. All those things will be created again if a new Worker is created later.
@ibc
Copy link
Member Author

ibc commented Mar 5, 2024

Usage of uv_async stuff should be easy:

  1. The SctpAssociation class should have a new uv_async_t* uvAsyncHandle private member.
  2. The SctpAssociation class should have a new public getter uv_async_t* GetAsyncHandle() method that returns this->uvAsyncHandle.
  3. The SctpAssociation should expose a public method InitializeSyncHandle(uv_async_cb callback) which internally should call uv_async_init(DepLibUV::GetLoop(), this->uvAsyncHandle, callback).
  4. We store each SctpAssociation in the existing DepUsrSCTP::mapIdSctpAssociation map (nothing new here).
  5. In addition to that, DepUsrSCTP::RegisterSctpAssociation should call sctpAssociation->InitializeSyncHandle(static_cast<uv_async_cb>(xxxx)) where xxxx is a uv_async_cb callback that somehow runs the mutex to read data and len in a new map associated to the StcpAssociation and calls sctpAssociation->OnUsrSctpSendSctpData(data, len) and then remove that entry from that map. Or similar.
  6. The onSendSctpData() static function, once it retrieves the associated SctpAssociation, should call auto* uvAsyncHandle = GetAsyncHandle() on it, and then invoke uv_async_send(uvAsyncHandle).
  7. But this should only be done in Rust since in Node it's not needed at all.

That's all. Then something similar should be done in ClassInit() to initialize usrsctp and the Checker singleton within the thread X.

ibc added a commit that referenced this issue Mar 5, 2024
**WIP**

Fixes #1352

### Details

- Basically as described in the ticket. But not everything is done at all.
- Also, I'm testing this in Node by using UV async stuff (which doesn't make sense in mediasoup for Node but anyway).

### TODO

- None of these changes should take effect when in Node, so we need to pass (or to NOT pass) some `define` only from Rust to enable this in the C++ code. We don't want to deal with UV async stuff when in Node because it's not needed at all, so let's see how to do it.

- Missing thread X to initialize usrsctp and run the `Checker` singleton. And many other things.

- Crash when a `SctpAssociation` is closed. I think it's because somehow the `onAsync` callback is invoked asynchronously (of course) so when it calls `sctpAssociation->OnUsrSctpSendSctpData()` it happens that such a `SctpAssociation` has already been freed. Not sure how to resolve it. Here the logs:
  ```
  mediasoup:Transport close() +18s
  mediasoup:Channel request() [method:ROUTER_CLOSE_TRANSPORT] +8s
  mediasoup:Producer transportClosed() +19s
  mediasoup:DataProducer transportClosed() +18s
  mediasoup:DataProducer transportClosed() +0ms
  mediasoup:Transport close() +1ms
  mediasoup:Channel request() [method:ROUTER_CLOSE_TRANSPORT] +1ms
  mediasoup:Consumer transportClosed() +19s
  mediasoup:DataConsumer transportClosed() +18s
  mediasoup:DataConsumer transportClosed() +1ms
  mediasoup:Channel [pid:98040] RTC::SctpAssociation::ResetSctpStream() | SCTP_RESET_STREAMS sent [streamId:1] +1ms
  mediasoup:Channel request succeeded [method:ROUTER_CLOSE_TRANSPORT, id:39] +0ms
  DepUsrSCTP::onAsync() | ---------- onAsync!!
  DepUsrSCTP::onAsync() | ---------- onAsync, sending SCTP data!!
  mediasoup:Channel Producer Channel ended by the worker process +1ms
  mediasoup:ERROR:Worker worker process died unexpectedly [pid:98040, code:null, signal:SIGSEGV] +0ms
  ```
@ibc ibc linked a pull request Mar 5, 2024 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging a pull request may close this issue.

1 participant