Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure subgraph requests have access to their corresponding ExecutableDocuments #5016

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
42 changes: 35 additions & 7 deletions apollo-router/src/plugins/demand_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::demand_control::strategy::Strategy;
use crate::plugins::demand_control::strategy::StrategyFactory;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::register_plugin;
use crate::services::execution;
use crate::services::execution::BoxService;
Expand Down Expand Up @@ -206,9 +207,16 @@ impl<T> From<WithErrors<T>> for DemandControlError {
}
}

impl From<ValidationErrors> for DemandControlError {
fn from(value: ValidationErrors) -> Self {
DemandControlError::InvalidSubgraphQuery(value)
}
}

pub(crate) struct DemandControl {
config: DemandControlConfig,
strategy_factory: StrategyFactory,
subgraph_schemas: Arc<SubgraphSchemas>,
}

impl DemandControl {
Expand Down Expand Up @@ -237,6 +245,7 @@ impl Plugin for DemandControl {
init.subgraph_schemas.clone(),
),
config: init.config,
subgraph_schemas: init.subgraph_schemas.clone(),
})
}

Expand Down Expand Up @@ -331,6 +340,8 @@ impl Plugin for DemandControl {
if !self.config.enabled {
service
} else {
let subgraph_schemas = self.subgraph_schemas.clone();

ServiceBuilder::new()
.checkpoint(move |req: subgraph::Request| {
let strategy = req
Expand All @@ -357,11 +368,22 @@ impl Plugin for DemandControl {
})
})
.map_future_with_request_data(
|req: &subgraph::Request| {
//TODO convert this to expect
req.executable_document.clone().unwrap_or_else(|| {
Arc::new(Valid::assume_valid(ExecutableDocument::new()))
})
move |req: &subgraph::Request| {
req.subgraph_name
.as_ref()
.and_then(|subgraph| subgraph_schemas.get(subgraph))
.zip(req.operation.as_ref())
.ok_or(DemandControlError::QueryParseFailure(
"Cannot parse subgraph request due to missing subgraph schema"
.to_string(),
))
.and_then(|(schema, operation)| {
operation
.as_parsed(schema)
.map_err(DemandControlError::from)
})
.expect("must have a valid query")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should convert to an error here rather than using expect. DemandControlError::QueryParseFailure could be reused, or we could add something else.

.clone()
},
|req: Arc<Valid<ExecutableDocument>>, fut| async move {
let resp: subgraph::Response = fut.await?;
Expand Down Expand Up @@ -411,6 +433,7 @@ mod test {
use crate::plugins::demand_control::DemandControlError;
use crate::plugins::test::PluginTestHarness;
use crate::query_planner::fetch::QueryHash;
use crate::query_planner::fetch::SubgraphOperation;
use crate::services::execution;
use crate::services::layers::query_analysis::ParsedDocument;
use crate::services::layers::query_analysis::ParsedDocumentInner;
Expand Down Expand Up @@ -561,17 +584,22 @@ mod test {
async fn test_on_subgraph(config: &'static str) -> Response {
let plugin = PluginTestHarness::<DemandControl>::builder()
.config(config)
.schema(include_str!(
"cost_calculator/fixtures/federated_ships_schema.graphql"
))
.build()
.await;
let strategy = plugin.strategy_factory.create();

let ctx = context();
ctx.extensions().lock().insert(strategy);
let mut req = subgraph::Request::fake_builder()
.subgraph_name("test")
.subgraph_name("vehicles")
.context(ctx)
.build();
req.executable_document = Some(Arc::new(Valid::assume_valid(ExecutableDocument::new())));
req.operation = Some(Arc::new(SubgraphOperation::from_parsed(
Valid::assume_valid(ExecutableDocument::new()),
)));
let resp = plugin
.call_subgraph(req, |req| {
subgraph::Response::fake_builder()
Expand Down
6 changes: 3 additions & 3 deletions apollo-router/src/plugins/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ mod test {
connection_closed_signal: None,
query_hash: Default::default(),
authorization: Default::default(),
executable_document: None,
operation: None,
};
service.modify_request(&mut request);
let headers = request
Expand Down Expand Up @@ -911,7 +911,7 @@ mod test {
connection_closed_signal: None,
query_hash: Default::default(),
authorization: Default::default(),
executable_document: None,
operation: None,
};
service.modify_request(&mut request);
let headers = request
Expand Down Expand Up @@ -973,7 +973,7 @@ mod test {
connection_closed_signal: None,
query_hash: Default::default(),
authorization: Default::default(),
executable_document: None,
operation: None,
}
}

Expand Down
4 changes: 3 additions & 1 deletion apollo-router/src/query_planner/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl From<&'_ Box<next::FetchNode>> for plan::PlanNode {
requires: requires.as_deref().map(vec).unwrap_or_default(),
variable_usages: variable_usages.iter().map(|v| v.clone().into()).collect(),
// TODO: use Arc in apollo_federation to avoid this clone
operation: SubgraphOperation::from_parsed(Arc::new(operation_document.clone())),
operation: Arc::new(SubgraphOperation::from_parsed(Arc::new(
operation_document.clone(),
))),
operation_name: operation_name.clone(),
operation_kind: (*operation_kind).into(),
id: id.map(|id| id.to_string().into()),
Expand Down
7 changes: 7 additions & 0 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::BroadcastStream;
use tracing::Instrument;

use super::fetch::SubgraphSchemas;
use super::log;
use super::subscription::SubscriptionHandle;
use super::DeferredNode;
Expand Down Expand Up @@ -50,6 +51,7 @@ impl QueryPlan {
service_factory: &'a Arc<SubgraphServiceFactory>,
supergraph_request: &'a Arc<http::Request<Request>>,
schema: &'a Arc<Schema>,
subgraph_schemas: &'a Arc<SubgraphSchemas>,
sender: mpsc::Sender<Response>,
subscription_handle: Option<SubscriptionHandle>,
subscription_config: &'a Option<SubscriptionConfig>,
Expand All @@ -67,6 +69,7 @@ impl QueryPlan {
context,
service_factory,
schema,
subgraph_schemas,
supergraph_request,
deferred_fetches: &deferred_fetches,
query: &self.query,
Expand Down Expand Up @@ -100,6 +103,7 @@ pub(crate) struct ExecutionParameters<'a> {
pub(crate) context: &'a Context,
pub(crate) service_factory: &'a Arc<SubgraphServiceFactory>,
pub(crate) schema: &'a Arc<Schema>,
pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
pub(crate) deferred_fetches: &'a HashMap<NodeStr, broadcast::Sender<(Value, Vec<Error>)>>,
pub(crate) query: &'a Arc<Query>,
Expand Down Expand Up @@ -287,6 +291,7 @@ impl PlanNode {
context: parameters.context,
service_factory: parameters.service_factory,
schema: parameters.schema,
subgraph_schemas: parameters.subgraph_schemas,
supergraph_request: parameters.supergraph_request,
deferred_fetches: &deferred_fetches,
query: parameters.query,
Expand Down Expand Up @@ -435,6 +440,7 @@ impl DeferredNode {
let label = self.label.as_ref().map(|l| l.to_string());
let tx = sender;
let sc = parameters.schema.clone();
let subgraph_schemas = parameters.subgraph_schemas.clone();
let orig = parameters.supergraph_request.clone();
let sf = parameters.service_factory.clone();
let root_node = parameters.root_node.clone();
Expand Down Expand Up @@ -475,6 +481,7 @@ impl DeferredNode {
context: &ctx,
service_factory: &sf,
schema: &sc,
subgraph_schemas: &subgraph_schemas,
supergraph_request: &orig,
deferred_fetches: &deferred_fetches,
query: &query,
Expand Down
3 changes: 2 additions & 1 deletion apollo-router/src/query_planner/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub(crate) struct FetchNode {
pub(crate) variable_usages: Vec<NodeStr>,

/// The GraphQL subquery that is used for the fetch.
pub(crate) operation: SubgraphOperation,
pub(crate) operation: Arc<SubgraphOperation>,

/// The GraphQL subquery operation name.
pub(crate) operation_name: Option<NodeStr>,
Expand Down Expand Up @@ -403,6 +403,7 @@ impl FetchNode {
.build();
subgraph_request.query_hash = self.schema_aware_hash.clone();
subgraph_request.authorization = self.authorization.clone();
subgraph_request.operation = Some(self.operation.clone());

let service = parameters
.service_factory
Expand Down
16 changes: 12 additions & 4 deletions apollo-router/src/query_planner/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ async fn mock_subgraph_service_withf_panics_should_be_reported_as_service_closed
&sf,
&Default::default(),
&Arc::new(Schema::parse_test(test_schema!(), &Default::default()).unwrap()),
&Default::default(),
sender,
None,
&None,
Expand Down Expand Up @@ -177,6 +178,7 @@ async fn fetch_includes_operation_name() {
&sf,
&Default::default(),
&Arc::new(Schema::parse_test(test_schema!(), &Default::default()).unwrap()),
&Default::default(),
sender,
None,
&None,
Expand Down Expand Up @@ -235,6 +237,7 @@ async fn fetch_makes_post_requests() {
&sf,
&Default::default(),
&Arc::new(Schema::parse_test(test_schema!(), &Default::default()).unwrap()),
&Default::default(),
sender,
None,
&None,
Expand All @@ -260,7 +263,7 @@ async fn defer() {
service_name: "X".into(),
requires: vec![],
variable_usages: vec![],
operation: SubgraphOperation::from_string("{ t { id __typename x } }"),
operation: Arc::new(SubgraphOperation::from_string("{ t { id __typename x } }")),
operation_name: Some("t".into()),
operation_kind: OperationKind::Query,
id: Some("fetch1".into()),
Expand Down Expand Up @@ -303,9 +306,9 @@ async fn defer() {
},
)],
variable_usages: vec![],
operation: SubgraphOperation::from_string(
operation: Arc::new(SubgraphOperation::from_string(
"query($representations:[_Any!]!){_entities(representations:$representations){...on T{y}}}"
),
)),
operation_name: None,
operation_kind: OperationKind::Query,
id: Some("fetch2".into()),
Expand Down Expand Up @@ -385,6 +388,7 @@ async fn defer() {
&sf,
&Default::default(),
&schema,
&Default::default(),
sender,
None,
&None,
Expand Down Expand Up @@ -493,6 +497,7 @@ async fn defer_if_condition() {
.unwrap(),
),
&schema,
&Default::default(),
sender,
None,
&None,
Expand All @@ -515,6 +520,7 @@ async fn defer_if_condition() {
&service_factory,
&Default::default(),
&schema,
&Default::default(),
default_sender,
None,
&None,
Expand Down Expand Up @@ -546,6 +552,7 @@ async fn defer_if_condition() {
.unwrap(),
),
&schema,
&Default::default(),
sender,
None,
&None,
Expand Down Expand Up @@ -667,6 +674,7 @@ async fn dependent_mutations() {
&sf,
&Default::default(),
&Arc::new(Schema::parse_test(schema, &Default::default()).unwrap()),
&Default::default(),
sender,
None,
&None,
Expand Down Expand Up @@ -1793,7 +1801,7 @@ fn broken_plan_does_not_panic() {
service_name: "X".into(),
requires: vec![],
variable_usages: vec![],
operation: SubgraphOperation::from_string(operation),
operation: Arc::new(SubgraphOperation::from_string(operation)),
operation_name: Some("t".into()),
operation_kind: OperationKind::Query,
id: Some("fetch1".into()),
Expand Down
4 changes: 4 additions & 0 deletions apollo-router/src/services/execution/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::plugins::subscription::Subscription;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::query_planner::subscription::SubscriptionHandle;
use crate::services::execution;
use crate::services::new_service::ServiceFactory;
Expand All @@ -57,6 +58,7 @@ use crate::spec::Schema;
#[derive(Clone)]
pub(crate) struct ExecutionService {
pub(crate) schema: Arc<Schema>,
pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,
pub(crate) subgraph_service_factory: Arc<SubgraphServiceFactory>,
/// Subscription config if enabled
subscription_config: Option<SubscriptionConfig>,
Expand Down Expand Up @@ -148,6 +150,7 @@ impl ExecutionService {
&self.subgraph_service_factory,
&Arc::new(req.supergraph_request),
&self.schema,
&self.subgraph_schemas,
sender,
subscription_handle.clone(),
&self.subscription_config,
Expand Down Expand Up @@ -616,6 +619,7 @@ impl ServiceFactory<ExecutionRequest> for ExecutionServiceFactory {
self.plugins.iter().rev().fold(
crate::services::execution::service::ExecutionService {
schema: self.schema.clone(),
subgraph_schemas: self.subgraph_schemas.clone(),
subgraph_service_factory: self.subgraph_service_factory.clone(),
subscription_config: subscription_plugin_conf,
}
Expand Down
8 changes: 4 additions & 4 deletions apollo-router/src/services/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::pin::Pin;
use std::sync::Arc;

use apollo_compiler::validation::Valid;
use http::StatusCode;
use http::Version;
use multimap::MultiMap;
Expand All @@ -29,6 +28,7 @@ use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::plugins::authorization::CacheKeyMetadata;
use crate::query_planner::fetch::OperationKind;
use crate::query_planner::fetch::QueryHash;
use crate::query_planner::fetch::SubgraphOperation;
use crate::Context;

pub type BoxService = tower::util::BoxService<Request, Response, BoxError>;
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct Request {
// authorization metadata for this request
pub(crate) authorization: Arc<CacheKeyMetadata>,

pub(crate) executable_document: Option<Arc<Valid<apollo_compiler::ExecutableDocument>>>,
pub(crate) operation: Option<Arc<SubgraphOperation>>,
}

#[buildstructor::buildstructor]
Expand Down Expand Up @@ -88,7 +88,7 @@ impl Request {
connection_closed_signal,
query_hash: Default::default(),
authorization: Default::default(),
executable_document: None,
operation: None,
}
}

Expand Down Expand Up @@ -152,7 +152,7 @@ impl Clone for Request {
.map(|s| s.resubscribe()),
query_hash: self.query_hash.clone(),
authorization: self.authorization.clone(),
executable_document: self.executable_document.clone(),
operation: self.operation.clone(),
}
}
}
Expand Down