
Proposal: row streaming #9

Open
mariusae opened this issue Nov 7, 2019 · 0 comments
Labels: enhancement (New feature or request), proposal

mariusae commented Nov 7, 2019

This issue describes a design for implementing row-level streaming in Bigslice.

Bigslice streams all of its data during processing in micro-batches. For example, a bigslice.Map operator is given a vector of values over which to apply the operation, and fills a downstream vector of processed values.

Operators like bigslice.Flatmap, which can emit multiple rows for each input row, implement internal buffering if the user-returned data exceeds the requested chunk sizes. Likewise, bigslice.Cogroup, which implements data joining, treats joined data as a single row—i.e., each row contains within it all rows that match the join criteria.

This is limiting: namely, we cannot perform joins where the number of rows for a key exceeds what can fit in memory. (This is exacerbated by the fact that Bigslice itself operates on batches of rows.) We also cannot perform flat-map operations where the result of a computation for a single row does not fit in memory.

In other words, streaming in Bigslice is only “up to” rows.

We propose to repair this by:

  1. providing a generalized data processing function that permits streaming;
  2. changing the underlying data chunking to allow for streaming.

Generalized streaming operator

We propose a generalized streaming operator: Each. Each is invoked for each row of a slice. Its inputs are the slice's columns. The last (optional) argument of Each's function is an "emit" function that produces rows downstream. For example, the following acts as a combined map and filter: it filters out odd values and prefixes each remaining row with the string column "even".

bigslice.Each(slice, func(value int, emit func(string, int)) {
	if value%2 == 0 {
		emit("even", value)
	}
})

Functions passed to Each can also accept streams. These are functions that act as scanners to underlying streams of values.
For example, if we co-grouped the above slice, we could access the stream of values as follows, emitting their sum, producing a slice of ints.

bigslice.Each(slice, func(key string, values func(*int) bool, emit func(int)) {
	var sum, datum int
	for values(&datum) {
		sum += datum
	}
	emit(sum)
})

Note that this is a streaming equivalent of

bigslice.Map(slice, func(key string, values []int) int {
	var sum int
	for _, datum := range values {
		sum += datum
	}
	return sum
})

For complex co-groups, each co-grouped slice corresponds to a single streaming reader. For example:

slice1 := // Slice<int, string, string>
slice2 := // Slice<int, float64, int>
slice := bigslice.Cogroup(slice1, slice2)
slice = bigslice.Each(slice, func(key int, first func(*string, *string) bool, second func(*float64, *int) bool, emit func(…)) { … })

Streaming readers, data layout

The above API can be implemented using the current reader APIs, but the implementation would have to buffer (and perhaps spill to disk). This can be addressed by introducing nested readers: a sliceio.Reader would be amended to support sub-streams:

type Reader interface {
    // ...

    // Child returns a reader for the sub-stream at the given index.
    Child(index int) Reader
}

The new Child method would return a reader for an indexed sub-stream. The physical layout of output files would remain chunks of rows, but each chunk would be prefixed by its stream index. Any stream reference must occur after the full sub-stream has been materialized. This allows a reader to, for example, spill a sub-stream to disk and re-read it on demand.
