Skip to content

Commit

Permalink
add quotation to external APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Flavio Brasil committed Jan 8, 2018
1 parent de3948c commit 36dec0d
Show file tree
Hide file tree
Showing 27 changed files with 423 additions and 428 deletions.
Expand Up @@ -15,6 +15,7 @@

package com.twitter.scalding

import com.twitter.scalding.quotation.Quoted
import cascading.flow.FlowDef
import org.apache.avro.Schema
import collection.JavaConverters._
Expand All @@ -26,7 +27,8 @@ package object avro {
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
mode: Mode,
q: Quoted): Unit = {
val sink = PackedAvroSource[T](path)
pipe.write(sink)
}
Expand All @@ -35,7 +37,8 @@ package object avro {
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
mode: Mode,
q: Quoted): Unit = {
import Dsl._
val sink = UnpackedAvroSource[T](path, Some(schema))
val outFields = {
Expand Down
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package com.twitter.scalding.commons.extensions

import com.twitter.scalding._
import com.twitter.scalding.quotation.Quoted
import com.twitter.scalding.Dsl._

import cascading.flow.FlowDef
Expand Down Expand Up @@ -111,7 +112,7 @@ object Checkpoint {

// Wrapper for Checkpoint when using a TypedPipe
def apply[A](checkpointName: String)(flow: => TypedPipe[A])(implicit args: Args, mode: Mode, flowDef: FlowDef,
conv: TupleConverter[A], setter: TupleSetter[A]): TypedPipe[A] = {
conv: TupleConverter[A], setter: TupleSetter[A], q: Quoted): TypedPipe[A] = {
val rPipe = apply(checkpointName, Dsl.intFields(0 until conv.arity)) {
flow.toPipe(Dsl.intFields(0 until conv.arity))
}
Expand Down
Expand Up @@ -32,6 +32,7 @@ import com.twitter.scalding.commons.tap.VersionedTap.TapMode
import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck }
import com.twitter.scalding.typed.KeyedListLike
import com.twitter.scalding.typed.TypedSink
import com.twitter.scalding.quotation.Quoted
import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -224,7 +225,7 @@ class TypedRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]) extends j
// the pipe in using an implicit `Monoid[V]` and sinks all results
// into the `sinkVersion` of data (or a new version) specified by
// `src`.
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode): TypedPipe[(K, V)] = {
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode, q: Quoted): TypedPipe[(K, V)] = {
val outPipe =
if (!src.resourceExists(mode))
pipe
Expand Down
Expand Up @@ -2,6 +2,7 @@ package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.typed.ComputedValue
import com.twitter.scalding.quotation.Quoted

object KMeans {

Expand Down Expand Up @@ -88,12 +89,12 @@ object KMeans {
}
}

def initializeClusters(k: Int, points: TypedPipe[Vector[Double]]): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
def initializeClusters(k: Int, points: TypedPipe[Vector[Double]])(implicit q: Quoted): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
val rng = new java.util.Random(123)
// take a random k vectors:
val clusters = points.map { v => (rng.nextDouble, v) }
.groupAll
.sortedTake(k)(Ordering.by(_._1))
.sortedTake(k)(Ordering.by(_._1), q)
.mapValues { randk =>
randk.iterator
.zipWithIndex
Expand Down
@@ -1,6 +1,7 @@
package com.twitter.scalding.typed

import com.twitter.algebird._
import com.twitter.scalding.quotation.Quoted

/**
* Extension for TypedPipe to add a cumulativeSum method.
Expand Down Expand Up @@ -39,7 +40,8 @@ object CumulativeSum {
def cumulativeSum(
implicit sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
ordK: Ordering[K],
quoted: Quoted): SortedGrouped[K, (U, V)] = {
pipe.group
.sortBy { case (u, _) => u }
.scanLeft(Nil: List[(U, V)]) {
Expand All @@ -62,7 +64,8 @@ object CumulativeSum {
implicit ordS: Ordering[S],
sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {
ordK: Ordering[K],
q: Quoted): TypedPipe[(K, (U, V))] = {

val sumPerS = pipe
.map { case (k, (u, v)) => (k, partition(u)) -> v }
Expand Down
Expand Up @@ -5,9 +5,12 @@ import com.twitter.scalding._
import com.twitter.scalding.source.TypedText
import scala.collection.mutable.Buffer
import TDsl._
import com.twitter.scalding.quotation.Quoted

trait TBddDsl extends FieldConversions with TypedPipeOperationsConversions {

private implicit val q: Quoted = Quoted.internal

def Given[TypeIn](source: TypedTestSource[TypeIn]): TestCaseGiven1[TypeIn] = new TestCaseGiven1[TypeIn](source)

def GivenSources(sources: List[TypedTestSource[_]]): TestCaseGivenList = new TestCaseGivenList(sources)
Expand Down
Expand Up @@ -30,6 +30,7 @@ import cascading.tap._
import com.twitter.scalding.Dsl._
import scala.math.max
import scala.annotation.tailrec
import com.twitter.scalding.quotation.Quoted

/**
* Matrix class - represents an infinite (hopefully sparse) matrix.
Expand Down Expand Up @@ -138,10 +139,11 @@ class MatrixMappableExtensions[T](mappable: Mappable[T])(implicit fd: FlowDef, m
}

def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: Ordering[(Group, Row)],
setter: TupleSetter[(Group, Row, Col, Val)]): BlockMatrix[Group, Row, Col, Val] =
setter: TupleSetter[(Group, Row, Col, Val)],
q: Quoted): BlockMatrix[Group, Row, Col, Val] =
mapToBlockMatrix { _.asInstanceOf[(Group, Row, Col, Val)] }

def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = {
def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)], q: Quoted): BlockMatrix[Group, Row, Col, Val] = {
val matPipe = TypedPipe
.from(mappable)
.map(fn)
Expand Down

0 comments on commit 36dec0d

Please sign in to comment.