This repository has been archived by the owner on Feb 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
SearchRDDJava.java
222 lines (199 loc) · 7.66 KB
/
SearchRDDJava.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/*
* Copyright © 2020 Spark Search (The Spark Search Contributors)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.search.rdd;
import org.apache.lucene.search.Query;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.search.SearchException;
import org.apache.spark.search.SearchOptions;
import org.apache.spark.search.SearchRecordJava;
import scala.Tuple2;
import scala.reflect.ClassTag;
import java.io.Serializable;
/**
 * Definition of search RDD for java.
 *
 * <p>Exposes count, query and match operations over a lucene-indexed RDD,
 * delegating to the scala {@code SearchRDD} implementation.
 *
 * @param <S> Runtime type of the indexed documents
 */
public interface SearchRDDJava<S> {

    /**
     * {@link org.apache.spark.search.rdd.SearchRDD#count()}
     */
    long count();

    /**
     * {@link org.apache.spark.search.rdd.SearchRDD#count(String)}
     */
    long count(String query);

    /**
     * {@link org.apache.spark.search.rdd.SearchRDD#searchList(String, int, double)}
     */
    SearchRecordJava<S>[] searchList(String query, int topK);

    /**
     * {@link org.apache.spark.search.rdd.SearchRDD#searchList(String, int, double)}
     */
    SearchRecordJava<S>[] searchList(String query, int topK, double minScore);

    /**
     * {@link org.apache.spark.search.rdd.SearchRDD#search(String, int, double)}
     */
    JavaRDD<SearchRecordJava<S>> search(String query, int topK, double minScore);

    /**
     * Searches for this input RDD elements matches against these ones
     * by building a lucene query string per doc
     * and returns matching hits per doc.
     *
     * @param rdd          to match with
     * @param queryBuilder builds the query string to join with the searched document
     * @param topK         topK to return
     * @param minScore     minimum score of matching documents
     * @param <K>          Key type of the pair RDD to match with
     * @param <V>          Doc type to match with
     * @return matches doc and related hits RDD
     */
    <K, V> JavaPairRDD<K, Tuple2<V, SearchRecordJava<S>[]>> matches(JavaPairRDD<K, V> rdd,
                                                                    QueryStringBuilder<V> queryBuilder,
                                                                    int topK,
                                                                    double minScore);

    /**
     * Searches for this input RDD elements matches against these ones
     * by building a lucene query per doc
     * and returns matching hits per doc.
     *
     * @param rdd          to match with
     * @param queryBuilder builds the lucene query to join with the searched document
     * @param topK         topK to return
     * @param minScore     minimum score of matching documents
     * @param <K>          Key type of the pair RDD to match with
     * @param <V>          Doc type to match with
     * @return matches doc and related hits RDD
     */
    <K, V> JavaPairRDD<K, Tuple2<V, SearchRecordJava<S>[]>> matchesQuery(JavaPairRDD<K, V> rdd,
                                                                         QueryBuilder<V> queryBuilder,
                                                                         int topK,
                                                                         double minScore);

    /**
     * Saves the current indexed RDD onto hdfs
     * in order to be able to reload it later on.
     *
     * @param path Path on the spark file system (hdfs) to save on
     */
    void save(String path);

    /**
     * Returns this search RDD as a classical search RDD.
     *
     * @return A classical search RDD
     */
    JavaRDD<S> javaRDD();

    /**
     * Builds a lucene query string to search for matching hits
     * against the input bean.
     */
    @FunctionalInterface
    interface QueryStringBuilder<T> extends Serializable {
        String build(T doc);
    }

    /**
     * Builds a lucene query to search for matching hits
     * against the input bean.
     */
    @FunctionalInterface
    interface QueryBuilder<T> extends Serializable {
        Query build(T doc);
    }

    /**
     * Builder to build a search java rdd.
     *
     * @param <T> Runtime type of document to index
     * @return A search RDD builder
     */
    static <T> Builder<T> builder() {
        return new Builder<>();
    }

