-
Notifications
You must be signed in to change notification settings - Fork 0
/
PredictRating.java
108 lines (83 loc) · 3.8 KB
/
PredictRating.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package finalproject_try;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class PredictRating {
public static class PredMapper extends Mapper<Object, Text, Text, Text>{
//user0,customer1,sim1,movie0:rating0,movie1:rating1
private Text outKey = new Text();
private Text outValue = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
String user = line[0];
float sim = Float.parseFloat(line[2]);
String movie = "";
float rating = 0;
for(int i = 3; i < line.length; i++){
String[] mr = line[i].split(":");
movie = mr[0];
rating = Float.parseFloat(mr[1]);
outKey.set(user+":"+movie);
outValue.set(String.valueOf(sim)+":"+String.valueOf(sim*rating));
context.write(outKey, outValue);
}
}
}
public static class PredReducer extends Reducer<Text,Text,Text,Text> {
//user:movie,[sim1:product1,sim2:product2,..]
//user:movie,rating
Text outValue = new Text();
public void reduce(Text key, Iterable<Text> values,Context context
) throws IOException, InterruptedException {
float sum_sim = 0;
float sum_product = 0;
for(Text val:values){
String[] line = val.toString().split(":");
float sim = Float.parseFloat(line[0]);
float product = Float.parseFloat(line[1]);
sum_sim = sum_sim+sim;
sum_product = sum_product+product;
}
outValue.set(key.toString()+","+String.valueOf(sum_product/sum_sim));
context.write(null, outValue);
}
}
/**
 * Partitioner that routes a record by the {@link String#hashCode()} of its key text.
 * Identical key text always yields the same partition number.
 */
public static class PredPartitioner extends HashPartitioner<Text, Text> {
    /**
     * Chooses the partition for a key.
     *
     * @param key            map output key
     * @param result         map output value; not used for partitioning
     * @param numReduceTasks number of partitions to spread keys over
     * @return a partition index in {@code [0, numReduceTasks)}
     */
    @Override
    public int getPartition(Text key, Text result, int numReduceTasks) {
        // Clearing the sign bit keeps the hash, and therefore the remainder, non-negative.
        final int nonNegativeHash = Integer.MAX_VALUE & key.toString().hashCode();
        return nonNegativeHash % numReduceTasks;
    }
}
/**
 * Configures and runs the "predict rating" job, then exits the JVM.
 *
 * <p>Exit status: 0 if the job completes successfully, 1 if it fails, 2 if the
 * command-line arguments are missing.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 * @throws Exception if job configuration or execution fails
 */
public static void main(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
        System.err.println("Usage: PredictRating <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "predict rating");
    job.setJarByClass(PredictRating.class);

    // The mapper is registered together with its input path here, so there is no
    // separate setMapperClass call. Further inputs could be added with more
    // addInputPath calls, but args[1] is reserved for the output path below.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PredMapper.class);

    job.setPartitionerClass(PredPartitioner.class);
    job.setNumReduceTasks(5);
    job.setReducerClass(PredReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}