Skip to content

Commit

Permalink
runtimes/js: perf tweaks
Browse files Browse the repository at this point in the history
Avoid needless task spawns
  • Loading branch information
eandre committed May 1, 2024
1 parent d3debe9 commit d99bba8
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 43 deletions.
11 changes: 4 additions & 7 deletions runtimes/js/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::{raw_api, request_meta};
use encore_runtime_core::api;
use encore_runtime_core::api::schema;
use encore_runtime_core::model::RequestData;
use napi::bindgen_prelude::spawn;
use napi::{Env, JsFunction, JsUnknown, NapiRaw, NapiValue};
use napi::{Env, JsFunction, JsUnknown, NapiRaw};
use napi_derive::napi;
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -156,7 +155,7 @@ impl PromiseHandler for APIPromiseHandler {

struct TypedRequestMessage {
req: Request,
tx: tokio::sync::mpsc::Sender<Result<schema::JSONPayload, api::Error>>,
tx: tokio::sync::mpsc::UnboundedSender<Result<schema::JSONPayload, api::Error>>,
}

pub struct JSTypedHandler {
Expand All @@ -170,7 +169,7 @@ impl api::BoxedHandler for JSTypedHandler {
) -> Pin<Box<dyn Future<Output = api::ResponseData> + Send + 'static>> {
Box::pin(async move {
// Create a one-shot channel
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

// Call the handler.
let req = Request::new(req);
Expand Down Expand Up @@ -203,9 +202,7 @@ fn typed_resolve_on_js_thread(ctx: ThreadSafeCallContext<TypedRequestMessage>) -
}
Err(err) => {
let res = handler.error(ctx.env, err);
spawn(async move {
_ = ctx.value.tx.send(res).await;
});
_ = ctx.value.tx.send(res);
Ok(())
}
}
Expand Down
8 changes: 3 additions & 5 deletions runtimes/js/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl api::TypedHandler for JSAuthHandler {
) -> Pin<Box<dyn Future<Output = api::HandlerResponse> + Send + 'static>> {
Box::pin(async move {
// Create a one-shot channel
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

// Call the handler.
let req = Request::new(req);
Expand All @@ -90,7 +90,7 @@ impl api::TypedHandler for JSAuthHandler {

struct AuthMessage {
req: Request,
tx: tokio::sync::mpsc::Sender<Result<schema::JSONPayload, api::Error>>,
tx: tokio::sync::mpsc::UnboundedSender<Result<schema::JSONPayload, api::Error>>,
}

fn resolve_on_js_thread(ctx: ThreadSafeCallContext<AuthMessage>) -> napi::Result<()> {
Expand All @@ -103,9 +103,7 @@ fn resolve_on_js_thread(ctx: ThreadSafeCallContext<AuthMessage>) -> napi::Result
}
Err(err) => {
let res = handler.error(ctx.env, err);
spawn(async move {
_ = ctx.value.tx.send(res).await;
});
_ = ctx.value.tx.send(res);
Ok(())
}
}
Expand Down
27 changes: 6 additions & 21 deletions runtimes/js/src/napi_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait PromiseHandler: Clone + Send + Sync + 'static {
pub fn await_promise<T, H>(
env: Env,
result: JsUnknown,
tx: tokio::sync::mpsc::Sender<T>,
tx: tokio::sync::mpsc::UnboundedSender<T>,
handler: H,
) where
H: PromiseHandler<Output = T>,
Expand All @@ -69,58 +69,43 @@ pub fn await_promise<T, H>(
let handler = handler.clone();
let tx = tx.clone();
let cb = env.create_function_from_closure("callback", move |ctx| {
let handler = handler.clone();
let res = match ctx.try_get::<JsUnknown>(0) {
Ok(Either::A(success)) => handler.resolve(env, Some(success)),
Ok(Either::B(_)) => handler.resolve(env, None),
Err(err) => handler.error(env, err),
};

let tx = tx.clone();
spawn(async move {
_ = tx.send(res).await;
});

_ = tx.send(res);
ctx.env.get_undefined()
})?;
cb
};

let eb = {
let handler = handler.clone();
let tx = tx.clone();
env.create_function_from_closure("error_callback", move |ctx| {
let res = match ctx.get(0) {
Ok(exception) => handler.reject(env, exception),
Err(err) => handler.error(env, err),
};

let tx = tx.clone();
spawn(async move {
_ = tx.send(res).await;
});
_ = tx.send(res);
ctx.env.get_undefined()
})?
};

then.call(Some(&result), &[cb, eb])?;
} else {
let res = handler.resolve(env, Some(result));
let tx = tx.clone();
spawn(async move {
_ = tx.send(res).await;
});
_ = tx.send(res);
}

Ok(())
};

let tx = outer_tx.clone();
inner().unwrap_or_else(|err| {
inner().unwrap_or_else(move |err| {
let res = outer_handler.error(env, err);
spawn(async move {
_ = tx.send(res).await;
});
_ = outer_tx.send(res);
});
}

Expand Down
8 changes: 3 additions & 5 deletions runtimes/js/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl PubSubSubscription {

struct PubSubMessageRequest {
req: Request,
tx: tokio::sync::mpsc::Sender<Result<(), api::Error>>,
tx: tokio::sync::mpsc::UnboundedSender<Result<(), api::Error>>,
}

#[derive(Debug)]
Expand All @@ -122,7 +122,7 @@ impl pubsub::SubscriptionHandler for JSSubscriptionHandler {
) -> Pin<Box<dyn Future<Output = Result<(), api::Error>> + Send + '_>> {
let handler = self.handler.clone();
Box::pin(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let req = Request::new(msg);
handler.call(
PubSubMessageRequest { req, tx },
Expand Down Expand Up @@ -216,9 +216,7 @@ fn resolve_on_js_thread(ctx: ThreadSafeCallContext<PubSubMessageRequest>) -> nap
}
Err(err) => {
let res = handler.error(ctx.env, err);
spawn(async move {
_ = ctx.value.tx.send(res).await;
});
_ = ctx.value.tx.send(res);
Ok(())
}
}
Expand Down
8 changes: 3 additions & 5 deletions runtimes/js/src/raw_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct RawRequestMessage {
req: Request,
resp: ResponseWriter,
body: BodyReader,
err_tx: mpsc::Sender<Result<(), api::Error>>,
err_tx: mpsc::UnboundedSender<Result<(), api::Error>>,
}

enum ResponseWriterState {
Expand Down Expand Up @@ -348,7 +348,7 @@ impl api::BoxedHandler for JSRawHandler {
};
let body = BodyReader::new(body.into_data_stream());

let (err_tx, mut err_rx) = mpsc::channel(1);
let (err_tx, mut err_rx) = mpsc::unbounded_channel();

self.handler.call(
RawRequestMessage {
Expand Down Expand Up @@ -458,9 +458,7 @@ fn raw_resolve_on_js_thread(ctx: ThreadSafeCallContext<RawRequestMessage>) -> na
}
Err(err) => {
let res = handler.error(ctx.env, err);
tokio::spawn(async move {
_ = ctx.value.err_tx.send(res).await;
});
_ = ctx.value.err_tx.send(res);
Ok(())
}
}
Expand Down

0 comments on commit d99bba8

Please sign in to comment.