channels concept #939

Open · tetron opened this issue Mar 23, 2022 · 7 comments

@tetron (Member) commented Mar 23, 2022

Design sketch for a CWL "channel" type.

Motivation

  • To be able to trigger downstream computation when an upstream computation has not finished yet but has produced partial results
  • To decouple dependencies that cross workflow boundaries (such as passing a channel to a subworkflow and providing a value later), e.g. https://cwl.discourse.group/t/deferred-input-for-scattered-subworkflow/484
  • To provide a model where external events / triggers not known at workflow start can be incorporated into workflow execution
  • To facilitate translation from other workflow languages that are channel based, e.g. Nextflow

Model

A channel is an asynchronous queue of items. It has two logical variables

  • contents: empty or non-empty
  • state: open or closed

The channel supports three operations:

  • get: get the next item in contents (first in, first out order)
  • put: add an item to contents
  • close: close the channel for further writing
|        | empty                                                                   | non-empty                                 |
|--------|-------------------------------------------------------------------------|-------------------------------------------|
| open   | put: allowed; get: blocks until content available or channel is closed | put: allowed; get: next item in queue     |
| closed | put: not allowed; get: processing ends                                  | put: not allowed; get: next item in queue |

Channels are never explicitly created, closed, written to or read within the workflow. This model exists only to inform the behavior of workflow steps that produce or consume channel objects.

type: channel
items: (channel content type, e.g. string, File)

Passing a channel to a workflow step

When a channel is passed to a workflow step, the runner checks the type of the corresponding parameter on the run step.

  • If the parameter is a regular type (string, File, etc.), the step blocks until all the contents are available and receives them as an array.
  • If the parameter is a channel type, a reference to the channel is passed (see the sketch below).
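A minimal, hedged sketch of the first case, using the proposed type: channel and stepOutputType fields; the tool and step names (make_name.cwl, summarize.cwl, produce_names) are invented purely for illustration:

# Hypothetical sketch only; "channel" and stepOutputType are proposed, not released CWL.
cwlVersion: v1.2
class: Workflow
inputs:
  samples: File[]
outputs:
  report:
    type: File
    outputSource: summarize/report
steps:
  produce_names:
    run: make_name.cwl            # assumed tool producing one string per input File
    scatter: sample
    stepOutputType: channel       # proposed: names becomes {type: channel, items: string}
    in:
      sample: samples
    out: [names]
  summarize:
    run: summarize.cwl            # assumed tool whose names parameter is string[] (a regular array)
    in:
      names: produce_names/names  # channel connected to an array parameter: the step blocks
                                  # until the channel is closed and receives all items as an array
    out: [report]

If summarize.cwl instead declared names as a channel type, the runner would pass a reference to the channel and summarize could start before all the scatter instances of produce_names have finished.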

Use in scattering

A channel object can be produced by a scatter operation:

stepOutputType: one of "normal" or "channel"

When stepOutputType: channel is set, a scattered workflow step immediately produces channels for all its outputs. Each individual scatter step puts its result into the channel as it becomes available. When all scatter steps complete, the channel is closed.

A workflow step can "scatter" over a channel.

This will start a scatter instance for each item in the channel, until the channel is closed and empty.

Scattering over multiple channels follows the same dot product / cross product behavior as for arrays.
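A hedged sketch combining both directions, producing a channel with the proposed stepOutputType field and scattering a downstream step over it; the tool and step names are invented for illustration:

steps:
  align_samples:
    run: align.cwl               # assumed tool: one File in, one File out
    scatter: sample
    stepOutputType: channel      # proposed: bam becomes {type: channel, items: File}
    in:
      sample: samples            # samples: File[]
    out: [bam]
  call_variants:
    run: call_variants.cwl       # assumed tool: one File in, one File out
    scatter: bam                 # scattering over a channel: a new instance starts for each
                                 # item as it arrives, until the channel is closed and empty
    in:
      bam: align_samples/bam
    out: [vcf]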

Channels could also be produced by the proposed iterative loop feature.

Single item channels

type: channel
items: (channel content type, e.g. string, File)
singleItem: true

If singleItem is true, the channel will only ever have at most one item. The first "put" closes the channel.

A non-scattered step can also produce channels with stepOutputType: "channel".

On a non-scattered workflow step, this produces its results immediately, as channels with singleItem: true.

When a channel with singleItem: true is passed to a workflow step, the runner checks the type of the parameter on the run step.

  • If the parameter is a regular type (string, File, etc.), the step blocks until the item is available and receives that item.
  • If the parameter is a channel type, a reference to the channel is passed.
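A hedged sketch of how single-item channels might support the deferred-input pattern from the linked Discourse thread; the tool, subworkflow and port names are invented:

steps:
  make_reference:
    run: build_index.cwl          # assumed non-scattered tool
    stepOutputType: channel       # proposed: index is available immediately as a
                                  # {type: channel, items: File, singleItem: true}
    in: {genome: genome}
    out: [index]
  analysis:
    run: subworkflow.cwl          # assumed subworkflow whose index input is declared as a
                                  # singleItem channel, so it can start right away; any inner
                                  # step needing a plain File blocks until the item is put
    in:
      index: make_reference/index
      reads: reads
    out: [results]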

TBD

What happens if a channel is passed to two downstream workflow steps? Are the items copied to both, do the steps race to get the items, or is this an error?

The deferred input case described here https://cwl.discourse.group/t/deferred-input-for-scattered-subworkflow/484 favors the "items copied" pattern.

Proposed model for the "items copied" pattern: when calling get() you provide a "get id" for that reader instance. Each unique id starts at position 0 and reads until the channel is closed.

This also more explicitly models the channel as an array whose final size is not known when it is created, and it facilitates the "automatic" conversion of a channel to an array when it is connected to an array-typed input.
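A hedged sketch of the "items copied" (broadcast) pattern, with one channel output feeding two consumers; the step and tool names are invented for illustration:

steps:
  produce:
    run: emit_files.cwl           # assumed tool
    scatter: item
    stepOutputType: channel
    in: {item: inputs_list}
    out: [results]                # {type: channel, items: File}
  # Both consumers connect to produce/results. Under the "items copied" model each
  # consumer is a distinct reader id that starts at position 0 and reads until the
  # channel is closed, so neither consumer takes items away from the other.
  summarize:
    run: summarize.cwl            # assumed tool
    in: {results: produce/results}
    out: [summary]
  archive:
    run: archive.cwl              # assumed tool
    in: {results: produce/results}
    out: [tarball]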

@tom-tan (Member) commented Mar 24, 2022

In my understanding, it is meant to support a lazy range of strings, Files and other objects.
If so, what is the difference between a channel of string and a File with streamable: true, for example?

How about extending the concept of streamable to other types instead of introducing channel types?

@GlassOfWhiskey (Collaborator) commented

I agree that channels and streamable contents have something in common, which is why we should probably call them streams instead of channels. Nevertheless, this new data type is much more flexible than the streamable feature, as it can:

  • Represent potentially unbounded (i.e. explicitly terminated) data containers;
  • Support explicit operations on data (combinations with dot/cross products, filtering with conditions, but I also envision new operators, like windowing for example).

In my opinion, we could consider extending streamable as a quick way to specify channels as array optimizations, and then have Schema Salad automatically compile these directives into channel-typed ports.

@tetron (Member, Author) commented Mar 24, 2022

> In my understanding, it is meant to support a lazy range of strings, Files and other objects. If so, what is the difference between a channel of string and a File with streamable: true, for example?
>
> How about extending the concept of streamable to other types instead of introducing channel types?

The streamable flag for file types describes a byte stream intended to correspond directly to (and be implemented by) a Unix pipe, and it is specific to the concrete invocation of command line tools.

The channel proposal is a concept within the workflow runner, operating at the level of communication between workflow steps, which is a level of abstraction above files, pipes, and invoking concrete commands.

Because it is different from streamable, that suggests to me it is actually more important to give it a different name to avoid confusion. Channels are not byte streams.

@GlassOfWhiskey (Collaborator) commented

The CWL Channel type

To summarise briefly, this proposal advocates the introduction of a new CWLType called channel<T>, which represents a stream of elements of type T with these characteristics:

  • Its length is potentially unknown (theoretically, it could also be infinite). This means that a channel implementation requires explicit termination, but this does not affect the workflow language;
  • Contrary to arrays, the elements of a channel are received as soon as they are available;
  • To ensure reproducibility, I think we should also preserve the order of the elements. Unordered channels can improve performance whenever the order does not matter, but this is probably not particularly useful in a scientific workflow.

Channel sources

As stated by @tetron in the initial proposal, channels can be generated by scatter and loop steps. Since combining the loop and scatter directives in the same step is not allowed, we can analyse them separately.

I propose reusing the outputMethod field of the loop proposal to define a channel output type. The allowed values differ between the two cases. For loops, we will have:

| symbol  | description |
|---------|-------------|
| last    | Default. Propagates only the last computed element to the subsequent steps when the loop terminates. |
| all     | Propagates a single array with all output values to the subsequent steps when the loop terminates. |
| channel | Propagates a channel with all output values to the subsequent steps, after every loop iteration. |

Note that the channel value replaces the original all_propagate directive. For the scatter directive, we will instead have:

| symbol  | description |
|---------|-------------|
| gather  | Default. Propagates an array with all output values to the subsequent steps when the step terminates. |
| channel | Propagates a channel with all output values to the subsequent steps, populated as each scatter instance completes. |

Note that if a channel preserves the order of elements, it can be seen as an incremental view of the original array.
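A hedged sketch of both channel sources using the proposed outputMethod value; the loop wiring is elided (it follows the separate loop proposal), and the tool and step names are invented:

steps:
  refine:
    run: refine.cwl             # assumed iterative tool
    in: {estimate: initial_estimate}
    out: [estimate]
    # ... loop condition and loop-carried inputs as in the loop proposal ...
    outputMethod: channel       # proposed: emit each iteration's output as it is produced

  fan_out:
    run: process.cwl            # assumed tool
    scatter: item
    in: {item: items}
    out: [result]
    outputMethod: channel       # proposed: a channel instead of the default gathered array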

Channel ports

Since channel objects can be propagated, both Workflow and WorkflowStep objects can list channel elements among their inputs and outputs. However, the appealing option of letting channels be passed to or produced by CommandLineTool and ExpressionTool objects is dangerous, and I suggest disallowing it. The reasons are the following:

  • Passing a channel to a CommandLineTool (typically a shell script) is ambiguous. The first interpretation that comes to mind is that the CommandLineTool is a streaming application which receives data from the input channel to produce outputs. This requires that the command is launched once at the beginning and keeps receiving messages as the workflow proceeds (as happens now for the nodejs process receiving expressions to evaluate). However, this hides many assumptions:

    • CommandLineTool, which is currently generic, must target an application of a specific kind;
    • The CWL runtime must be able to detect termination of this application, which at the moment cannot be encoded in the CWL model.
  • Receiving a channel from a CommandLineTool is even worse. It requires the CWL runtime to monitor the application continuously, or else requires the application to contact the CWL runtime directly.

    • The first option is computationally heavy and application-specific, as the CWL runtime must know what to monitor. Again, there is no way for the user to specify this behaviour with CWL at the moment;
    • The second option is terrible, as it breaks the host/coordination separation of concerns and requires bidirectional connections between the CWL runtime and the application executor, which is not always possible (e.g. Docker containers or virtual machines may not be able to contact the host node).
  • Other interpretations of passing a channel to a CommandLineTool are possible. For example, we could extend the inputBinding field to extract portions of data from the channel. However, I think this is not the right place to manipulate a channel, as it does not allow the static type-checking provided by the CWL Schema.

For these reasons, I suggest disallowing CommandLineTool objects from receiving channel data. Channels must be converted into something else before being passed to a CommandLineTool. The following section discusses the channel sinks that perform this kind of transformation.

Furthermore, I suggest disallowing a channel<T> as the output of a CWL execution, as it introduces subtleties into the termination process. Instead, a channel should be gathered into an array (either manually or automatically by the runtime, but in a clear and standardised way) prior to terminating the workflow execution.

Channel sinks

Since channel data can only be produced by WorkflowStep elements, I suggest that, for the sake of consistency, they should also be transformed only in WorkflowStep elements. The alternative would be to also allow Workflow and CommandLineTool objects to manipulate channels; this is an optimisation that could be discussed.

The first way to manipulate a channel, which has already been discussed by @tetron, is the scatter directive. A scatter receiving a channel behaves similarly to a scatter that receives an array, so nothing changes. In the case of multiple scatter inputs, we should probably also allow array + channel products.

The proposed syntax can also transform a channel back into an array. This could be useful for including barriers in workflows, i.e. when a step must wait for all inputs before executing. This can be achieved by using a WorkflowStep with a scatter directive on a channel input, a gather-type outputMethod, and inner logic that forwards the data from inputs to outputs. However, this is both cumbersome and quite limited.

Indeed, there could be steps that do not need the entire array of elements but only a portion of it. For example, a step could process data as they arrive, but for each new element it needs the whole history up to that point (think, for example, of a Kahn process network). Alternatively, a step could always need the last n elements to perform a moving average. To support these scenarios, I propose adding a viewType directive to the WorkflowStepInput object. The list of supported methods must be discussed carefully, as there are many potentially valuable patterns here. I propose a few that I consider necessary:

| symbol | description |
|--------|-------------|
| none   | Default. Propagates the channel as is to the inner steps. |
| array  | Propagates an array with all the channel inputs to the inner steps. It blocks until the channel termination signal is received. |
| window | Propagates an array with a windowed view of the channel to the inner steps. |

The window mode needs further specification. It can be configured through an additional windowSize parameter of type int, defaulting to 1 (an array with a single element). In addition, a value of -1 or a special string unbounded (to be discussed) could represent a window with the entire history up to the current element. Finally, if we want to be as generic as possible, windowSize could also contain an Expression evaluated on the step context to allow for variable-size windows. More complex patterns can be obtained by combining a viewType with an internal ExpressionTool.
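A hedged sketch of the proposed viewType and windowSize fields on a WorkflowStepInput; the field names come from this comment, while the step, tool and port names are illustrative and how often the consuming step re-executes per window is left open by the proposal:

steps:
  moving_average:
    run: average.cwl              # assumed tool whose samples parameter is double[]
    in:
      samples:
        source: sensor/readings   # assumed upstream output of {type: channel, items: double}
        viewType: window          # proposed: pass an array view instead of the channel itself
        windowSize: 5             # last five elements; -1 / "unbounded" = whole history so far
    out: [mean]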

Intuitively, the viewType directive should be evaluated before pickValue, allowing users to rely on the pickValue directive to filter out null values. If there is a scatter directive, it will be evaluated after pickValue, so it does not interfere with the viewType evaluation. However, the linkMerge evaluation with the currently supported values merge_nested and merge_flattened could be problematic in the case of multiple channel inputs. Probably the best solution would be to disallow linkMerge on channels and to evaluate viewType on all inputs before evaluating linkMerge. This recovers a situation where linkMerge is evaluated on arrays, but it could lead to an unexpected interpretation (if the user expects the entire channels to be merged). This is a delicate aspect that should be discussed carefully.

@tetron (Member, Author) commented Jun 21, 2022

I propose that a channel object is represented at the CWL level (in expressions) this way:

{
  "class": "Channel",
  "items": "string",
  "id": "_:123456"
}

Where items is the type of items in the channel and id is a unique runtime id.

To model "get" operations, I propose that a get consists of a "channel id" and a "reader id". Each distinct reader id starts at the beginning, so two readers on the same channel do not interfere with one another (the "broadcast" pattern).

I'd like to suggest that a CommandLineTool can be allowed to accept a channel as input, but only in the plain JSON data form above; it can't do anything with it except print it out or pass it through.

By analogy with File objects, where the file path is just a handle that you use to request data from the operating system, in the future we could introduce some kind of API where you could exchange the channel id for a pipe or socket where you read or write streaming data, but that should be out of scope for this initial design.

The window concept is not something I had thought about. I do think being able to go back and forth between arrays and channels is going to be important, and the window concept seems like a useful generalization of "collect all channel items into an array" or "emit each array item into a channel".

@tetron (Member, Author) commented Mar 27, 2023

@kinow and I had a discussion about using CWL for climate models, and the need to be able to send events between processes that haven't actually finished yet. This means, building on the channel concept, a running process itself should be able to send and receive channel messages to/from the workflow engine.

I wanted to make sure I recorded a couple of ideas for communication protocols:

  • Use HTTP. Each channel gets a URL which is an endpoint for communicating with the workflow engine. The URL is passed to the command line tool when it is launched. The command line tool can use GET requests to fetch channel messages and POST requests to send channel messages.
  • Use the file system. Each channel corresponds to a file on a filesystem that is shared between the tool and the workflow engine. The file path is passed to the command line tool when it is launched. The command line tool or workflow engine reads the file to fetch messages and appends to the file to send messages. The format of the file is a multi-document YAML stream, where messages are separated by a line with three dashes (---). Example:
message1
---
message2
---
message3
---

@tetron (Member, Author) commented Mar 28, 2023

Related: for fetching/posting dynamic channel events, an implementation-specific program with a standard name like cwl-channel-event could be provided, which would then communicate back to the WMS.
