
Releases: twitter/scalding

Execution.flatMap(polished)

02 Dec 18:24

We are very excited about the hottest scalding yet!

Better join syntax

In the typed API, you no longer need to write `.group` before a join or in the argument to a join. Calling a join method is an explicit enough marker that you are crossing a map/reduce boundary, so this restriction has been removed. It is still needed before a reduce (TypedPipe.sum means a total sum, whereas .group.sum means: for each key group, sum the values). So from now on:

    pipe1.join(pipe2).join(pipe3)

is perfectly correct.

While the above runs 1 map/reduce job, the type of the values is a nested tuple: ((v1, v2), v3). In some cases, for small joins, this is fine, but for large joins it can be a pain. We added the MultiJoin object to flatten the tuples as you might expect:

    MultiJoin(pipe1, pipe2, pipe3) : CoGrouped[K, (V1, V2, V3)]
    MultiJoin.left(pipe1, pipe2, pipe3) : CoGrouped[K, (V1, Option[V2], Option[V3])]
    MultiJoin.outer(pipe1, pipe2, pipe3) : CoGrouped[K, (Option[V1], Option[V2], Option[V3])]

Hopefully this makes joining even cleaner and more powerful than it was in scalding 0.11.
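
To see why the flattening helpers are convenient, here is a plain-Scala sketch of the reshaping MultiJoin does for you. The `flatten3` helper below is hypothetical, written only to show the tuple shapes involved; it is not the scalding API:

```scala
// Chained .join calls nest tuples on the left:
val nested: ((Int, String), Double) = ((1, "a"), 2.0)

// MultiJoin-style flattening turns that nesting into a flat tuple:
def flatten3[A, B, C](t: ((A, B), C)): (A, B, C) = (t._1._1, t._1._2, t._2)

val flat: (Int, String, Double) = flatten3(nested)
println(flat) // (1,a,2.0)
```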

TypedPipe is immutable and reusable

TypedPipe is now immutable. This means you can pass TypedPipe (or Grouped or any object in the Typed API) between jobs safely. When we improved the REPL, we had to handle the fact that some parts of the job may be run independently. To make the REPL behave as you expect, we had to make a fake immutable version behind the scenes. In this release, TypedPipe itself is immutable so the REPL becomes much simpler and other applications, such as passing TypedPipes between jobs or executing them in loops, becomes possible.

Execution[T]: a composable way to write Scalding Jobs

Have you ever wanted to convert a TypedPipe into an Iterator or List? Now you can:

    val myPipe: TypedPipe[T] = getPipe
    val iterable: Iterable[T] = myPipe.toIterableExecution.waitFor

Since the beginning, scalding.Job has had a primitive way to start a follow-up job after the current one: Job.next. This had a number of issues. First, since TypedPipes could not be passed between jobs, the user had to manually write to intermediate files and then read those files back in the next job. Managing these temporary files was very painful. The kinds of jobs were also quite limited, because there had to be a linear succession of Jobs (and though tricks could work around this, it was rarely done and ugly).

Scalding 0.12 introduces Execution[T]. This type represents a scalding job that runs and returns a T, and naturally, you can flatMap it. If you want to run two jobs in parallel, you can zip them together. An Execution[T] is basically a “run” function that takes the configuration and Mode and returns a Future[T]. This means you can also wire service calls into the beginning, middle, or end of your scalding job by lifting them into an Execution with Execution.fromFuture.
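
To make the composition model concrete, here is a self-contained toy sketch of the idea. `Exec` below is a stand-in, not the real scalding Execution class: a deferred "run" function from a configuration to a Future[T], composed with flatMap and zip before anything actually runs:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// Toy model: an Exec[T] is just conf => Future[T], run only when asked.
final case class Exec[T](run: Map[String, String] => Future[T]) {
  // Sequence two "jobs": the second sees the first's result.
  def flatMap[U](f: T => Exec[U]): Exec[U] =
    Exec(conf => run(conf).flatMap(t => f(t).run(conf)))
  // Run two "jobs" in parallel and pair their results.
  def zip[U](that: Exec[U]): Exec[(T, U)] =
    Exec(conf => run(conf).zip(that.run(conf)))
}
object Exec {
  def from[T](t: => T): Exec[T] = Exec(_ => Future(t))
}

// Nothing runs until .run is called with a configuration.
val job = Exec.from(21).zip(Exec.from(2)).flatMap { case (a, b) => Exec.from(a * b) }
val result = Await.result(job.run(Map.empty), Duration.Inf)
println(result) // 42
```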

Execution also enables looping. Consider PageRank: we want to run until we reach a level of convergence. We can do this by writing a function:

    def step(graph: TypedPipe[Edge], rank: TypedPipe[(Long, Double)]): Execution[(Double, TypedPipe[(Long, Double)])]

In the above, we express one step of our algorithm as a propagation of the rank vector over the graph. The Execution returns a pair: the error between the old and new vectors, and the new vector itself. Now we loop:

    val graph: TypedPipe[Edge]
    def go(vector: TypedPipe[(Long, Double)]): Execution[TypedPipe[(Long, Double)]] =
      step(graph, vector).flatMap {
        case (err, v) if err < threshold => Execution.from(v)
        case (_, v) => go(v) // loop again
      }

    // Now run the job.
    val result = go(initVector).waitFor
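
The same until-convergence recursion can be sketched on plain in-memory values. Here `step`, `threshold`, and the Newton-style update are illustrative stand-ins, not the PageRank code above; the point is the shape of the loop:

```scala
val threshold = 1e-6

// One damping step toward the fixed point of x => (x + 2/x) / 2, i.e. sqrt(2).
// Returns (max error between old and new vector, new vector), like `step` above.
def step(v: Vector[Double]): (Double, Vector[Double]) = {
  val next = v.map(x => (x + 2.0 / x) / 2.0)
  val err = v.zip(next).map { case (a, b) => math.abs(a - b) }.max
  (err, next)
}

// Loop until the error drops below the threshold, as in `go` above.
@annotation.tailrec
def go(v: Vector[Double]): Vector[Double] = step(v) match {
  case (err, next) if err < threshold => next
  case (_, next) => go(next)
}

val converged = go(Vector(1.0, 3.0)) // each element converges to sqrt(2)
```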

For now, you need to put some operation in your scalding Job, unless you use ExecutionJob or bypass Job altogether, as done in ExecutionApp. ExecutionApp allows you to do this:

    object MyJob extends ExecutionApp {
      def job: Execution[Unit] = someExecutionLoopToDoAwesomeStuff
    }

Then you can run MyJob as a normal class with the hadoop command. It parses arguments and makes them available in the Config object (see Execution.getConfig to access the Config and from there the Args).

Check out .forceToDiskExecution, .writeExecution, and .toIterableExecution on TypedPipe to create Executions you can use in loops, and .toOptionExecution and .getExecution on ValuePipe to materialize single values.

As an example, see Kmeans: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/examples/KMeans.scala

Reducer Estimation

The reducer estimation we developed has been made part of the open source release, meaning users can get automatic reducer setting based either on the input file sizes or on the history of the job. This promises to help us make more efficient use of our cluster. Estimation strategies are easy to implement, and will be an area of future improvement.

Full list of changes below:

  • Fix long compile time for MultiJoin helpers: #1109
  • Allows reducer estimation to operate on all hfs taps: #1080
  • Fix bufferedTake: #1107
  • Generate methods for flattening the results of many joins: #1097
  • Make TimePathedSource more configurable: #1105
  • Adding DailyPrefixSuffixLzoTsv: #1082
  • Option to select the fields for output in templatesource: #1061
  • Add a DailySuffixMostRecentLzoProtobuf source: #1104
  • Updates default scala version to 2.10.4: #1081
  • MultiSourceTap hashcode: #1101
  • scalding-core: merge flow step strategies to allow reducer estimation combined with other strategies: #1094
  • Improve command line handling of the execution app: #1083
  • More testing around the globifier with new properties: #1092
  • Refactor JDBCSource to add compile-time info about type of DB: #1087
  • Add a cumulative sum to KeyedList: #1085
  • Add in failing test case: #1090
  • Adds ability to also get the mode inside the Execution monad.: #1088
  • Enforce invariant: mapGroup iterators all nonempty: #1072
  • Allow PartitionSource to limit the number of open files: #1078
  • append to Cascading frameworks system property instead of setting it directly: #1076
  • Adds some output while assembly is building to keep travis happy: #1084
  • Only request necessary hadoop configs in hraven reducer estimator: #1067
  • Add parquet-scrooge sources: #1064
  • Outer join handles case when both are empty: #1065
  • Fix race in merging: #1063
  • Add support for column projection to parquet sources: #1056
  • Add typed version of RichPipe 'using': #1049
  • Add getExecution/getOrElseExecution: #1062
  • Change toIteratorExecution to toIterableExecution: #1058
  • Cache Execution evaluations: #1057
  • Add support for push down filters in parquet sources: #1050
  • Add support for Fold: #1053
  • move to use JobConf(true) for hadoop crazyness that causes host not foun...: #1051
  • Disable Cascading update check.: #1048
  • Respects -Dmapred.job.name when passed in on the command line: #1045
  • Add some instances from Algebird: #1039
  • Fix join.mapGroup issue: #1038
  • Add a defensive .forceToDisk in Sketched: #1035
  • Override toIterator for all Mappable with transformForRead: #1034
  • Make sinkFields in TypedDelimited final.: #1032
  • Fixed type of exception thrown by validateTaps: #1033
  • Add default local maven repo to the resolver list: #1024
  • Add an ExecutionApp trait for objects to skip the Job class: #1027
  • Make each head pipe have a unique name: #1025
  • Run REPL from SBT: #1021
  • Add Config to openForRead: #1023
  • Fix replConfig merging and evaluate values in Config.fromHadoop: #1015
  • REPL Autoload file: #1009
  • Fix hRaven Reducer Estimator: #1018
  • Update Cascading JDBC Version.: #1016
  • Some Execution fixes: #1007
  • Refactor InputSizeReducerEstimator to correctly unroll MultiSourceTaps: #1017
  • Fix issue #1011: Building develop branch fails: htt...

Fix REPL Snapshot Bug

07 Aug 00:27

Use Hadoop config's "hadoop.tmp.dir" for saving snapshots; fixes issues with multiple users trying to create snapshots in the same temporary directory.

Minor release bump

10 Jul 23:38
  • Fixes bad release portion where code wasn't updated for new scalding version number.
  • use cascading-jdbc 2.5.3 for table exists fix and cascading 2.5.5: #951
  • Bump build properties and sbt launcher: #950
  • Fixes the travis build: #944
  • Making the README.md consistent with 0.11.0 changes for the REPL.: #941

Scalding repl and library usage get lots of love

03 Jul 23:43
  • REPL: Add toIterator (and related methods): #929
  • Fix the build to use the shared module method: #938
  • Clean up the UniqueID stuff, to avoid plumbing it everywhere: #937
  • TypedPipe.from(List).distinct fails: #935
  • Clean up ExecutionContext a bit: #933
  • Fix Issue 932: no-op Jobs should not throw: #934
  • Use Execution to run flows in REPL: #928
  • Snapshot a pipe in the REPL: #918
  • Add support for AppJar in Config: #924
  • Fix LzoTextLine as a TypedSource: #921
  • Use externalizer in BijectedSourceSink: #926
  • Add an Executor to run flows without a Job: #915
  • This handles the case where scalding will save out a tsv and re-use it down stream leading to issues where the types are not strings: #913
  • Fix DailySuffixTsv for testability, remove leaked DailySuffixTsv: #919
  • Add a Config class to make configuration understandable: #914
  • Integrate the repl completely into scald.rb. Fixup scald-rb for better hdfs-local mode now with our provides: #902
  • Add some auto-reformats: #911
  • Update JDBCSource: #898
  • Allow tests for typed delimited by fixing swallowed bug: #910
  • Add Hadoop platform test to enable unit testing for Hadoop semantics: #858
  • Some minor improvements to typed joining code: #909
  • Fix #906: #908
  • Run the test target, so the tests are reformatted: #907
  • Enable scalariform: #905
  • Simplify "scald-repl.sh": #901
  • Typed Tutorial: #897
  • Adding a test for the scalding repl: #890
  • Properly close tuple iterator in test framework.: #896
  • Add constructors to ValuePipe: #893
  • contraMap and andThen on TypedSink/TypedSource: #892
  • Tiny fix to use an ImplicitBijection rather than Bijection: #887
  • Feature/bijected source sink: #886
  • Fix intersection equality error: #878
  • Add DailySuffixTypedTsv and HourlySuffixTypedTsv.: #873
  • add stepListner register support in Scalding: #875
  • Backport Meatlocker: #571

Version 0.10.0

14 May 22:58
  • Upgrade cascading to 2.5.4, cascading jdbc to 2.5.2
  • Adding an hdfs mode for the Scalding REPL
  • Added implementation of PartitionSource with tests
  • Add helper methods to KeyedList and TypedPipe
  • Add addTrap to TypedPipe

Scalding 0.9: Get it while it’s hot!

04 Apr 02:36

It’s been just over two years since we open sourced Scalding and today we are very excited to release the 0.9 version. Scalding at Twitter powers everything from internal and external facing dashboards, to custom relevance and ad targeting algorithms, including many graph algorithms such as PageRank, approximate user cosine similarity and many more.

There have been a wide breadth of new features added to Scalding since the last release:

Joins
An area of particular activity and impact has been around joins. The Fields API already had an API to do left and right joins over multiple streams, but with 0.9 we bring this functionality to the Typed-API. In 0.9, joins followed by reductions followed by more joins are automatically planned as a single map/reduce job, potentially reducing the number of steps in your pipelines.

    case class UserName(id: Long, handle: String)
    case class UserFavs(byUser: Long, favs: List[Long])
    case class UserTweets(byUser: Long, tweets: List[Long])

    def users: TypedSource[UserName]
    def favs: TypedSource[UserFavs]
    def tweets: TypedSource[UserTweets]

    def output: TypedSink[(UserName, UserFavs, UserTweets)]

    // Do a three-way join in one map-reduce step, with type safety
    users.groupBy(_.id)
      .join(favs.groupBy(_.byUser))
      .join(tweets.groupBy(_.byUser))
      .map { case (uid, ((user, favs), tweets)) =>
        (user, favs, tweets)
      }
      .write(output)

This includes custom co-grouping, not just left and right joins. To handle skewed data there is a new count-min-sketch based algorithm to solve the curse of the last reducer, and a critical bug-fix for skewed joins in the Fields API.

Input/output
In addition to joins, we’ve added support for new input/output formats:

  • Parquet Format is a columnar storage format which we open sourced in collaboration with Cloudera. Parquet can dramatically accelerate map-reduce jobs that read only a subset of the columns in a dataset, and can similarly reduce storage cost with more efficient serialization.
  • Avro is an Apache project to standardize serialization with self-describing IDLs. Ebay contributed the scalding-avro module to make it easy to work with Apache Avro serialized data.
  • TemplateTap support eases partitioned writes of data, where the output path depends on the value of the data.

Hadoop counters
We’re also adding support for incrementing Hadoop counters inside map and reduce functions. For cases where you need to share a medium sized data file across all your tasks, support for Hadoop’s distributed cache was added in this release cycle.

Typed API
The typed API saw many improvements. When doing data-cubing, partial aggregation should happen before key expansion, and sumByLocalKeys enables this. The type-system enforces constraints on sorting and joining that previously would have caused run-time exceptions. When reducing a data-set to a single value, a ValuePipe is returned. Just as a TypedPipe is analogous to a program that produces a distributed list, a ValuePipe is analogous to a program that produces a single value, which we might then use to filter or transform some TypedPipe.

Matrix API
When it comes to linear algebra, Scalding 0.9 introduced a new Matrix API which will replace the former one in our next major release. Due to the associative nature of matrix multiplication we can choose to compute (AB)C or A(BC). One of those orders might create a much smaller intermediate product than the other. The new API includes a dynamic programming optimization of the order of multiplication chains of matrices to minimize realized size along with several other optimizations. We have seen some considerable speedups of matrix operations with this API. In addition to the new optimizing API, we added some functions to efficiently compute all-pair inner-products (A A^T) using DISCO and DIMSUM. These algorithms excel for cases of vectors highly skewed in their support, which is to say most vectors have few non-zero elements, but some are almost completely dense.
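
The order-of-multiplication point can be made concrete with the classic matrix-chain dynamic program. This is a plain-Scala illustration of the idea, not the scalding Matrix API: multiplying an (a x b) dense matrix by a (b x c) one costs a*b*c scalar multiplications, and the DP picks the cheapest parenthesization of the chain:

```scala
// dims(i) and dims(i+1) give the shape of matrix i in the chain.
def minChainCost(dims: Vector[Long]): Long = {
  val n = dims.length - 1
  val cost = Array.ofDim[Long](n, n) // cost(i)(j): cheapest way to multiply matrices i..j
  for (len <- 2 to n; i <- 0 to n - len) {
    val j = i + len - 1
    cost(i)(j) = (i until j).map { k =>
      cost(i)(k) + cost(k + 1)(j) + dims(i) * dims(k + 1) * dims(j + 1)
    }.min
  }
  cost(0)(n - 1)
}

// A: 10x10000, B: 10000x10, C: 10x1000.
// (AB)C costs 10*10000*10 + 10*10*1000 = 1,100,000 multiplications,
// while A(BC) costs 10000*10*1000 + 10*10000*1000 = 200,000,000.
println(minChainCost(Vector(10L, 10000L, 10L, 1000L))) // 1100000
```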

Upgrading and Acknowledgements
Some APIs were deprecated, some were removed entirely, and some added more constraints. We have some sed rules to aid in porting. All changes fixed significant warts. For instance, in the Fields API sum takes a type parameter, and works for any Semigroup or Monoid. Several changes improve the design to aid in using scalding more as a library and less as a framework.

This latest release is our biggest to date, spanning over 800 commits from 57 contributors. It is available today in maven central. We hope Scalding is as useful to you as it is for us and the growing community. Follow us @scalding, join us on IRC (#scalding) or via the mailing list.

Better Function Serialization

03 Sep 16:49

This release adds more fault-tolerant function serialization for https://github.com/twitter/summingbird . We now try Kryo, then fall back to Java serialization if Kryo throws while serializing the function.

If your code is working, there is no reason to update.

PS: Summingbird: https://blog.twitter.com/2013/streaming-mapreduce-with-summingbird