
For executing SparkRandomForestClassifier how should I create a BlockRDD #73

Open
MyPythonGitHub opened this issue Dec 10, 2016 · 5 comments

Comments

@MyPythonGitHub

Hi,
I am quite new to Sparkit-learn. In order to execute SparkRandomForestClassifier, I need to convert my input DataFrame (built from columns retrieved from a Hive table) into a BlockRDD. Could you please let me know how to do that?

Thanks !

@taynaud
Collaborator

taynaud commented Dec 12, 2016

SparkRandomForestClassifier expects a DictRDD with an "X" ArrayRDD and a "y" ArrayRDD.

An ArrayRDD is an RDD of numpy arrays, so you have to build a small job to transform your DataFrame into two RDDs of numpy arrays.
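
For illustration, here is roughly how an ArrayRDD is built from a plain RDD (a toy sketch following the project README; sc is your SparkContext and the sizes are arbitrary):

import numpy as np
from splearn.rdd import ArrayRDD

data = np.arange(20).reshape(10, 2)             # toy feature matrix
X = ArrayRDD(sc.parallelize(data, 2), bsize=5)  # each element becomes a block of 5 rows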

In general, my models expect a list of dicts as input, using custom Pipelines with selectors to extract the relevant features.

Thus, my conversion routine looks like:

import numpy as np
from splearn.rdd import DictRDD

def mapper(partition):
    # turn each Row into a dict and pair it with its target value;
    # 'key' is the name of the target column, defined elsewhere in my code
    as_dict = [r.asDict() for r in partition]
    targets = [d[key] for d in as_dict]
    return np.array([as_dict, targets]).T

new_rdd = dataset.rdd.mapPartitions(mapper)
dict_rdd = DictRDD(new_rdd,
                   columns=('X', 'y'),
                   bsize=bsize,  # block size, i.e. how many rows per numpy block
                   dtype=[np.ndarray, np.ndarray])
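
Once you have the dict_rdd, fitting works like in scikit-learn, just on the distributed container (a small illustrative snippet, not lifted from my actual job):

from splearn.ensemble import SparkRandomForestClassifier

clf = SparkRandomForestClassifier(n_estimators=10)
clf.fit(dict_rdd)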

You will need to convert your DataFrame to numpy arrays in mapper differently if your assumptions are not the same as mine.

Also be aware that the number-of-trees behaviour is slightly different from scikit-learn's.

SparkRandomForestClassifier will train a distinct random forest with n_trees on each partition and then merge them. Thus, if you use n_trees=500 and you have 10 partitions in your DataFrame, you'll get a final RandomForest of 500 * 10 trees.
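
A quick way to check what you will end up with (plain PySpark calls, only illustrating the arithmetic above):

n_partitions = dataset.rdd.getNumPartitions()
total_trees = 500 * n_partitions  # with n_trees=500 and 10 partitions: 5000 trees after the merge
# repartition first, e.g. dataset.repartition(10), if that is more than you want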

Best,

@MyPythonGitHub
Author

Thanks, Thomas, for your help! I am gradually getting there.

After executing this code (I have replaced 'dataset' with my DataFrame name), I am getting the error: NameError: name 'bsize' is not defined

I put bsize = 5 (just a random number I picked) and then I am getting the error:

NameError: global name 'key' is not defined
I know I am missing something obvious here, but could you please help me identify where I am going wrong?

Thanks again !

@taynaud
Collaborator

taynaud commented Dec 13, 2016

Hello,

My code was only an example; it fits into my own workflow, where big SparkPipelines expect dictionaries as input. You need to write the code that converts your DataFrame into an RDD of numpy arrays yourself.

See the quickstart https://github.com/lensacom/sparkit-learn

You can also look at the tests to see how to use it, for instance https://github.com/lensacom/sparkit-learn/blob/master/splearn/ensemble/tests/__init__.py and https://github.com/lensacom/sparkit-learn/blob/master/splearn/utils/testing.py
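
Roughly, the quickstart pattern looks like this (toy driver-side arrays; check the README in case the exact constructor arguments differ):

import numpy as np
from splearn.rdd import DictRDD
from splearn.ensemble import SparkRandomForestClassifier

X = np.random.rand(100, 2)        # toy feature matrix
y = np.random.randint(0, 2, 100)  # toy binary labels

X_rdd = sc.parallelize(X, 4)      # sc: your SparkContext
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd), columns=('X', 'y'), bsize=5,
            dtype=[np.ndarray, np.ndarray])

clf = SparkRandomForestClassifier(n_estimators=10)
clf.fit(Z)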

@MyPythonGitHub
Author

Hi Thomas,
Thanks for your feedback! I am now facing an error which says: TypeError: float() argument must be a string or a number. I wanted to check with you whether the model accepts float data, and if so, could you please guide me in resolving this error?

Here is my code:

## Creating Spark DataFrame
features_input = sqlContext.sql("select feature1, feature2, label from input_table")

## Replacing Nulls with Zeros
features_input = features_input.na.fill(0)

def mapper(partition):
    as_dict = [r.asDict() for r in partition]
    targets = [d['label'] for d in as_dict]
    return np.array([as_dict, targets]).T

f_rdd = features_input.rdd.mapPartitions(mapper)

dict_f_rdd = DictRDD(f_rdd, columns=('X', 'y'), bsize=3, dtype=[np.ndarray, np.ndarray])
clf = SparkRandomForestClassifier(n_estimators=500, n_jobs=-1)
clf.fit(dict_f_rdd)

Here is my error message:

16/12/13 05:50:19 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.3.2.0-2950/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "build/bdist.linux-x86_64/egg/splearn/ensemble/init.py", line 140, in
File "/var/opt/teradata/anaconda4.1.1/anaconda/lib/python2.7/site-packages/sklearn/ensemble/forest.py", line 212, in fit
X = check_array(X, dtype=DTYPE, accept_sparse="csc")
File "/var/opt/teradata/anaconda4.1.1/anaconda/lib/python2.7/site-packages/sklearn/utils/validation.py", line 373, in check_array
array = np.array(array, dtype=dtype, order=order, copy=copy)
TypeError: float() argument must be a string or a number

Here is my data:

[(array([{'feature1': 0.0, 'feature2': 0.0, 'label': 0},
         {'feature1': 0.0, 'feature2': 0.0, 'label': 0},
         {'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0, 0, 0])),
 (array([{'feature1': 0.0, 'feature2': 0.0, 'label': 0},
         {'feature1': 0.0, 'feature2': 0.0, 'label': 0},
         {'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0, 0, 0])),
 (array([{'feature1': 0.0, 'feature2': 3.066666666666667, 'label': 0},
         {'feature1': 13.833333333333334, 'feature2': 13.833333333333334, 'label': 0},
         {'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0, 0, 0])),
 (array([{'feature1': 0.0, 'feature2': 0.0, 'label': 0}], dtype=object), array([0]))]

@taynaud
Collaborator

taynaud commented Dec 20, 2016

Does your model expect to fit on an array of dicts?

If not, you have to build an RDD containing acceptable input for your model.
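
A plain scikit-learn RandomForest needs X to be a numeric matrix, not an array of dicts, so the mapper should pull the feature values out of each row. An untested sketch, assuming your numeric columns are feature1 and feature2 as in your query:

import numpy as np
from splearn.rdd import DictRDD

def mapper(partition):
    for row in partition:
        d = row.asDict()
        # emit a numeric feature vector plus its label instead of the raw dict,
        # so that sklearn's check_array can cast X to float
        yield np.array([d['feature1'], d['feature2']]), d['label']

f_rdd = features_input.rdd.mapPartitions(mapper)
dict_f_rdd = DictRDD(f_rdd, columns=('X', 'y'), bsize=3,
                     dtype=[np.ndarray, np.ndarray])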
