Skip to content

Commit

Permalink
Introduce @stream directive
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Feb 18, 2024
1 parent a6ec668 commit 6865978
Showing 1 changed file with 107 additions and 7 deletions.
114 changes: 107 additions & 7 deletions spec/Section 6 -- Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,28 @@ serial):
{data} entry of {null}.
- Let {completedDeferredFragments} be the set of Deferred Fragment root nodes
in {pendingResults} containing only completed Future nodes.
- If {erroredDeferredFragments} or {completedDeferredFragments} is not empty:
- If {releasedStreams}, {erroredDeferredFragments}, or
{completedDeferredFragments} is not empty:
- Initialize {incremental} and {completed} to empty lists.
- Let {completedStreamItems} be the set of completed Future nodes in
{pendingResults} that are children of {releasedStreams}.
- For each {future} of {completedStreamItems}:
- Let {result} be the result of that {future}.
- Let {id} be the unique identifier for the parent {stream} of {future}.
- Let {items} and {errors} be the corresponding entries on {result}.
- If {items} is not defined or {null}:
- Remove {stream} from {pendingResults}.
- Let {completedEntry} be an unordered map containing {id}.
- If {items} is {null}, set the corresponding entry on {completedEntry}
to the value for {errors} on {result}.
- Append {completedEntry} to {completed}.
- Otherwise:
- Let {incrementalEntry} be an unordered map containing {id}, {items},
and {errors}.
- Let {incremental} be a new list containing {incrementalEntry}.
- Append {incrementalEntry} to {incremental}.
- Remove {future} from {pendingResults}.
- Append all items in {futures} on {result} to {futures}.
- For each {deferredFragment} of {erroredDeferredFragments}:
- Remove {deferredFragment} and all of its descendant nodes from
{pendingResults}, except for any descendant future nodes with other
Expand Down Expand Up @@ -393,10 +413,15 @@ serial):
- If {completed} is not empty, set the corresponding entry on
{incrementalResult} to {completed}.
- For each {future} of {futures}:
- Add {future} to {pendingResults} as a node directed from each of the
{deferredFragments} it completes, adding each of {deferredFragments} to
{pendingResults} as necessary as a new node directed from its {parent},
or, if its {parent} is undefined, as a root node.
- If {future} incrementally completes a Stream:
- Add {future} to {pendingResults} as a node directed from {stream},
adding {stream} as necessary as a new node directed from each of its
{parents}, or, if it has no {parents}, as a root node.
- Otherwise, {future} incrementally completes Deferred Fragments:
- Add {future} to {pendingResults} as a node directed from each of the
{deferredFragments} it completes, adding each of {deferredFragments} to
{pendingResults} as necessary as a new node directed from its {parent},
or, if its {parent} is undefined, as a root node.
- Reset {futures} to an empty list.
- While any root Deferred Fragment nodes in {pendingResults} contain no direct
child Futures:
Expand Down Expand Up @@ -970,15 +995,90 @@ pendingResults, path, deferUsageSet, deferMap):
CompleteListValue(innerType, fieldDetailsList, result, variableValues,
pendingResults, path, deferUsageSet, deferMap):

- Initialize {items} and {futures} to empty lists.
- For each {item} of {result}:
- Initialize {futures} to an empty list.
- Let {fieldDetails} be the first entry in {fieldDetailsList}.
- Let {field} be the corresponding entry on {fieldDetails}.
- If {field} provides the directive `@stream` and its {if} argument is not
{false} and is not a variable in {variableValues} with the value {false} and
{innerType} is the outermost inner type of the list type defined for
{fieldDetailsList}:
- Let {streamDirective} be that directive.
- If this execution is for a subscription operation, raise a _field error_.
- Let {initialCount} be the value or variable provided to {streamDirective}'s
{initialCount} argument.
- If {initialCount} is less than zero, raise a _field error_.
- Let {label} be the value or variable provided to {streamDirective}'s {label}
argument.
- Let {iterator} be an iterator for {result}.
- Let {items} be an empty list.
- Let {index} be zero.
- While {result} is not closed:
- If {streamDirective} is defined and {index} is greater than or equal to
{initialCount}:
- Initialize {parents} to an empty list.
- For each {deferUsage} in {deferUsageSet}:
- Let {parent} be the entry in {deferMap} for {deferUsage}.
- Append {parent} to {parents}.
- Let {stream} be an unordered map containing {parents}, {path}, and
{label}.
- Let {streamFieldDetails} be the result of
{GetStreamFieldDetailsList(fieldDetailsList)}.
- Let {future} represent the future execution of {ExecuteStreamField(stream,
iterator, streamFieldDetailsList, index, innerType, variableValues,
pendingResults)}.
- Let initiation of {future} be triggered by the presence of {stream}
and{future} within {pendingResults} and the presence of {stream} within
{pendingResults} as a root node, or if early execution is desired,
following any implementation specific deferral, whichever occurs first.
- Append {future} to {futures}.
- Return {items} and {futures}.
- Wait for the next item from {result} via the {iterator}.
- If an item is not retrieved because of an error, raise a _field error_.
- Let {item} be the item retrieved from {result}.
- Let {itemPath} be {path} with {index} appended.
- Let {completedItem} and {itemFutures} be the result of calling
{CompleteValue(innerType, fieldDetailsList, item, variableValues,
pendingResults, itemPath)}.
- Append {completedItem} to {items}.
- Append all items in {itemFutures} to {futures}.
- Return {items} and {futures}.

GetStreamFieldDetailsList(fieldDetailsList):

- Let {streamFields} be an empty list.
- For each {fieldDetails} in {fieldDetailsList}:
- Let {field} be the corresponding entry on {fieldDetails}.
- Let {newFieldDetails} be a new Field Details record created from {field}.
- Append {newFieldDetails} to {streamFields}.
- Return {streamFields}.

#### Execute Stream Field

ExecuteStreamField(stream, iterator, fieldDetailsList, index, innerType,
variableValues, pendingResults):

- Let {path} be the corresponding entry on {stream}.
- Let {itemPath} be {path} with {index} appended.
- Wait for the next item from {iterator}.
- If {iterator} is closed, complete this data stream and return.
- Let {item} be the next item retrieved via {iterator}.
- Let {nextIndex} be {index} plus one.
- Let {completedItem} and {futures} be the result of {CompleteValue(innerType,
fields, item, variableValues, itemPath)}.
- Initialize {items} to an empty list.
- Append {completedItem} to {items}.
- Let {errors} be the list of all _field error_ raised while completing the
item.
- Let {future} represent the future execution of {ExecuteStreamField(stream,
path, iterator, fieldDetailsList, nextIndex, innerType, variableValues,
pendingResults)}.
- Let initiation of {future} be triggered by the presence of {stream}
and{future} within {pendingResults} and the presence of {stream} within
{pendingResults} as a root node, or if early execution is desired, following
any implementation specific deferral, whichever occurs first.
- Append {future} to {futures}.
- Return an unordered map containing {items}, {errors}, and {futures}.

**Coercing Results**

The primary purpose of value completion is to ensure that the values returned by
Expand Down

0 comments on commit 6865978

Please sign in to comment.