Proposal: streaming, incremental computing #8

Open
mariusae opened this issue Oct 21, 2019 · 2 comments
Labels
enhancement (New feature or request), proposal

Comments

@mariusae
Collaborator

This issue describes a design for adding streaming, incremental computing to Bigslice.

Bigslice provides a useful model for performing batch computing, but many uses of Bigslice could also benefit from streaming and incremental computing: some datasets are unbounded (e.g., the output of a web crawler), and others could benefit from using incremental computing as a checkpoint mechanism (for very large datasets). Incremental and streaming computing also provides a mechanism to overlap compute with I/O: for example, data processing can begin while sequencer outputs are still being uploaded. Streaming computing is also useful for monitoring purposes: for example, to highlight QC concerns at a very early stage.

The primitives in Bigslice were designed with incremental computation in mind; we aim to add incremental and streaming computation to Bigslice with few changes to its core.

The main idea is to endow Bigslice tasks with temporal sharding: as well as sharding by data ("space"), we also shard by discrete intervals of time ("epochs"). By so doing, we can effectively partition unbounded inputs in a user-controlled manner (e.g., windowing in fixed-time intervals, or windowing by data sizes), while providing incremental and streaming computing semantics on top of Bigslice with minimal changes to its core runtime semantics.

Computing tasks with time sharding requires few changes to the core APIs in Bigslice. We modify Slice to account for temporal sharding directly, and Reader is modified to provide a checkpointing mechanism, discussed further below.

type Slice interface {
	// (The rest of interface Slice is identical.)

	// NumEpoch returns the number of epochs of data provided
	// by this Slice operation.
	NumEpoch() int
	
	// Reader returns a Reader for a shard and epoch of this Slice. The
	// reader itself computes the shard's values on demand. The caller
	// must provide Readers for all of this shard's dependencies,
	// constructed according to the dependency type (see Dep). The
	// dependencies provided are for the same shard and epoch.
	Reader(shard, epoch int, deps []sliceio.Reader) sliceio.Reader
}

With these modifications, we can subtly alter the execution model so that incremental, streaming computing is treated as a superset of Bigslice's current batch-oriented model. When a Reader is instantiated from a Slice, it is provided with readers for its dependencies that represent the same epoch. Incremental computation requires no additional changes to the operators; streaming computation requires that some operators emit data only for records belonging to the current epoch. For example, a streaming join emits records only for keys present in the current epoch, while an incremental join must emit all records. Likewise, a windowed reduce must store enough state to maintain its window, emitting the values accumulated over that window.

We now discuss the implications of this model for the Bigslice runtime. First, since some operators are stateful, they must be able to checkpoint their state. We provide this by allowing a sliceio.Reader to implement an interface that lets it persist state to an underlying storage mechanism managed by Bigslice:

// Checkpointer can be implemented by stateful sliceio.Readers that
// wish for the runtime to maintain their state.
type Checkpointer interface {
	// Save checkpoints the state of this reader to the provided
	// Storage.
	Save(Storage) error

	// Restore restores the state of this reader from the provided
	// storage, as checkpointed by Save.
	Restore(Storage) error
}

For example, a streaming join may use this interface to persist the mapio table it uses to perform the join. Checkpoints are managed by epoch: they are always restored before processing an epoch, and saved on its completion.
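
As an illustration, a stateful reader might implement Checkpointer roughly as follows. This is only a sketch: the joinReader type, its table field, and the Storage methods it uses (Create and Open, which this proposal does not yet define) are hypothetical:

type joinReader struct {
	// table accumulates the keys and values seen in prior epochs,
	// e.g., backed by a mapio table spilled to disk.
	table *joinTable // hypothetical persistent key/value state
	// (The sliceio.Reader implementation is elided.)
}

// Save writes the reader's accumulated state to the runtime-managed
// storage on completion of an epoch.
func (r *joinReader) Save(store Storage) error {
	w, err := store.Create("join-state") // hypothetical Storage method
	if err != nil {
		return err
	}
	defer w.Close()
	return r.table.WriteTo(w) // hypothetical serialization
}

// Restore reloads the state written by Save before processing an epoch.
func (r *joinReader) Restore(store Storage) error {
	rc, err := store.Open("join-state") // hypothetical Storage method
	if err != nil {
		return err
	}
	defer rc.Close()
	return r.table.ReadFrom(rc) // hypothetical deserialization
}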

Time sharding changes the meaning of a task's output. It is now indexed on (task, epoch, shard) instead of just (task, shard). The task state is also changed to include its current epoch (starting at 0). A task at epoch E indicates that outputs for epochs e<E are available; the output for epoch E is available only if the task state is TaskOk.
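
A rough sketch of the new indexing, with illustrative (not actual) type names:

// outputKey identifies a task output under time sharding.
type outputKey struct {
	Task  string // task name
	Epoch int    // epoch this output belongs to
	Shard int    // data shard within the epoch
}

// taskStatus pairs the task's usual state with its current epoch:
// outputs for epochs e < Epoch are available, and the output for
// Epoch itself is available only once State reaches TaskOk.
type taskStatus struct {
	Epoch int
	State TaskState // e.g., TaskOk, as in the existing runtime
}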

Finally, task scheduling must change to accommodate time sharding. The basics of Bigslice's current task scheduling do not change: tasks maintain a single state, and their outputs are tracked by the executor. (Though the outputs are indexed by epoch as well, as described above.)

When evaluating a task graph, we also maintain a target epoch E: the role of task evaluation is to bring the frontier tasks to an epoch e>=E. For simplicity, we assume that graph evaluation is monotonic in epochs: we don't go back in time, though there's nothing in the model that would prohibit this, and it could be implemented later. It is also possible to have overlapping evaluations of multiple epochs; this is supported by simply running multiple evaluations concurrently with different target epochs. Thus, task evaluation proceeds as it does today, but a node is considered ready only if its dependency nodes are at epoch e>=E. Evaluation of a task is performed step-wise by epoch: a task at epoch e is evaluated to epoch e+1. This, too, is a restriction that can be removed later.
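
The readiness rule might look roughly like the following; the task type and its Epoch and Deps fields are illustrative, not the actual evaluator types:

// ready reports whether t can be evaluated toward the target epoch:
// t must itself be behind the target, and all of its dependencies
// must have reached at least the target epoch.
func ready(t *task, target int) bool {
	if t.Epoch >= target {
		return false // already at or past the target epoch
	}
	for _, dep := range t.Deps {
		if dep.Epoch < target {
			return false // a dependency has not yet reached the target
		}
	}
	return true
}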

Note that this model also allows seamless integration of batch computing with streaming computing: A batch computing Slice would set NumEpoch() to 1; its dataset would be computed once and be seamlessly integrated with streaming Slices.

@mariusae added the enhancement (New feature or request) and proposal labels on Oct 21, 2019
@cosnicolaou
Collaborator

I think I can envision how this would work in practice, namely, I can trade off the efficiency of processing larger batches to gain more streaming/pipelining, which I think is totally appropriate. How would operations like cogroup work, though? Can they still be incremental? I would imagine not.

@mariusae
Collaborator Author

> I think I can envision how this would work in practice, namely, I can trade off the efficiency of processing larger batches to gain more streaming/pipelining, which I think is totally appropriate. How would operations like cogroup work, though? Can they still be incremental? I would imagine not.

They can be: cogroup would checkpoint its state as, e.g., a mapio sstable; it can then emit rows for any newly modified keys.
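
Roughly, the idea might look like this sketch, in which the cogroupState type, its mapio-backed table, and the Append method are all hypothetical:

type cogroupState struct {
	table    *groupTable         // hypothetical: accumulated groups, persisted via Checkpointer
	modified map[string]struct{} // keys touched during the current epoch
}

// add records a value for key and marks the key as modified, so that
// only the newly modified groups are re-emitted at the end of the epoch.
func (s *cogroupState) add(key string, value []byte) {
	s.table.Append(key, value) // hypothetical append to the persistent table
	s.modified[key] = struct{}{}
}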

There is also the opportunity, later, to introduce a notion of a "diff+patch" approach, where operators emit fine-grained changes that can be reconstructed downstream. This requires a fairly different data model, though it could nevertheless be built on top of this proposal.
