publish for scalding base missing (#1996)
* publish for scalding base missing

* pin java8 version

* scalding-base was not being formatted

* we should really fix the optimization tests some time...
daniel-sudz committed Apr 28, 2022
1 parent e96797b commit a0516e0
Showing 17 changed files with 89 additions and 98 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
@@ -115,7 +115,7 @@ jobs:
- uses: actions/setup-java@v2
with:
distribution: 'adopt-openj9'
java-version: 8
java-version: '8.0.322+6' # non hadoop 3.3 versions build break https://issues.apache.org/jira/browse/HADOOP-16590

- uses: coursier/cache-action@v6
- name: Set up Ruby
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/publish.yml
@@ -4,8 +4,8 @@ on:
push:
branches:
- "develop"
release:
types: [created]
tags:
- "v*"

jobs:
publish:
@@ -20,7 +20,7 @@ jobs:
- uses: actions/setup-java@v2
with:
distribution: "adopt-openj9"
java-version: 8
java-version: '8.0.322+6' # non hadoop 3.3 versions build break https://issues.apache.org/jira/browse/HADOOP-16590

- name: Set up Ruby
uses: ruby/setup-ruby@v1
Expand Down
1 change: 1 addition & 0 deletions build.sbt
@@ -191,6 +191,7 @@ lazy val scalding = Project(id = "scalding", base = file("."))
scaldingQuotation,
scaldingCats,
scaldingDagon,
scaldingBase,
scaldingCore,
scaldingCommons,
scaldingAvro,
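The substance of the fix: `scalding-base` existed as a module but was missing from the root project's module list, so aggregated publishing silently skipped it. A minimal sketch of the mechanism — the project ids and the `.aggregate` wiring are assumed for illustration, not copied from this repo's build:

```scala
lazy val scaldingBase = Project(id = "scalding-base", base = file("scalding-base"))
lazy val scaldingCore = Project(id = "scalding-core", base = file("scalding-core"))

// Only modules reachable from the root project's aggregate list are picked up
// by an aggregated `publish`, so a module missing here is never published.
lazy val scalding = Project(id = "scalding", base = file("."))
  .aggregate(scaldingBase, scaldingCore /* , ...the rest of the modules above */ )
```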
55 changes: 29 additions & 26 deletions scalding-base/src/main/scala/com/twitter/scalding/Config.scala
@@ -15,11 +15,11 @@ limitations under the License.
*/
package com.twitter.scalding

import com.twitter.scalding.serialization.{Serialization, RequireOrderedSerializationMode}
import com.twitter.scalding.serialization.{RequireOrderedSerializationMode, Serialization}
import com.twitter.scalding.serialization.macros.impl.BinaryOrdering.{ordSer => serializer}
import java.util.Base64
import java.security.MessageDigest
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.util.{Failure, Success, Try}

/**
@@ -296,27 +296,27 @@ abstract class Config extends Serializable {
getBoolean(Config.ScaldingCheckHfsTaps, false)

/*
* Used in joins to determine how much of the "right hand side" of
* the join to keep in memory
*/
* Used in joins to determine how much of the "right hand side" of
* the join to keep in memory
*/
def setListSpillThreshold(count: Int): Config =
this + (CascadingSpillablePropListThreshold -> count.toString)

/*
* Used in hashJoin/joinWithTiny to determine how big the map
* can be before spilling to disk. Generally, as big as you can
* allow here without OOM will help performance.
*/
* Used in hashJoin/joinWithTiny to determine how big the map
* can be before spilling to disk. Generally, as big as you can
* allow here without OOM will help performance.
*/
def setMapSpillThreshold(count: Int): Config =
this + (CascadingSpillablePropMapThreshold -> count.toString)

/*
* Used in map-side aggregation of associative operations (Semigroup/Monoid)
* This controls how many keys are in an in-memory cache. If a significant
* probability mass of the key-space is far bigger than this value, it
* does not help much (and may hurt, so experiment with disabling to get
* the best results
*/
* Used in map-side aggregation of associative operations (Semigroup/Monoid)
* This controls how many keys are in an in-memory cache. If a significant
* probability mass of the key-space is far bigger than this value, it
* does not help much (and may hurt, so experiment with disabling to get
* the best results
*/
def setMapSideAggregationThreshold(count: Int): Config =
this + (CascadingAggregateByThreshold -> count.toString)
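A usage sketch chaining the three setters above (the threshold values are illustrative placeholders, not recommendations):

```scala
// hypothetical tuning on top of the defaults defined below in object Config
val tuned: Config =
  Config.default
    .setListSpillThreshold(200 * 1000)          // join right-hand side held in memory
    .setMapSpillThreshold(200 * 1000)           // hashJoin map size before spilling to disk
    .setMapSideAggregationThreshold(50 * 1000)  // key cache for map-side Semigroup sums
```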

@@ -414,9 +414,9 @@ object Config {
val empty: Config = Config(Map.empty)

/*
* Here is a config that will work, but perhaps is not optimally tuned for
* your cluster
*/
* Here is a config that will work, but perhaps is not optimally tuned for
* your cluster
*/
val default: Config =
Config.empty
.setListSpillThreshold(100 * 1000)
@@ -425,13 +425,15 @@
.setScaldingVersion

/*
* Extensions to the Default Config to tune it for unit tests
*/
* Extensions to the Default Config to tune it for unit tests
*/
def unitTestDefault: Config =
Config.default ++ Config.from(
Map(
("cascading.update.skip" -> "true"),
(Config.RuntimeFrameworkKey -> Config.RuntimeFrameworkValueLocal)))
(Config.RuntimeFrameworkKey -> Config.RuntimeFrameworkValueLocal)
)
)
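A test can extend these defaults the same way `unitTestDefault` extends `default`; a sketch (the extra key is hypothetical):

```scala
// layer one illustrative override on top of the unit-test defaults
val testConf: Config =
  Config.unitTestDefault ++ Config.from(Map("my.app.flag" -> "true"))
```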

def apply(m: Map[String, String]): Config = new Config { def toMap = m }
/*
@@ -497,10 +499,10 @@ object Config {
}

/*
* Legacy code that uses Map[AnyRef, AnyRef] can call this
* function to get a Config.
* If there are unrecognized non-string values, this may fail.
*/
* Legacy code that uses Map[AnyRef, AnyRef] can call this
* function to get a Config.
* If there are unrecognized non-string values, this may fail.
*/
def tryFrom(maybeConf: Map[AnyRef, AnyRef]): Try[Config] = {
val (nonStrings, strings) = Config.stringsFrom(maybeConf)
val initConf = Config.from(strings)
@@ -523,5 +525,6 @@
}
}

private def argMapSerializer: Serialization[Map[String, List[String]]] = serializer[Map[String, List[String]]]
private def argMapSerializer: Serialization[Map[String, List[String]]] =
serializer[Map[String, List[String]]]
}
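A sketch of the legacy path described in the `tryFrom` comment above; the map contents are made up. Note that `Map` is invariant in its key type, so the untyped map needs explicit type arguments:

```scala
import scala.util.{Failure, Success}

// legacy callers typically hold an untyped Map[AnyRef, AnyRef]
val legacy: Map[AnyRef, AnyRef] =
  Map[AnyRef, AnyRef]("mapreduce.job.reduces" -> "10")

Config.tryFrom(legacy) match {
  case Success(conf) => println(conf.toMap)
  case Failure(err)  => println(s"non-string values present: $err")
}
```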
29 changes: 13 additions & 16 deletions scalding-base/src/main/scala/com/twitter/scalding/Execution.scala
@@ -15,7 +15,7 @@ limitations under the License.
*/
package com.twitter.scalding

import com.twitter.scalding.typed.{TypedPipe, Output}
import com.twitter.scalding.typed.{Output, TypedPipe}
import com.twitter.scalding.dagon.{Dag, Id, Rule}
import com.twitter.algebird.monad.Trampoline
import com.twitter.algebird.{Monad, Monoid, Semigroup}
@@ -319,7 +319,6 @@ object Execution {
def withConfig[T](ex: Execution[T])(c: Config => Config): Execution[T] =
TransformedConfig(ex, c)
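A sketch of scoping a config change to a single execution, reusing the threshold setter from Config.scala above (`Execution.unit` stands in for a real job):

```scala
val myExec: Execution[Unit] = Execution.unit // stand-in for a real job
// run myExec with a larger map-side cache, leaving other executions untouched
val tuned: Execution[Unit] =
  Execution.withConfig(myExec)(_.setMapSideAggregationThreshold(200 * 1000))
```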


/**
* This function allows running the passed execution with its own cache. This will mean anything inside
* won't benefit from Execution's global attempts to avoid repeated executions.
@@ -526,7 +525,7 @@
cec: ConcurrentExecutionContext
) =
Trampoline.call(prev.runStats(conf, mode, cache)).map { case CFuture(fut, cancelHandler) =>
lazy val uncachedFut = {
lazy val uncachedFut =
fut
.map(v => (v, CancellationHandler.empty)) // map this to the right shape
.recoverWith {
@@ -542,7 +541,6 @@
f.map(v => (v, c))
})
}
}

val recoveredFut = cache.getOrElseInsert(
conf,
@@ -651,16 +649,16 @@ object Execution {
* This allows you to run platform specific executions
*/
private[scalding] final case class BackendExecution[A](
result: (Config, Mode, Writer, ConcurrentExecutionContext) => CFuture[(Long, ExecutionCounters, A)])
extends Execution[A] {
result: (Config, Mode, Writer, ConcurrentExecutionContext) => CFuture[(Long, ExecutionCounters, A)]
) extends Execution[A] {
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit
cec: ConcurrentExecutionContext
) =
Trampoline(
cache.getOrElseInsert(
conf,
this,
try result(conf, mode, cache.writer, cec).map { case (id, c, a) => (a, Map(id -> c))}
try result(conf, mode, cache.writer, cec).map { case (id, c, a) => (a, Map(id -> c)) }
catch {
case NonFatal(e) => CFuture.failed(e)
}
@@ -931,18 +929,17 @@ object Execution {
val unit: Execution[Unit] = from(())

/**
* This should be avoided if at all possible. It is here to allow backend authors to implement
* custom executions which should very rarely be needed.
*
* This should be avoided if at all possible. It is here to allow backend authors to implement custom
* executions which should very rarely be needed.
*
* The CFuture returned should have three elements:
* 1. unique ID (Long) for the scope of the Writer
* 2. Counter values created by this Execution
* 3. the final result of the Execution (maybe Unit)
* 1. unique ID (Long) for the scope of the Writer 2. Counter values created by this Execution 3. the
* final result of the Execution (maybe Unit)
*/
def backendSpecific[A](
fn: (Config, Mode, Writer, ConcurrentExecutionContext) => CFuture[(Long, ExecutionCounters, A)]
fn: (Config, Mode, Writer, ConcurrentExecutionContext) => CFuture[(Long, ExecutionCounters, A)]
): Execution[A] =
BackendExecution(fn)
BackendExecution(fn)
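A minimal sketch of the triple described above, for a do-nothing backend. It assumes `CFuture.apply` and `ExecutionCounters.empty` behave as the surrounding diff suggests; treat it as illustration, not confirmed API:

```scala
import scala.concurrent.Future

// sketch only: returns the (writer-scoped id, counters, result) triple immediately
val noop: Execution[Unit] =
  Execution.backendSpecific { (conf, mode, writer, cec) =>
    CFuture(Future.successful((0L, ExecutionCounters.empty, ())), CancellationHandler.empty)
  }
```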

def forceToDisk[T](t: TypedPipe[T]): Execution[TypedPipe[T]] =
WriteExecution(ToWrite.Force(t), Nil, { case (conf, _, w, cec) => w.getForced(conf, t)(cec) })
@@ -984,7 +981,7 @@ object Execution {

/**
* Use this to use counters/stats with Execution. You do this: Execution.withId { implicit uid => val myStat
* = Stat("myStat") // uid is implicitly pulled in pipe.map { t => if(someCase(t)) myStat.inc fn(t) }
* \= Stat("myStat") // uid is implicitly pulled in pipe.map { t => if(someCase(t)) myStat.inc fn(t) }
* .writeExecution(mySink) }
*/
def withId[T](fn: UniqueID => Execution[T]): Execution[T] = UniqueIdExecution(fn)
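Unflattened, the example that scalafmt reflowed in the comment above reads roughly as follows (`pipe`, `someCase`, `fn`, and `mySink` are the comment's own placeholders):

```scala
Execution.withId { implicit uid =>
  val myStat = Stat("myStat") // uid is implicitly pulled in
  pipe.map { t =>
    if (someCase(t)) myStat.inc
    fn(t)
  }.writeExecution(mySink)
}
```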
@@ -67,8 +67,8 @@ object ExecutionOptimizationRules {
)

/**
* If `Execution` is `WriteExecution`, we are considering those executions as slow,
* since they will schedule some expensive work, like Hadoop or Spark Job.
* If `Execution` is `WriteExecution`, we are considering those executions as slow, since they will schedule
* some expensive work, like Hadoop or Spark Job.
*
* If `Execution` is `FlatMapped` or `UniqueIdExecution`, we are considering those executions as slow, since
* we don't know which execution they can produce.
@@ -19,6 +19,7 @@ import scala.util.{Failure, Try}

object JobStats {
def empty: JobStats = new JobStats(Map("counters" -> Map.empty))

/**
* Returns the counters with Group String -> Counter String -> Long
*/
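The shape that doc comment describes, with the group name taken from `StatKey.ScaldingGroup` below and a made-up counter value:

```scala
// Group String -> Counter String -> Long
val counters: Map[String, Map[String, Long]] =
  Map("Scalding Custom" -> Map("myStat" -> 42L))
```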
7 changes: 3 additions & 4 deletions scalding-base/src/main/scala/com/twitter/scalding/Mode.scala
@@ -8,9 +8,8 @@ trait Mode extends java.io.Serializable {
def newWriter(): Execution.Writer

/**
* Config.defaultForMode converts this map into
* a Config (we don't use Config here to avoid
* a circular dependency)
* Config.defaultForMode converts this map into a Config (we don't use Config here to avoid a circular
* dependency)
*/
def defaultConfig: Map[String, String] = Map.empty
}
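A sketch of a Mode supplying defaults as the comment describes — the mode name, config key, and the `???` writer are all hypothetical:

```scala
case object SketchMode extends Mode {
  def newWriter(): Execution.Writer = ??? // a real backend would return its Writer
  override def defaultConfig: Map[String, String] =
    Map("scalding.sketch.enabled" -> "true")
}
```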
@@ -37,4 +36,4 @@ object Mode {
}

case class ModeException(message: String) extends RuntimeException(message)
case class ModeLoadException(message: String, origin: ClassNotFoundException) extends RuntimeException(origin)
case class ModeLoadException(message: String, origin: ClassNotFoundException) extends RuntimeException(origin)
@@ -12,4 +12,4 @@ object StatKey {
StatKey(counter, ScaldingGroup)

val ScaldingGroup = "Scalding Custom"
}
}
@@ -21,13 +21,10 @@ object UniqueID {
}

/**
* This is only safe if you use something known to have
* a single instance in the relevant scope.
*
* This is only safe if you use something known to have a single instance in the relevant scope.
*
* In cascading, the FlowDef has been used here
*/
def fromSystemHashCode(ar: AnyRef): UniqueID =
UniqueID(System.identityHashCode(ar).toString)
}
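Per the comment, cascading's FlowDef is the usual single-instance argument; a sketch (`flowDef` is assumed to be in scope):

```scala
// safe because exactly one FlowDef instance exists for the flow
val uid: UniqueID = UniqueID.fromSystemHashCode(flowDef)
```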


@@ -16,7 +16,7 @@ limitations under the License.
package com.twitter.scalding.mathematics

import com.twitter.scalding.serialization.OrderedSerialization2
import com.twitter.scalding.typed.{ComputedValue, EmptyValue, LiteralValue, ValuePipe, TypedPipe, Input}
import com.twitter.scalding.typed.{ComputedValue, EmptyValue, Input, LiteralValue, TypedPipe, ValuePipe}
import com.twitter.algebird.{Field, Group, Monoid, Ring, Semigroup}
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
@@ -99,7 +99,7 @@ sealed trait Matrix2[R, C, V] extends Serializable {
vec: Matrix2[C, C2, VecV]
)(implicit ev: =:=[V, Boolean], mon: Monoid[VecV], mj: MatrixJoiner2): Matrix2[R, C2, VecV] = {

//This cast will always succeed:
// This cast will always succeed:
lazy val joinedBool = mj.join(this.asInstanceOf[Matrix2[R, C, Boolean]], vec)
implicit val ord2: Ordering[C2] = vec.colOrd
lazy val resultPipe = joinedBool
@@ -335,7 +335,7 @@ final case class Product[R, C, C2, V](
}

// represents `\sum_{i j} M_{i j}` where `M_{i j}` is the Matrix with exactly one element at `row=i, col = j`.
lazy val toOuterSum: TypedPipe[(R, C2, V)] = {
lazy val toOuterSum: TypedPipe[(R, C2, V)] =
if (optimal) {
if (isSpecialCase) {
specialCase
@@ -351,7 +351,6 @@ final case class Product[R, C, C2, V](
// Maybe it is Product[R, _, C2, V]
optimizedSelf.asInstanceOf[Product[R, _, C2, V]].toOuterSum
}
}

private def computePipe(joined: TypedPipe[(R, C2, V)] = toOuterSum): TypedPipe[(R, C2, V)] =
if (isSpecialCase) {
@@ -366,7 +365,7 @@ final case class Product[R, C, C2, V](
.map { case ((r, c), v) => (r, c, v) }
}

override lazy val toTypedPipe: TypedPipe[(R, C2, V)] = {
override lazy val toTypedPipe: TypedPipe[(R, C2, V)] =
expressions match {
case Some(m) =>
m.get(this).getOrElse {
Expand All @@ -376,7 +375,6 @@ final case class Product[R, C, C2, V](
}
case None => optimizedSelf.toTypedPipe
}
}

override val sizeHint = left.sizeHint * right.sizeHint

@@ -445,7 +443,7 @@ final case class Sum[R, C, V](left: Matrix2[R, C, V], right: Matrix2[R, C, V], m
}
}

override lazy val toTypedPipe: TypedPipe[(R, C, V)] = {
override lazy val toTypedPipe: TypedPipe[(R, C, V)] =
if (left.equals(right)) {
left.optimizedSelf.toTypedPipe.map(v => (v._1, v._2, mon.plus(v._3, v._3)))
} else {
@@ -457,7 +455,7 @@ final case class Sum[R, C, V](left: Matrix2[R, C, V], right: Matrix2[R, C, V], m
.filter(kv => mon.isNonZero(kv._2))
.map { case ((r, c), v) => (r, c, v) }
}
}

override val sizeHint = left.sizeHint + right.sizeHint

@@ -489,7 +486,7 @@ final case class HadamardProduct[R, C, V](left: Matrix2[R, C, V], right: Matrix2
extends Matrix2[R, C, V] {

// TODO: optimize / combine with Sums: https://github.com/tomtau/scalding/issues/14#issuecomment-22971582
override lazy val toTypedPipe: TypedPipe[(R, C, V)] = {
override lazy val toTypedPipe: TypedPipe[(R, C, V)] =
if (left.equals(right)) {
left.optimizedSelf.toTypedPipe.map(v => (v._1, v._2, ring.times(v._3, v._3)))
} else {
@@ -502,7 +499,6 @@ final case class HadamardProduct[R, C, V](left: Matrix2[R, C, V], right: Matrix2
.filter(kv => kv._2._2 && ring.isNonZero(kv._2._1))
.map { case ((r, c), v) => (r, c, v._1) }
}
}

override lazy val transpose: MatrixLiteral[C, R, V] =
MatrixLiteral(toTypedPipe.map(x => (x._2, x._1, x._3)), sizeHint.transpose)(colOrd, rowOrd)
@@ -624,7 +624,7 @@ final case class IdentityReduce[K, V1, V2](
toUIR.mapValues(fn)

// This is not correct in the type-system, but would be nice to encode
//override def mapValues[V3](fn: V1 => V3) = IdentityReduce(keyOrdering, mapped.mapValues(fn), reducers)
// override def mapValues[V3](fn: V1 => V3) = IdentityReduce(keyOrdering, mapped.mapValues(fn), reducers)

override def sum[U >: V2](implicit sg: Semigroup[U]) = {
// there is no sort, mapValueStream or force to reducers:
