Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-5094][MLlib] Add Python API for Gradient Boosted Trees #3951

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 76 additions & 0 deletions examples/src/main/python/mllib/gradient_boosted_trees.py
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Gradient boosted Trees classification and regression using MLlib.
"""

import sys

from pyspark.context import SparkContext
from pyspark.mllib.tree import GradientBoostedTrees
from pyspark.mllib.util import MLUtils


def testClassification(trainingData, testData):
# Train a GradientBoostedTrees model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = GradientBoostedTrees.trainClassifier(trainingData, categoricalFeaturesInfo={},
numIterations=30, maxDepth=4)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \
/ float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification ensemble model:')
print(model.toDebugString())


def testRegression(trainingData, testData):
# Train a GradientBoostedTrees model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo={},
numIterations=30, maxDepth=4)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \
/ float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression ensemble model:')
print(model.toDebugString())


if __name__ == "__main__":
if len(sys.argv) > 1:
print >> sys.stderr, "Usage: gradient_boosted_trees"
exit(1)
sc = SparkContext(appName="PythonGradientBoostedTrees")

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

print('\nRunning example of classification using GradientBoostedTrees\n')
testClassification(trainingData, testData)

print('\nRunning example of regression using GradientBoostedTrees\n')
testRegression(trainingData, testData)

sc.stop()
Expand Up @@ -41,10 +41,11 @@ import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.mllib.tree.{RandomForest, DecisionTree}
import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest, DecisionTree}
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Algo, Strategy}
import org.apache.spark.mllib.tree.impurity._
import org.apache.spark.mllib.tree.model.{RandomForestModel, DecisionTreeModel}
import org.apache.spark.mllib.tree.loss.Losses
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, DecisionTreeModel}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -528,6 +529,35 @@ class PythonMLLibAPI extends Serializable {
}
}

/**
* Java stub for Python mllib GradientBoostedTrees.train().
* This stub returns a handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on exit;
* see the Py4J documentation.
*/
def trainGradientBoostedTreesModel(
data: JavaRDD[LabeledPoint],
algoStr: String,
categoricalFeaturesInfo: JMap[Int, Int],
lossStr: String,
numIterations: Int,
learningRate: Double,
maxDepth: Int): GradientBoostedTreesModel = {
val boostingStrategy = BoostingStrategy.defaultParams(algoStr)
boostingStrategy.setLoss(Losses.fromString(lossStr))
boostingStrategy.setNumIterations(numIterations)
boostingStrategy.setLearningRate(learningRate)
boostingStrategy.treeStrategy.setMaxDepth(maxDepth)
boostingStrategy.treeStrategy.categoricalFeaturesInfo = categoricalFeaturesInfo.asScala.toMap

val cached = data.rdd.persist(StorageLevel.MEMORY_AND_DISK)
try {
GradientBoostedTrees.train(cached, boostingStrategy)
} finally {
cached.unpersist(blocking = false)
}
}

/**
* Java stub for mllib Statistics.colStats(X: RDD[Vector]).
* TODO figure out return type.
Expand Down
41 changes: 34 additions & 7 deletions python/pyspark/mllib/tests.py
Expand Up @@ -142,7 +142,7 @@ def test_clustering(self):

def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
data = [
LabeledPoint(0.0, [1, 0, 0]),
LabeledPoint(1.0, [0, 1, 1]),
Expand Down Expand Up @@ -171,18 +171,31 @@ def test_classification(self):
self.assertTrue(nb_model.predict(features[3]) > 0)

categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
dt_model = \
DecisionTree.trainClassifier(rdd, numClasses=2,
categoricalFeaturesInfo=categoricalFeaturesInfo)
dt_model = DecisionTree.trainClassifier(
rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
self.assertTrue(dt_model.predict(features[3]) > 0)

rf_model = RandomForest.trainClassifier(
rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
self.assertTrue(rf_model.predict(features[3]) > 0)

gbt_model = GradientBoostedTrees.trainClassifier(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
self.assertTrue(gbt_model.predict(features[3]) > 0)

def test_regression(self):
from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
RidgeRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
data = [
LabeledPoint(-1.0, [0, -1]),
LabeledPoint(1.0, [0, 1]),
Expand Down Expand Up @@ -211,13 +224,27 @@ def test_regression(self):
self.assertTrue(rr_model.predict(features[3]) > 0)

categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
dt_model = \
DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
dt_model = DecisionTree.trainRegressor(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
self.assertTrue(dt_model.predict(features[3]) > 0)

rf_model = RandomForest.trainRegressor(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
self.assertTrue(rf_model.predict(features[3]) > 0)

gbt_model = GradientBoostedTrees.trainRegressor(
rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
self.assertTrue(gbt_model.predict(features[3]) > 0)


class StatTests(PySparkTestCase):
# SPARK-4023
Expand Down