Skip to content

Commit

Permalink
Introduce @stream directive
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Apr 7, 2024
1 parent 54724d9 commit 0ca18c5
Showing 1 changed file with 96 additions and 9 deletions.
105 changes: 96 additions & 9 deletions spec/Section 6 -- Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ YieldIncrementalResults(data, errors, incrementalDataRecords):
- Yield an unordered map containing {data}, {errors}, {pending}, and {hasNext}.
- For each completed child Future node of a root node in {graph}:
- Let {completedFuture} be that Future; let {result} be its result.
- If {data} on {result} is {null}:
- If {FutureCompletedWithoutData(completedFuture, result)} is {true}:
- Initialize {completed} to an empty list.
- Let {parents} be the parents of {completedFuture}.
- Initialize {completed} to an empty list.
Expand All @@ -382,8 +382,8 @@ YieldIncrementalResults(data, errors, incrementalDataRecords):
- Add each {incrementalDataRecord} of {incrementalDataRecords} on {result} to
{graph} via the same procedure as above.
- Let {incrementalDataRecordsToRelease} be the set of completed Future nodes
in {graph} completing a Deferred Fragment root node where all of the sibling
Incremental Data Records are also complete.
in {graph} either completing a Stream root node, or completing a Deferred
Fragment root node where all of the sibling Futures are also complete.
- If {incrementalDataRecordsToRelease} is empty, continue to the next
completed child Future node in {graph}.
- Initialize {incremental} to an empty lists.
Expand All @@ -401,9 +401,8 @@ YieldIncrementalResults(data, errors, incrementalDataRecords):
- Prune root nodes of {graph} containing no direct child Incremental Data
Records, as above.
- Let {hasNext} be {false} if {graph} is empty.
- Let {incrementalResult} be an unordered map containing {hasNext}.
- If {incremental} is not empty, set the corresponding entry on
{incrementalResult} to {incremental}.
- Let {incrementalResult} be an unordered map containing {incremental} and
{hasNext}.
- If {completed} is not empty, set the corresponding entry on
{incrementalResult} to {completed}.
- Let {newPendingResults} be the set of new root nodes in {graph}, promoted by
Expand All @@ -424,8 +423,22 @@ GetPending(newPendingResults):
- Append {pendingEntry} to {pending}.
- Return {pending}.

FutureCompletedWithoutData(completedFuture, result):

- If {completedFuture} incrementally completes Deferred Fragments:
- If {data} on {result} is {null}, return {true}.
- Otherwise:
- If {items} on {result} is not defined or {null}, return {true}.
- Return {false}.

GetIncrementalEntry(incrementalDataRecord, graph):

- If {incrementalDataRecord} completes a Stream:
- Let {stream} be the Stream incrementally completed by
{incrementalDataRecord}.
- Let {items} and {errors} be the corresponding entries on {result}.
- Let {id} be the unique identifier for {stream}.
- Return an unordered map containing {id}, {items}, and {errors}.
- Let {deferredFragments} be the Deferred Fragments incrementally completed by
{incrementalDataRecord} at {path}.
- Let {result} be the result of {incrementalDataRecord}.
Expand Down Expand Up @@ -973,18 +986,92 @@ CompleteListValue(innerType, fieldDetailsList, result, variableValues, path,
deferUsageSet, deferMap):

- Initialize {items} and {incrementalDataRecords} to empty lists.
- Let {index} be {0}.
- For each {resultItem} of {result}:
- Let {fieldDetails} be the first entry in {fieldDetailsList}.
- Let {field} be the corresponding entry on {fieldDetails}.
- If {field} provides the directive `@stream` and its {if} argument is not
{false} and is not a variable in {variableValues} with the value {false} and
{innerType} is the outermost inner type of the list type defined for
{fieldDetailsList}:
- Let {streamDirective} be that directive.
- If this execution is for a subscription operation, raise a _field error_.
- Let {initialCount} be the value or variable provided to {streamDirective}'s
{initialCount} argument.
- If {initialCount} is less than zero, raise a _field error_.
- Let {label} be the value or variable provided to {streamDirective}'s {label}
argument.
- Let {iterator} be an iterator for {result}.
- Let {index} be zero.
- While {result} is not closed:
- If {streamDirective} is defined and {index} is greater than or equal to
{initialCount}:
- Initialize {parents} to an empty list.
- For each {deferUsage} in {deferUsageSet}:
- Let {parent} be the entry in {deferMap} for {deferUsage}.
- Append {parent} to {parents}.
- Let {stream} be an unordered map containing {parents}, {path}, and
{label}.
- Let {streamFieldDetails} be the result of
{GetStreamFieldDetailsList(fieldDetailsList)}.
- Let {incrementalDataRecord} represent the future execution of
{ExecuteStreamField(stream, iterator, streamFieldDetailsList, index,
innerType, variableValues)}.
- Schedule initiation of execution of {incrementalDataRecord} following any
implementation specific deferral.
- Append {incrementalDataRecord} to {incrementalDataRecords}.
- Return {items} and {incrementalDataRecords}.
- Wait for the next item from {result} via the {iterator}.
- If an item is not retrieved because of an error, raise a _field error_.
- Let {item} be the item retrieved from {result}.
- Let {itemPath} be {path} with {index} appended.
- Let {completedItem} and {itemIncrementalDataRecords} be the result of
calling {CompleteValue(innerType, fieldDetailsList, item, variableValues,
itemPath)}.
- Append {completedItem} to {items}.
- Append all items in {itemIncrementalDataRecords} to
{incrementalDataRecords}.
- Increment {index} by {1}.
- Return {items} and {incrementalDataRecords}.

Note: {incrementalDataRecord} can be safely initiated without blocking higher
priority data once {stream} is released as pending.

GetStreamFieldDetailsList(fieldDetailsList):

- Let {streamFields} be an empty list.
- For each {fieldDetails} in {fieldDetailsList}:
- Let {field} be the corresponding entry on {fieldDetails}.
- Let {newFieldDetails} be a new Field Details record created from {field}.
- Append {newFieldDetails} to {streamFields}.
- Return {streamFields}.

#### Execute Stream Field

ExecuteStreamField(stream, iterator, fieldDetailsList, index, innerType,
variableValues, eventEmitter):

- Let {path} be the corresponding entry on {stream}.
- Let {itemPath} be {path} with {index} appended.
- Wait for the next item from {iterator}.
- If {iterator} is closed, complete this data stream and return.
- Let {item} be the next item retrieved via {iterator}.
- Let {nextIndex} be {index} plus one.
- Let {completedItem} and {incrementalDataRecords} be the result of
{CompleteValue(innerType, fields, item, variableValues, itemPath)}.
- Initialize {items} to an empty list.
- Append {completedItem} to {items}.
- Let {errors} be the list of all _field error_ raised while completing the
item.
- Let {incrementalDataRecord} represent the future execution of
{ExecuteStreamField(stream, path, iterator, fieldDetailsList, nextIndex,
innerType, variableValues, eventEmitter)}.
- Schedule initiation of execution of {incrementalDataRecord} following any
implementation specific deferral.
- Append {incrementalDataRecord} to {incrementalDataRecords}.
- Return an unordered map containing {items}, {errors}, and
{incrementalDataRecords}.

Note: {incrementalDataRecord} can be safely initiated without blocking higher
priority data once {items} is released.

**Coercing Results**

The primary purpose of value completion is to ensure that the values returned by
Expand Down

0 comments on commit 0ca18c5

Please sign in to comment.