Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add quotation to external APIs #1763

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -15,6 +15,7 @@

package com.twitter.scalding

import com.twitter.scalding.quotation.Quoted
import cascading.flow.FlowDef
import org.apache.avro.Schema
import collection.JavaConverters._
Expand All @@ -26,7 +27,8 @@ package object avro {
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
mode: Mode,
q: Quoted): Unit = {
val sink = PackedAvroSource[T](path)
pipe.write(sink)
}
Expand All @@ -35,7 +37,8 @@ package object avro {
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
mode: Mode,
q: Quoted): Unit = {
import Dsl._
val sink = UnpackedAvroSource[T](path, Some(schema))
val outFields = {
Expand Down
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package com.twitter.scalding.commons.extensions

import com.twitter.scalding._
import com.twitter.scalding.quotation.Quoted
import com.twitter.scalding.Dsl._

import cascading.flow.FlowDef
Expand Down Expand Up @@ -111,7 +112,7 @@ object Checkpoint {

// Wrapper for Checkpoint when using a TypedPipe
def apply[A](checkpointName: String)(flow: => TypedPipe[A])(implicit args: Args, mode: Mode, flowDef: FlowDef,
conv: TupleConverter[A], setter: TupleSetter[A]): TypedPipe[A] = {
conv: TupleConverter[A], setter: TupleSetter[A], q: Quoted): TypedPipe[A] = {
val rPipe = apply(checkpointName, Dsl.intFields(0 until conv.arity)) {
flow.toPipe(Dsl.intFields(0 until conv.arity))
}
Expand Down
Expand Up @@ -32,6 +32,7 @@ import com.twitter.scalding.commons.tap.VersionedTap.TapMode
import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck }
import com.twitter.scalding.typed.KeyedListLike
import com.twitter.scalding.typed.TypedSink
import com.twitter.scalding.quotation.Quoted
import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -225,7 +226,7 @@ class TypedRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]) extends j
// the pipe in using an implicit `Monoid[V]` and sinks all results
// into the `sinkVersion` of data (or a new version) specified by
// `src`.
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode): TypedPipe[(K, V)] = {
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode, q: Quoted): TypedPipe[(K, V)] = {
val outPipe =
if (!src.resourceExists(mode))
pipe
Expand Down
Expand Up @@ -2,6 +2,7 @@ package com.twitter.scalding.examples

import com.twitter.scalding._
import com.twitter.scalding.typed.ComputedValue
import com.twitter.scalding.quotation.Quoted

object KMeans {

Expand Down Expand Up @@ -88,12 +89,12 @@ object KMeans {
}
}

def initializeClusters(k: Int, points: TypedPipe[Vector[Double]]): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
def initializeClusters(k: Int, points: TypedPipe[Vector[Double]])(implicit q: Quoted): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
val rng = new java.util.Random(123)
// take a random k vectors:
val clusters = points.map { v => (rng.nextDouble, v) }
.groupAll
.sortedTake(k)(Ordering.by(_._1))
.sortedTake(k)(Ordering.by(_._1), q)
.mapValues { randk =>
randk.iterator
.zipWithIndex
Expand Down
@@ -1,6 +1,7 @@
package com.twitter.scalding.typed

import com.twitter.algebird._
import com.twitter.scalding.quotation.Quoted

/**
* Extension for TypedPipe to add a cumulativeSum method.
Expand Down Expand Up @@ -39,7 +40,8 @@ object CumulativeSum {
def cumulativeSum(
implicit sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
ordK: Ordering[K],
quoted: Quoted): SortedGrouped[K, (U, V)] = {
pipe.group
.sortBy { case (u, _) => u }
.scanLeft(Nil: List[(U, V)]) {
Expand All @@ -62,7 +64,8 @@ object CumulativeSum {
implicit ordS: Ordering[S],
sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {
ordK: Ordering[K],
q: Quoted): TypedPipe[(K, (U, V))] = {

val sumPerS = pipe
.map { case (k, (u, v)) => (k, partition(u)) -> v }
Expand Down
Expand Up @@ -5,9 +5,12 @@ import com.twitter.scalding._
import com.twitter.scalding.source.TypedText
import scala.collection.mutable.Buffer
import TDsl._
import com.twitter.scalding.quotation.Quoted

trait TBddDsl extends FieldConversions with TypedPipeOperationsConversions {

private implicit val q: Quoted = Quoted.internal

def Given[TypeIn](source: TypedTestSource[TypeIn]): TestCaseGiven1[TypeIn] = new TestCaseGiven1[TypeIn](source)

def GivenSources(sources: List[TypedTestSource[_]]): TestCaseGivenList = new TestCaseGivenList(sources)
Expand Down
Expand Up @@ -30,6 +30,7 @@ import cascading.tap._
import com.twitter.scalding.Dsl._
import scala.math.max
import scala.annotation.tailrec
import com.twitter.scalding.quotation.Quoted

/**
* Matrix class - represents an infinite (hopefully sparse) matrix.
Expand Down Expand Up @@ -138,10 +139,11 @@ class MatrixMappableExtensions[T](mappable: Mappable[T])(implicit fd: FlowDef, m
}

def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: Ordering[(Group, Row)],
setter: TupleSetter[(Group, Row, Col, Val)]): BlockMatrix[Group, Row, Col, Val] =
setter: TupleSetter[(Group, Row, Col, Val)],
q: Quoted): BlockMatrix[Group, Row, Col, Val] =
mapToBlockMatrix { _.asInstanceOf[(Group, Row, Col, Val)] }

def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = {
def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)], q: Quoted): BlockMatrix[Group, Row, Col, Val] = {
val matPipe = TypedPipe
.from(mappable)
.map(fn)
Expand Down