# Used through a terminal (a local test of this pipe follows reducer.py below)
cat <input-file> | python mapper.py | sort -k1,1 | python reducer.py | sort -k1,1 > results.txt
# cat reads the file
# | pipes output into the next command, > redirects output to a file
# sort -k1,1 sorts ascending by the first column (the key), which is what
# Hadoop does between the map and reduce phases
# mapper.py
import sys

# read text from standard input; emit one (word, 1) pair per word
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
# reducer.py
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    # count arrives as a string; skip the line if it is not an integer
    try:
        count = int(count)
    except ValueError:
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
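# A quick local sanity check of the full pipe (no Hadoop needed); assumes
# mapper.py and reducer.py sit in the current directory
echo "the quick fox the" | python mapper.py | sort -k1,1 | python reducer.py
# expected output:
# fox	1
# quick	1
# the	2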
#---------------------------------------------------------------------------------------------------------------
# Use VirtualBox
# Terminal
vagrant up # start the virtual machine
vagrant ssh # connect to the virtual machine
bigdata_start.sh # start Hive, Spark, etc.
pyspark # start the PySpark shell
exit # exit out of the virtual machine
vagrant halt # stop the virtual machine
http://127.0.0.1:18888/ # Jupyter notebook running inside the virtual machine
http://10.211.55.101:8088/cluster # Hadoop YARN cluster UI
hadoop fs -ls / # list HDFS contents; same syntax as the local command line
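# A few more hadoop fs commands, mirroring their local shell counterparts
# (the /data paths here are hypothetical):
hadoop fs -mkdir /data
hadoop fs -put local_file.csv /data/ # copy a local file into HDFS
hadoop fs -cat /data/local_file.csv # print a file stored in HDFS
hadoop fs -get /data/local_file.csv ./ # copy a file from HDFS back to local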
#---------------------------------------------------------------------------------------------------------------
## Spark
vagrant up
vagrant ssh
spark_local_start.sh
# Turn an RDD of CSV lines into a DataFrame
from pyspark.sql.types import StructType, StructField, StringType
# the first line of the file holds the column names
header = trips.first()
fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')]
schema = StructType(fields)
# drop the header line before converting, or it ends up as a data row
tripsDF = sqlContext.createDataFrame(
    trips.filter(lambda line: line != header).map(lambda line: line.split(",")), schema)
# Filter now!
tripsDF.filter(tripsDF['End Terminal'] == 70).collect()
#---------------------------------------------------------------------------------------------------------------
#######################################################
### Spark - Feature Preparation (RDDs, Spark SQL) ###
### Model Training (MLlib) ###
### Model Evaluation (MLlib) ###
### Production Use (model.predict()) ###
#######################################################
## Feature Preparation ##
df = spark.read.csv("/../hello.csv", header=True, mode="DROPMALFORMED")
## summary statistics for a single column ##
df.select("Store Number").describe().show()
## show raw rows as a list of Row objects ##
df.take(5)
## show df as a table, similar to pandas but less readable ##
df.select(df.columns).show(5)
df.select("Date", "Store Number", "Category Name", "Bottles Sold", "Sale (Dollars)").show(5)
## Changing the dtype of every column ##
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.sql.functions import udf, regexp_replace
# a UDF would also work, e.g.:
# stripDollarSigns = udf(lambda s: float(s.replace("$", "")), DoubleType())
df = df \
    .withColumn("Store Number", df["Store Number"].cast("integer")) \
    .withColumn("Sale (Dollars)", regexp_replace("Sale (Dollars)", "\\$", "").cast("double")) \
    .withColumn("Zip Code", df["Zip Code"].cast("integer")) \
    .withColumn("County Number", df["County Number"].cast("integer")) \
    .withColumn("Vendor Number", df["Vendor Number"].cast("integer")) \
    .withColumn("Item Number", df["Item Number"].cast("integer")) \
    .withColumn("Bottle Volume (ml)", df["Bottle Volume (ml)"].cast("integer")) \
    .withColumn("State Bottle Cost", regexp_replace("State Bottle Cost", "\\$", "").cast("double")) \
    .withColumn("State Bottle Retail", regexp_replace("State Bottle Retail", "\\$", "").cast("double")) \
    .withColumn("Bottles Sold", df["Bottles Sold"].cast("integer")) \
    .withColumn("Volume Sold (Liters)", df["Volume Sold (Liters)"].cast("double")) \
    .withColumn("Volume Sold (Gallons)", df["Volume Sold (Gallons)"].cast("double"))
df.printSchema()
df.show(5)
## Similar to Pandas .describe() ##
df.select(df.columns).describe().show()
df.select(["Zip Code", "Bottle Volume (ml)", "Bottles Sold", "Sale (Dollars)", "Volume Sold (Liters)"]).describe().show()
## Linear Regression Modeling ##
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
features = ["Bottles Sold", "Sale (Dollars)", "Bottle Volume (ml)"]
response = "Volume Sold (Liters)"
X = df.rdd.map(
    lambda row: LabeledPoint(row[response], [row[feature] for feature in features])
)
## Train-Test split ##
# Split the data into training and test sets (30% held out for testing)
trainingData, testData = X.randomSplit([0.7, 0.3])
## Train on LinearRegression ##
linearModel = LinearRegressionWithSGD.train(trainingData, iterations=100, step=0.000001)
## Examining Coefficients ##
zip(features, linearModel.weights.array)
## Regression Metrics ##
from pyspark.mllib.evaluation import RegressionMetrics
prediObserRDD = testData.map(lambda row: (float(linearModel.predict(row.features)), row.label)).cache()
metrics = RegressionMetrics(prediObserRDD)
print """
R2: %.6f
Explained Variance: %.6f
MSE: %.6f
RMSE: %.6f
""" % (metrics.r2, metrics.explainedVariance, metrics.meanSquaredError, metrics.rootMeanSquaredError)
#########################################################################
### Another way to load up the clean data is to pre-create the schema ###
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("PassengerId", IntegerType()),
    StructField("Survived", IntegerType()),
    StructField("Pclass", IntegerType()),
    StructField("Name", StringType()),
    StructField("Sex", StringType()),
    StructField("Age", DoubleType()),
    StructField("SibSp", IntegerType()),
    StructField("Parch", IntegerType()),
    StructField("Fare", DoubleType()),
    StructField("Embarked", StringType())
])
df = spark.read.csv("../../../hello.csv", header=True, mode="DROPMALFORMED", schema=schema)
# Print schema, and then show the first 5 records in printed format
df.printSchema()
df.show(5)
#########################################################################
## Logistic Regression Modeling ##
# (rebuild the LabeledPoints and the train/test split from the new df first, as above)
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
logisticModel = LogisticRegressionWithLBFGS.train(trainingData)
## Examining Coefficients ##
zip(features, logisticModel.weights.array)
## Regression Metrics ##
prediObserRDD = testData.map(lambda row: (float(logisticModel.predict(row.features)), row.label)).cache()
metrics = RegressionMetrics(prediObserRDD)
print """
R2: %.6f
Explained Variance: %.6f
MSE: %.6f
RMSE: %.6f
""" % (metrics.r2, metrics.explainedVariance, metrics.meanSquaredError, metrics.rootMeanSquaredError)
## Classification Metrics ##
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Overall test error (fraction of misclassified points)
def testError(lap):
    return lap.filter(lambda (v, p): v != p).count() / float(testData.count())
testErr = testError(prediObserRDD)
print "Test Error = %s" % testErr
# Instantiate metrics object
metrics = BinaryClassificationMetrics(prediObserRDD)
# Area under precision-recall curve
print "Area under PR = %s" % metrics.areaUnderPR
# Area under ROC curve
print "Area under ROC = %s" % metrics.areaUnderROC
## Multi-class Metrics (Multinomial Response) ##
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(prediObserRDD)
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print "Summary Stats"
print "--------------------"
print "Accuracy = %s" % metrics.accuracy
print "Precision = %s" % precision
print "Recall = %s" % recall
print "F1 Score = %s" % f1Score
## Random Forests with PySpark (Not fully implemented) ##
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
##########################################################
##########################################################
##########################################################
##########################################################
################### ####################
################### df.toPandas() ####################
################### ####################
##########################################################
##########################################################
##########################################################
##########################################################
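# toPandas() collects the entire DataFrame to the driver as a pandas DataFrame,
# so it is only safe when the result fits in driver memory. A minimal sketch
# (the limit of 1000 rows is an arbitrary safety cap):
pdf = df.limit(1000).toPandas()
pdf.head()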
# Other good features to know (a sketch of all three follows below)
# Pipelines
# ParamGridSearch
# Model Loading/Saving
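# A minimal sketch of all three, using the newer DataFrame-based spark.ml API
# (not the RDD-based mllib used above). The column names "label" and the input
# columns, plus the save path, are hypothetical placeholders.
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

assembler = VectorAssembler(inputCols=["Bottles Sold", "Bottle Volume (ml)"],
                            outputCol="features")
lr = LogisticRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, lr])
# grid-search the regularization strength, picked by 3-fold cross validation
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).build()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol="label"),
                    numFolds=3)
cvModel = cv.fit(trainDF)  # trainDF: a DataFrame with the columns above
# save the winning pipeline, then load it back later
cvModel.bestModel.write().overwrite().save("/tmp/best_pipeline")
loaded = PipelineModel.load("/tmp/best_pipeline")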
#---------------------------------------------------------------------------------------------------------------
## AWS (Create EC2 Instance)
# Create an EC2 instance (pick your region!)
# For the free tier, use the Ubuntu server image
# Create a key pair, or you will not be able to connect to your server
# Secure your server via "Create Security Group"
# Inbound: add a Custom TCP Rule, protocol TCP, port 8888, source Anywhere
# First restrict the permissions of your ssh key file
chmod 600 ~/.ssh/ssh_file.pem
# Connect to the AWS Ubuntu server, tunneling local port 18888 to remote 8888
# (requires the security group above)
ssh -i ~/.ssh/ssh_file.pem -L 18888:127.0.0.1:8888 ubuntu@'Public DNS'
# Setting up the AWS server on Ubuntu (bash history numbers stripped)
sudo apt-get update
sudo apt-get install python-pip
sudo apt-get install python2.7 # note: the apt package is python2.7, not python-2.7
sudo pip install --upgrade pip
sudo pip install scikit-learn # pip package name for sklearn
sudo pip install pandas
sudo pip install numpy
# cPickle ships with Python 2's standard library; it is not pip-installable
# Anaconda is not an apt package; install it from its shell installer if needed
jupyter notebook --ip='*' # listen on all interfaces
# q to quit, then y at the next prompt; *DO NOT CTRL-C*
# then open the address below in a local browser (it goes through the ssh tunnel)
localhost:18888/tree
# Create Image - saves an AMI of this machine so you can launch an identical instance/server later
#---------------------------------------------------------------------------------------------------------------