Skip to content

Commit

Permalink
feat: update mediasoup (#104)
Browse files Browse the repository at this point in the history
* feat: update mediasoup

* perf: use thread pool for worker and async executor

* chore: bump version to 0.3.0

* fix: use fixed deadlock version for mediasoup
  • Loading branch information
satoren authored Dec 15, 2021
1 parent 4b0b011 commit d75d59b
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 49 deletions.
4 changes: 2 additions & 2 deletions lib/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ defmodule Mediasoup.Worker do

@type event_type ::
:on_close
| :on_worker_close
| :on_dead
@spec event(t, pid, event_types :: [event_type]) :: {:ok} | {:error, :terminated}
def event(
pid,
lisener,
event_types \\ [
:on_close,
:on_worker_close
:on_dead
]
) do
GenServer.call(pid, {:event, [lisener, event_types]})
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule MediasoupElixir.MixProject do
def project do
[
app: :mediasoup_elixir,
version: "0.2.1",
version: "0.3.0",
elixir: "~> 1.7",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
8 changes: 6 additions & 2 deletions native/mediasoup_elixir/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mediasoup_elixir"
version = "0.2.1"
version = "0.3.0"
authors = []
edition = "2018"

Expand All @@ -11,8 +11,12 @@ crate-type = ["cdylib"]

[dependencies]
rustler = "0.22.0"
mediasoup = "= 0.8.5"
# Use the version with the deadlock fixed for mediasoup. When this https://github.com/versatica/mediasoup/issues/728 issue fixed, use the upstream repository.
mediasoup = { git = "https://github.com/satoren/mediasoup.git", rev ="4dec56ed56eb576efd78a5cce7c6bb0dfa7ec9aa" }
futures-lite = "1.12.0"
once_cell = "1.8.0"
num_cpus = "1.13.0"
async-executor = "1.4.1"
serde = { version = "1.0.130", features = ["derive"] }
serde-transcode = "1.1"
serde_json = "1.0"
Expand Down
42 changes: 17 additions & 25 deletions native/mediasoup_elixir/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod pipe_transport;
mod producer;
mod resource;
mod router;
mod task;
mod webrtc_transport;
mod worker;

Expand Down Expand Up @@ -62,45 +63,36 @@ where
T: rustler::Encoder + Send + 'static,
{
let mut my_env = OwnedEnv::new();
let builder = std::thread::Builder::new().name("ex-mediasoup".into());
let _ = builder.spawn(move || {
task::spawn(async move {
my_env.send_and_clear(&pid, |env| value.encode(env));
});
})
.detach();
}

// workarounf for future created by async block is not `Send` at send_async_nif_result
pub fn async_nif_thread_spawn<T, E, Fn, Fut>(env: Env, future: Fn) -> NifResult<(Atom, Atom)>
pub fn send_async_nif_result<T, E, Fut>(env: Env, future: Fut) -> NifResult<(Atom, Atom)>
where
T: Encoder,
E: Encoder,
Fut: future::Future<Output = Result<T, E>>,
Fn: (FnOnce() -> Fut) + Send + 'static,
Fut: future::Future<Output = Result<T, E>> + Send + 'static,
{
let pid = env.pid();
let mut my_env = OwnedEnv::new();
let result_key = atoms::mediasoup_async_nif_result();
let builder = std::thread::Builder::new().name("ex-mediasoup".into());
let _ =
builder.spawn(move || {
let result = future::block_on(future());
match result {
Ok(worker) => my_env
.send_and_clear(&pid, |env| (result_key, (atoms::ok(), worker)).encode(env)),
Err(err) => my_env
.send_and_clear(&pid, |env| (result_key, (atoms::error(), err)).encode(env)),
task::spawn(async move {
let result = future.await;
match result {
Ok(worker) => {
my_env.send_and_clear(&pid, |env| (result_key, (atoms::ok(), worker)).encode(env))
}
Err(err) => {
my_env.send_and_clear(&pid, |env| (result_key, (atoms::error(), err)).encode(env))
}
});
}
})
.detach();

Ok((atoms::ok(), result_key))
}
pub fn send_async_nif_result<T, E, Fut>(env: Env, future: Fut) -> NifResult<(Atom, Atom)>
where
T: Encoder,
E: Encoder,
Fut: future::Future<Output = Result<T, E>> + Send + 'static,
{
async_nif_thread_spawn(env, || future)
}

rustler::init! {
"Elixir.Mediasoup.Nif",
Expand Down
14 changes: 7 additions & 7 deletions native/mediasoup_elixir/src/pipe_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::consumer::ConsumerOptionsStruct;
use crate::data_structure::SerNumSctpStreams;
use crate::json_serde::JsonSerdeWrap;
use crate::producer::ProducerOptionsStruct;
use crate::{async_nif_thread_spawn, ConsumerRef, PipeTransportRef, ProducerRef};
use crate::{send_async_nif_result, ConsumerRef, PipeTransportRef, ProducerRef};
use mediasoup::data_structures::{SctpState, TransportListenIp, TransportTuple};
use mediasoup::pipe_transport::{PipeTransportOptions, PipeTransportRemoteParameters};
use mediasoup::sctp_parameters::SctpParameters;
Expand Down Expand Up @@ -108,7 +108,7 @@ pub fn pipe_transport_consume(
let transport = transport.get_resource()?;

let option = option.to_option();
async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.consume(option)
.await
Expand All @@ -127,7 +127,7 @@ pub fn pipe_transport_connect(

let option = option.clone();

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.connect(option)
.await
Expand All @@ -144,7 +144,7 @@ pub fn pipe_transport_produce(
let transport = transport.get_resource()?;
let option = option.to_option();

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.produce(option)
.await
Expand All @@ -160,7 +160,7 @@ pub fn pipe_transport_get_stats(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.get_stats()
.await
Expand All @@ -177,7 +177,7 @@ pub fn pipe_transport_set_max_incoming_bitrate(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, move || async move {
send_async_nif_result(env, async move {
transport
.set_max_incoming_bitrate(bitrate)
.await
Expand Down Expand Up @@ -215,7 +215,7 @@ pub fn pipe_transport_dump(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.dump()
.await
Expand Down
36 changes: 36 additions & 0 deletions native/mediasoup_elixir/src/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use async_executor::{Executor, Task};
use futures_lite::future;
use once_cell::sync::Lazy;
use std::future::Future;
use std::sync::Arc;

static EXECUTOR: Lazy<Arc<Executor<'static>>> = Lazy::new(|| {
let executor = Arc::new(Executor::new());
let thread_count = std::cmp::max(2, num_cpus::get());

for _ in 0..thread_count {
create_thread_for_executor(Arc::clone(&executor));
}
executor
});

pub fn executor() -> Arc<Executor<'static>> {
EXECUTOR.clone()
}

pub fn spawn<T>(task: T) -> Task<()>
where
T: Future<Output = ()> + Send + 'static,
{
executor().spawn(task)
}

fn create_thread_for_executor(executor: Arc<Executor<'static>>) {
let builder = std::thread::Builder::new().name("ex-mediasoup-worker".into());
let _ = builder.spawn(move || {
let _ = future::block_on(executor.run(async {
let future = future::pending();
let () = future.await;
}));
});
}
19 changes: 9 additions & 10 deletions native/mediasoup_elixir/src/webrtc_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use crate::data_structure::SerNumSctpStreams;
use crate::json_serde::JsonSerdeWrap;
use crate::producer::ProducerOptionsStruct;
use crate::{
async_nif_thread_spawn, send_msg_from_other_thread, ConsumerRef, ProducerRef,
WebRtcTransportRef,
send_async_nif_result, send_msg_from_other_thread, ConsumerRef, ProducerRef, WebRtcTransportRef,
};
use mediasoup::consumer::ConsumerOptions;
use mediasoup::data_structures::{
Expand Down Expand Up @@ -52,7 +51,7 @@ pub fn webrtc_transport_consume(

let option: ConsumerOptions = option.to_option();

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.consume(option)
.await
Expand All @@ -70,7 +69,7 @@ pub fn webrtc_transport_connect(
let transport = transport.get_resource()?;
let option: WebRtcTransportRemoteParameters = option.clone();

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.connect(option)
.await
Expand All @@ -87,7 +86,7 @@ pub fn webrtc_transport_produce(
let transport = transport.get_resource()?;
let option: ProducerOptions = option.to_option();

async_nif_thread_spawn(env, || async move {
send_async_nif_result(env, async move {
transport
.produce(option)
.await
Expand Down Expand Up @@ -136,7 +135,7 @@ pub fn webrtc_transport_set_max_incoming_bitrate(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, move || async move {
send_async_nif_result(env, async move {
transport
.set_max_incoming_bitrate(bitrate)
.await
Expand All @@ -152,7 +151,7 @@ pub fn webrtc_transport_set_max_outgoing_bitrate(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, move || async move {
send_async_nif_result(env, async move {
transport
.set_max_outgoing_bitrate(bitrate)
.await
Expand All @@ -175,7 +174,7 @@ pub fn webrtc_transport_restart_ice(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, move || async move {
send_async_nif_result(env, async move {
transport
.restart_ice()
.await
Expand All @@ -191,7 +190,7 @@ pub fn webrtc_transport_get_stats(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, move || async move {
send_async_nif_result(env, async move {
transport
.get_stats()
.await
Expand All @@ -207,7 +206,7 @@ pub fn webrtc_transport_dump(
) -> NifResult<(Atom, Atom)> {
let transport = transport.get_resource()?;

async_nif_thread_spawn(env, move || async move {
send_async_nif_result(env, async move {
transport
.dump()
.await
Expand Down
3 changes: 1 addition & 2 deletions test/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ defmodule WorkerTest do
end

test "worker with wrong settings cert" do
# TODO: workaround for deadlock
# IntegrateTest.WorkerTest.worker_with_wrong_settings_cert()
IntegrateTest.WorkerTest.worker_with_wrong_settings_cert()
end

test "worker_with_wrong_settings_port" do
Expand Down

0 comments on commit d75d59b

Please sign in to comment.