Maintenance/release scala 0.2.0 b1 snapshot #1511

Closed
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.7.9
+current_version = 0.7.10
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<build>\d+))?
serialize =
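Note: the parse pattern above matches both plain and pre-release versions, so 0.7.10 parses as major 0 / minor 7 / patch 10, while a tag like 0.7.10-dev0 additionally captures release "dev" and build 0.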
2 changes: 1 addition & 1 deletion Makefile
@@ -5,7 +5,7 @@ src.python.pyc := $(shell find ./src -type f -name "*.pyc")
src.proto.dir := ./proto/src
src.proto := $(shell find $(src.proto.dir) -type f -name "*.proto")

-version := 0.7.9
+version := 0.7.10

dist.dir := dist
egg.dir := .eggs
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -101,7 +101,7 @@
# built documents.
#
# The short X.Y version.
version = "0.7.9"
version = "0.7.10-dev0"
# The full version, including alpha/beta/rc tags.
release = "" # Is set by calling `setup.py docs`

2 changes: 1 addition & 1 deletion java/build.gradle.kts
@@ -5,7 +5,7 @@ plugins {
}

group = "ai.whylabs"
version = "0.2.0-b1"
version = "0.2.0-b1-${project.properties.getOrDefault("versionType", "SNAPSHOT")}"
//version = "0.1.7-b1-${project.properties.getOrDefault("versionType", "SNAPSHOT")}"
extra["isReleaseVersion"] = !version.toString().endsWith("SNAPSHOT")

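With this change the Java artifact version defaults to 0.2.0-b1-SNAPSHOT; a release build would presumably be cut by overriding the Gradle project property, e.g. ./gradlew publish -PversionType=RELEASE (the exact task and property value depend on this repo's publishing setup), so that the version string no longer ends in SNAPSHOT and isReleaseVersion evaluates to true.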
1 change: 1 addition & 0 deletions java/spark/build.gradle.kts
@@ -76,6 +76,7 @@ artifacts {
dependencies {
api("org.slf4j:slf4j-api:1.7.27")
implementation(scalaPackage("org.apache.spark", "spark-core", sparkVersion))
implementation(scalaPackage("org.apache.spark", "spark-mllib", sparkVersion))
implementation(scalaPackage("org.apache.spark", "spark-sql", sparkVersion))

// project dependencies
89 changes: 86 additions & 3 deletions java/spark/src/main/scala/com/whylogs/spark/WhyLogs.scala
@@ -5,8 +5,8 @@ import ai.whylabs.service.invoker.ApiClient
import ai.whylabs.service.model.{LogAsyncRequest, SegmentTag}
import com.whylogs.spark.WhyLogs.PROFILE_FIELD
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.{DataTypes, NumericType, StructField}
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.types.{ArrayType, DataTypes, NumericType, StructField}
+import org.apache.spark.sql.{DataFrame, Dataset, RelationalGroupedDataset, Row}
import org.apache.spark.whylogs.{DatasetProfileAggregator, DatasetProfileMerger}
import org.slf4j.LoggerFactory

@@ -20,12 +20,15 @@ import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
+import org.apache.spark.mllib.evaluation.RankingMetrics
+import org.apache.spark.ml.evaluation.RankingEvaluator

case class ModelProfileSession(predictionField: String, targetField: String, scoreField: String = null) {
def shouldExclude(field: String): Boolean = {
predictionField == field || targetField == field || scoreField == field
}
}
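// Configuration for ranking-metrics profiling: the columns holding the ranked
// predictions and the ground-truth labels, an optional score column, and the
// evaluation cut-off k (a downstream default of 10 applies when unset).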
+case class RankingMetricsSession(predictionField: String, targetField: String, scoreField: String = null, k: Option[Int] = None)

/**
* A class that enables easy access to the profiling API
Expand All @@ -40,7 +43,8 @@ case class WhyProfileSession(private val dataFrame: DataFrame,
private val timeColumn: String = null,
private val groupByColumns: Seq[String] = List(),
// model metrics
-private val modelProfile: ModelProfileSession = null
+private val modelProfile: ModelProfileSession = null,
+private val rankingMetrics: RankingMetricsSession = null
) {
private val logger = LoggerFactory.getLogger(getClass)
private val columnNames = dataFrame.schema.fieldNames.toSet
@@ -107,6 +111,69 @@ case class WhyProfileSession(private val dataFrame: DataFrame,
this.copy(modelProfile = ModelProfileSession(predictionField, targetField))
}

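/**
 * Returns a copy of this session configured to compute ranking metrics.
 * Both predictionField and targetField must be array-typed columns.
 */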
def withRankingMetrics(predictionField: String, targetField: String, scoreField: String = null, k: Option[Int] = None) : WhyProfileSession = {
checkIfColumnExists(predictionField)
checkIfColumnExists(targetField)

val predFieldSchema: StructField = dataFrame.schema.apply(predictionField)
if (!predFieldSchema.dataType.isInstanceOf[ArrayType]) {
throw new IllegalStateException(s"Ranking Metrics prediction field MUST be an array of numeric type. Got: ${predFieldSchema.dataType}")
}
val targetFieldSchema: StructField = dataFrame.schema.apply(targetField)
if (!targetFieldSchema.dataType.isInstanceOf[ArrayType]) {
throw new IllegalStateException(s"Ranking Metrics target field MUST be an array of numeric type. Got: ${targetFieldSchema.dataType}")
}
this.copy(rankingMetrics = RankingMetricsSession(predictionField, targetField, scoreField, k))
}

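/**
 * Computes precision@k, MAP@k, NDCG@k and recall@k from the configured
 * prediction/target columns and returns them as a single-row DataFrame with
 * columns precision_k_<k>, average_precision_k_<k>, norm_dis_cumul_gain_k_<k>
 * and recall_k_<k>.
 */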
def rankingMetricDF(df: DataFrame): DataFrame = {
val rankingMetricsFields = Option(rankingMetrics).toSeq.flatMap(m => {
Seq(m.targetField, m.predictionField, m.scoreField).filter(_ != null)
})

val rddOfTuples = df.select(rankingMetricsFields.map(col):_*).rdd
.map(row => (
row.getAs[Seq[Int]](rankingMetrics.predictionField).toArray,
row.getAs[Seq[Int]](rankingMetrics.targetField).toArray
))
val k = rankingMetrics.k.getOrElse(10)

val metrics = new RankingMetrics(rddOfTuples)
val metricsSequence = dataFrame.sparkSession.sparkContext.parallelize(
Seq((
metrics.precisionAt(k),
metrics.meanAveragePrecisionAt(k),
metrics.ndcgAt(k),
metrics.recallAt(k))),
2
)

val metricOutput = dataFrame.sparkSession.createDataFrame(metricsSequence).toDF(s"precision_k_$k", s"average_precision_k_$k", s"norm_dis_cumul_gain_k_$k", s"recall_k_$k")
metricOutput
}

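/**
 * Computes the ranking metrics and aggregates them into a single whylogs
 * dataset profile stamped with the given dataset timestamp.
 */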
def aggRankingMetricsProfiles(timestamp: Long): DataFrame = {
this.aggRankingMetricsProfiles(Instant.ofEpochMilli(timestamp))
}

def aggRankingMetricsProfiles(timestamp: Instant = Instant.now()): DataFrame = {
val timeInMillis = timestamp.toEpochMilli
logger.debug(s"Ranking metrics session name: $name")
logger.debug(s"timestamp: $timestamp")
logger.debug(s"All columns: $columnNames")

val coalesced = dataFrame.coalesce(dataFrame.sparkSession.sparkContext.defaultParallelism)

val rankingMetricProfiles = rankingMetricDF(coalesced)
.groupBy()
.agg(DatasetProfileAggregator(name, timeInMillis, timeColumn, groupByColumns, modelProfile)
.toColumn
.alias(PROFILE_FIELD))
.groupBy()
.agg(new DatasetProfileMerger(name, timeInMillis).toColumn.alias(PROFILE_FIELD))
rankingMetricProfiles
}

/**
* Run aggregation and build profile based on the specification of this session
*
@@ -192,6 +259,22 @@
})
}

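/**
 * Computes the ranking metrics, folds them into a dataset profile and
 * uploads the result to the WhyLabs endpoint.
 */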
def logRankingMetrics(timestampInMs: Long = Instant.now().toEpochMilli,
orgId: String,
modelId: String,
apiKey: String,
endpoint: String = "https://api.whylabsapp.com",
sslCaCertData: String = null
): Unit = {
logger.info(s"Computing ranking metrics: [$name, $timestampInMs]")
val df = aggRankingMetricsProfiles(timestamp = timestampInMs)
logger.info(s"Uploading ranking metrics: [$name, $timestampInMs]")
df.foreachPartition((rows: Iterator[Row]) => {
doUpload(orgId, modelId, apiKey, rows, endpoint, sslCaCertData)
})
logger.info(s"Done uploading ranking metrics: [$name, $timestampInMs]")
}

private def doUpload(
orgId: String,
modelId: String,
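For orientation, a minimal sketch of how the API added in this file is meant to be driven from a Spark job; the DataFrame contents, session name and WhyLabs credentials below are illustrative assumptions, not part of this PR:

import com.whylogs.spark.WhyLogs._

// Assumes an active SparkSession in scope as `spark`.
// Each row pairs a ranked list of predicted item IDs with the relevant item IDs.
val df = spark.createDataFrame(Seq(
  (Array(1, 6, 2), Array(1, 2, 3)),
  (Array(4, 1, 5), Array(1, 2))
)).toDF("predictions", "labels")

df.newProfilingSession("ranking-example")            // hypothetical session name
  .withRankingMetrics(predictionField = "predictions",
                      targetField = "labels",
                      k = Some(5))                   // metrics evaluated at k = 5
  .logRankingMetrics(
    orgId = "org-0",                                 // placeholder credentials
    modelId = "model-0",
    apiKey = sys.env("WHYLABS_API_KEY"))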
136 changes: 135 additions & 1 deletion java/spark/src/test/scala/com/whylogs/spark/WhyLogsTest.scala
@@ -10,19 +10,63 @@ import com.whylogs.core.message.InferredType
import com.whylogs.spark.WhyLogs.ProfiledDataFrame
import org.apache.commons.lang3.RandomUtils
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.whylogs.SharedSparkContext

import org.scalatest.exceptions.TestFailedException
import org.scalatest.funsuite.AnyFunSuite

import scala.collection.JavaConverters._
import scala.reflect.io.Directory
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


case class TestDataPoint(x: String, i: Int, d: Double, ts: Timestamp) extends Serializable

case class TestPrediction(prediction: Int, target: Int, score: Double) extends Serializable

class WhyLogsTest extends AnyFunSuite with SharedSparkContext {
val ABS_TOL_MSG = " using absolute tolerance"
private def AbsoluteErrorComparison(x: Double, y: Double, eps: Double): Boolean = {
math.abs(x - y) < eps
}

case class CompareDoubleRightSide(
fun: (Double, Double, Double) => Boolean, y: Double, eps: Double, method: String)

implicit class DoubleWithAlmostEquals(val x: Double) {

def ~=(r: CompareDoubleRightSide): Boolean = r.fun(x, r.y, r.eps)
def !~=(r: CompareDoubleRightSide): Boolean = !r.fun(x, r.y, r.eps)

def ~==(r: CompareDoubleRightSide): Boolean = {
if (!r.fun(x, r.y, r.eps)) {
throw new TestFailedException(
s"Expected $x and ${r.y} to be within ${r.eps}${r.method}.", 0)
}
true
}

def !~==(r: CompareDoubleRightSide): Boolean = {
if (r.fun(x, r.y, r.eps)) {
throw new TestFailedException(
s"Did not expect $x and ${r.y} to be within ${r.eps}${r.method}.", 0)
}
true
}

/**
* Comparison using absolute tolerance.
*/
def absTol(eps: Double): CompareDoubleRightSide =
CompareDoubleRightSide(AbsoluteErrorComparison, x, eps, ABS_TOL_MSG)

override def toString: String = x.toString
}

test("test WhyLogsSession") {
import com.whylogs.spark.WhyLogs._

@@ -127,6 +171,96 @@ class WhyLogsTest extends AnyFunSuite with SharedSparkContext {
assert(dp.getColumns.size() == 608)
}

test("test WhyLogsSession with RankingMetrics") {
import com.whylogs.spark.WhyLogs._

val file = Files.createTempFile("data", ".parquet")
Files.copy(WhyLogs.getClass.getResourceAsStream("/prediction_data.parquet"), file, StandardCopyOption.REPLACE_EXISTING)
val predictionAndLabelsRDD = spark.sparkContext.parallelize(
Seq(
(Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
(Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
(Array(1, 2, 3, 4, 5), Array(0, 0, 0, 0, 0))),
2)
val predictionAndLabelsDF = spark.createDataFrame(predictionAndLabelsRDD).toDF("predictions", "labels")
println(s"predictionAndLabelsDF: ${predictionAndLabelsDF}")
println(s"predictionAndLabelsDF schema: ${predictionAndLabelsDF.printSchema()}")
val selectedDF = predictionAndLabelsDF.select("predictions", "labels")
val rddOfTuples = selectedDF.rdd
.map(row => (
row.getAs[Seq[Int]]("predictions").toArray,
row.getAs[Seq[Int]]("labels").toArray
))

val eps = 1.0e-5
val metrics = new RankingMetrics(rddOfTuples)
val map = metrics.meanAveragePrecision

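// Sanity check on the expectations below: Spark's RankingMetrics.precisionAt(k)
// averages, over the queries, the fraction of the top-k predictions found in the
// labels. At k = 1 only the first query's top item (1) is relevant, so the
// expected value is (1 + 0 + 0) / 3 = 1/3.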
assert(metrics.precisionAt(1) ~== 1.0 / 3 absTol eps)
assert(metrics.precisionAt(2) ~== 1.0 / 3 absTol eps)
assert(metrics.precisionAt(3) ~== 1.0 / 3 absTol eps)
assert(metrics.precisionAt(4) ~== 0.75 / 3 absTol eps)
assert(metrics.precisionAt(5) ~== 0.8 / 3 absTol eps)
assert(metrics.precisionAt(10) ~== 0.8 / 3 absTol eps)
assert(metrics.precisionAt(15) ~== 8.0 / 45 absTol eps)

println(s" *** precision@1=${metrics.precisionAt(1)}")

assert(map ~== 0.355026 absTol eps)

assert(metrics.meanAveragePrecisionAt(1) ~== 0.333334 absTol eps)
assert(metrics.meanAveragePrecisionAt(2) ~== 0.25 absTol eps)
assert(metrics.meanAveragePrecisionAt(3) ~== 0.24074 absTol eps)
println(s" *** map@1=${metrics.meanAveragePrecisionAt(1)}")

assert(metrics.ndcgAt(3) ~== 1.0 / 3 absTol eps)
assert(metrics.ndcgAt(5) ~== 0.328788 absTol eps)
assert(metrics.ndcgAt(10) ~== 0.487913 absTol eps)
assert(metrics.ndcgAt(15) ~== metrics.ndcgAt(10) absTol eps)
println(s" *** ndcg@3=${metrics.ndcgAt(3)}")

assert(metrics.recallAt(1) ~== 1.0 / 15 absTol eps)
assert(metrics.recallAt(2) ~== 8.0 / 45 absTol eps)
assert(metrics.recallAt(3) ~== 11.0 / 45 absTol eps)
assert(metrics.recallAt(4) ~== 11.0 / 45 absTol eps)
assert(metrics.recallAt(5) ~== 16.0 / 45 absTol eps)
assert(metrics.recallAt(10) ~== 2.0 / 3 absTol eps)
assert(metrics.recallAt(15) ~== 2.0 / 3 absTol eps)
println(s" *** recall@3=${metrics.recallAt(1)}")

val metricsSequence = Seq((
metrics.precisionAt(2),
metrics.meanAveragePrecisionAt(2),
metrics.ndcgAt(2),
metrics.recallAt(2)))
println(metricsSequence)
//val metricOutput = metricsSequence.toDF("precision_at_2", "map_at_2", "ndcg_at_2", "recall_at_2")
//println(s" *** ranking metrics at k:${metricsOutput.show()}")
}

test("test rankingMetricDF") {
import com.whylogs.spark.WhyLogs._

val predictionAndLabelsRDD = spark.sparkContext.parallelize(
Seq(
(Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
(Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
(Array(1, 2, 3, 4, 5), Array(0, 0, 0, 0, 0))),
2)
val predictionAndLabelsDF = spark.createDataFrame(predictionAndLabelsRDD).toDF("predictions", "labels")
val session = predictionAndLabelsDF.newProfilingSession("foobar")
.withRankingMetrics(predictionField="predictions", targetField="labels", k=Some(2))
val rankingMetricsDF = session.rankingMetricDF(predictionAndLabelsDF)
println(s"rankingMetricsDF: ${rankingMetricsDF.show()}")
assert(!rankingMetricsDF.head(1).isEmpty)
val eps = 1.0e-5
val result = rankingMetricsDF.head(1)(0)
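// Expected values at k = 2, averaged over the three queries:
// precision@2 = (1/2 + 1/2 + 0) / 3 = 1/3 and recall@2 = (1/5 + 1/3 + 0) / 3 = 8/45.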
assert(result.getAs[Double]("precision_k_2") ~== 1.0 / 3 absTol eps )
assert(result.getAs[Double]("average_precision_k_2") ~== 1.0 / 4 absTol eps )
assert(result.getAs[Double]("norm_dis_cumul_gain_k_2") ~== 1.0 / 3 absTol eps )
assert(result.getAs[Double]("recall_k_2") ~== 8.0 / 45 absTol eps )
}

test("profile null value") {
val schema = List(
StructField("name", StringType, nullable = false),
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "whylogs"
version = "0.7.9"
version = "0.7.10"
description = "Profile and monitor your ML data pipeline end-to-end"
authors = ["WhyLabs.ai <support@whylabs.ai>"]
license = "Apache-2.0"
2 changes: 1 addition & 1 deletion src/whylogs/_version.py
@@ -1,3 +1,3 @@
"""WhyLabs version number."""

__version__ = "0.7.9"
__version__ = "0.7.10"