    /**
     * Creates a search java rdd.
     *
     * @param rdd   RDD to index and search on
     * @param clazz Runtime time of the index documents
     * @param <T>   Runtime type of document to index
     * @return A search RDD builder
     */
    static <T> SearchRDDJava<T> of(JavaRDD<T> rdd, Class<T> clazz) {
        return SearchRDDJava.<T>builder().rdd(rdd).runtimeClass(clazz).build();
    }

    /**
     * Reload an indexed RDD from spark FS.
     *
     * @param sc    Spark context
     * @param path  Path where the search rdd lucene indices were previously saved
     * @param clazz Runtime class instance T of indexed document
     * @param <T>   Runtime class of indexed document
     * @return Reloaded search java rdd
     * @throws SearchException if the indices cannot be reloaded
     */
    static <T> SearchRDDJava<T> load(JavaSparkContext sc, String path, Class<T> clazz) {
        return load(sc, path, clazz, SearchOptions.defaultOptions());
    }

    /**
     * Reload an indexed RDD from spark FS.
     *
     * <p>The reload goes through reflection so this interface keeps no
     * compile-time dependency on the scala-side {@code SearchRDDReloadedJava}.
     *
     * @param sc      Spark context
     * @param path    Path where the search rdd lucene indices were previously saved
     * @param clazz   Runtime class instance T of indexed document
     * @param options Search option
     * @param <T>     Runtime class of indexed document
     * @return Reloaded search java rdd
     * @throws SearchException if the indices cannot be reloaded
     */
    // NOTE(review): presumably SearchRDDReloadedJava#load returns a SearchRDDJava<T>,
    // hence the suppressed unchecked cast — confirm against the scala side.
    @SuppressWarnings("unchecked")
    static <T> SearchRDDJava<T> load(JavaSparkContext sc, String path,
                                     Class<T> clazz, SearchOptions<T> options) {
        try {
            return (SearchRDDJava<T>) SearchRDDJava.class.getClassLoader()
                    .loadClass("org.apache.spark.search.rdd.SearchRDDReloadedJava")
                    .getDeclaredMethod("load", SparkContext.class, String.class,
                            SearchOptions.class, ClassTag.class)
                    .invoke(
                            null,
                            sc.sc(), path,
                            options, scala.reflect.ClassTag$.MODULE$.apply(clazz));
        } catch (ReflectiveOperationException e) {
            // An InvocationTargetException wraps the real failure thrown by the
            // target method — report and chain the unwrapped cause instead.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new SearchException("Unable to reload SearchRDDJava from path "
                    + path + ", got: " + cause, cause);
        }
    }

    /**
     * Builder of {@link SearchRDDJava}: requires an rdd and its runtime
     * element class, search options are optional.
     *
     * @param <T> Runtime type of document to index
     */
    class Builder<T> {
        private JavaRDD<T> rdd;
        private Class<T> clazz;
        private SearchOptions<T> options = SearchOptions.defaultOptions();

        private Builder() {
        }

        /**
         * @param clazz Runtime class of the elements to index
         * @return this builder
         */
        public Builder<T> runtimeClass(Class<T> clazz) {
            this.clazz = clazz;
            return this;
        }

        /**
         * @param rdd RDD to index and search on
         * @return this builder
         */
        public Builder<T> rdd(JavaRDD<T> rdd) {
            this.rdd = rdd;
            return this;
        }

        /**
         * @param options Search options overriding the defaults
         * @return this builder
         */
        public Builder<T> options(SearchOptions<T> options) {
            this.options = options;
            return this;
        }

        /**
         * Builds the search java rdd.
         *
         * @return the configured search java rdd
         * @throws SearchException if the rdd or the runtime class was not set
         */
        public SearchRDDJava<T> build() {
            if (rdd == null) {
                throw new SearchException("Please specify rdd to search for");
            }
            if (clazz == null) {
                throw new SearchException("Please specify runtime class of element to search for");
            }
            return new SearchRDDJava2Scala<>(rdd, clazz, options);
        }
    }
}