Skip to content
This repository has been archived by the owner on May 27, 2020. It is now read-only.

Feature/offset skip #342

Open
wants to merge 8 commits into
base: branch-3.0.14
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -48,6 +48,10 @@ public class Search extends JSONBuilder {
@JsonProperty("refresh")
private Boolean refresh;

/** Firsts rows to skip. */
@JsonProperty("skip")
private Integer skip;

/** Default constructor. */
public Search() {
}
Expand Down Expand Up @@ -97,4 +101,14 @@ public Search refresh(Boolean refresh) {
return this;
}

/**
* Sets the skip value for this search
*
* @param skip the offset
* @return this builder with the specified skip
*/
public Search skip(Integer skip) {
this.skip = skip;
return this;
}
}
5 changes: 4 additions & 1 deletion doc/documentation.rst
Expand Up @@ -1991,6 +1991,7 @@ Lucene indexes are queried using a custom JSON syntax defining the kind of searc
(, query: ( <query> )* )?
(, sort: ( <sort> )* )?
(, refresh: ( true | false ) )?
(, skip: <skip> )?
}');

where <filter> and <query> are a JSON object:
Expand Down Expand Up @@ -2046,11 +2047,13 @@ explicitly refresh all the index shards with an empty search with consistency
CONSISTENCY ALL
SELECT * FROM <table> WHERE expr(<index_name>, '{refresh:true}');
CONSISTENCY QUORUM

This way the subsequent searches will view all the writes done before this
operation, without needing to wait for the index auto refresh. It is useful to
perform this operation before searching after a bulk data load.

The ``skip`` option lets you skip the first 'skip' rows. It is not compatible
with paging or top-K queries.

Types of search and their options are summarized in the table below.
Details for each of them are available in individual sections and the
examples can be downloaded as a CQL script:
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.stratio.cassandra.lucene.search;

import com.google.common.base.MoreObjects;
import com.stratio.cassandra.lucene.IndexException;
import com.stratio.cassandra.lucene.IndexPagingState;
import com.stratio.cassandra.lucene.schema.Schema;
import com.stratio.cassandra.lucene.search.condition.Condition;
Expand Down Expand Up @@ -44,7 +45,8 @@ public class Search {

protected static final Logger logger = LoggerFactory.getLogger(Search.class);

private static final boolean DEFAULT_FORCE_REFRESH = false;
static final boolean DEFAULT_FORCE_REFRESH = false;
static final int DEFAULT_SKIP = 0;

/** The mandatory conditions not participating in scoring. */
public final List<Condition> filter;
Expand All @@ -61,6 +63,9 @@ public class Search {
/** The paging state. */
private final IndexPagingState paging;

/** Firsts rows to skip. */
private int skip;

/**
* Constructor using the specified querying, filtering, sorting and refresh options.
*
Expand All @@ -69,17 +74,24 @@ public class Search {
* @param sort the sort fields for the query
* @param paging the paging state
* @param refresh if this search must refresh the index before reading it
* @param skip skip this number of first rows
*/
public Search(List<Condition> filter,
List<Condition> query,
List<SortField> sort,
IndexPagingState paging,
Boolean refresh) {
Boolean refresh,
Integer skip) {
this.filter = filter == null ? Collections.EMPTY_LIST : filter;
this.query = query == null ? Collections.EMPTY_LIST : query;
this.sort = sort == null ? Collections.EMPTY_LIST : sort;
this.paging = paging;
this.refresh = refresh == null ? DEFAULT_FORCE_REFRESH : refresh;
if ((skip != null) && (skip < 0)) {
throw new IndexException("skip must be positive.");
}

this.skip = skip == null ? DEFAULT_SKIP : skip;
}

/**
Expand Down Expand Up @@ -127,6 +139,24 @@ public boolean isEmpty() {
return filter.isEmpty() && query.isEmpty() && sort.isEmpty();
}

/**
* Returns the number of first rows to skip
*
* @return the number of rows to skip
*/
public int getSkip() {
return skip;
}

/**
* Returns if this search has an offset
*
* @return if this search has an offset
*/
public Boolean useSkip() {
return skip > 0;
}

/**
* Returns the Lucene {@link Query} represented by this search, with the additional optional data range filter.
*
Expand Down
Expand Up @@ -57,6 +57,9 @@ public class SearchBuilder implements Builder<Search> {
@JsonProperty("paging")
private String paging;

@JsonProperty("skip")
private Integer skip;

/** Default constructor. */
SearchBuilder() {
}
Expand Down Expand Up @@ -118,6 +121,17 @@ public SearchBuilder paging(IndexPagingState pagingState) {
return this;
}

/**
* Sets the skip value for this search
*
* @param skip the offset
* @return this builder with the specified skip
*/
public SearchBuilder skip(Integer skip) {
this.skip = skip;
return this;
}

/**
* Returns the {@link Search} represented by this builder.
*
Expand All @@ -129,7 +143,8 @@ public Search build() {
query.stream().map(ConditionBuilder::build).collect(toList()),
sort.stream().map(SortFieldBuilder::build).collect(toList()),
paging == null ? null : IndexPagingState.fromByteBuffer(ByteBufferUtils.byteBuffer(paging)),
refresh);
refresh,
skip);
}

/**
Expand Down
Expand Up @@ -161,24 +161,29 @@ class IndexQueryHandler extends QueryHandler with Logging {
if (expressions.size > 1) {
throw new InvalidRequestException(
"Lucene index only supports one search expression per query.")
}

// Validate expression
val (expression, index) = expressions.head
val search = index.validate(expression)

// Get partitioner
val partitioner = index.service.partitioner

// Get paging info
val limit = select.getLimit(options)
val page = getPageSize.invoke(select, options).asInstanceOf[Int]

// Take control of paging if there is paging and the query requires post processing
if (search.requiresPostProcessing && page > 0 && page < limit) {
executeSortedLuceneQuery(select, state, options, partitioner)
} else {
execute(select, state, options)
// Validate expression
val (expression, index) = expressions.head
val search = index.validate(expression)


// Get partitioner
val partitioner = index.service.partitioner

// Get paging info
val limit = select.getLimit(options)
val page = getPageSize.invoke(select, options).asInstanceOf[Int]

if (search.useSkip() && (page < limit)) {
throw new InvalidRequestException("Search 'skip' option is not compatible with paging.")
} else {
// Take control of paging if there is paging and the query requires post processing
if (search.requiresPostProcessing && page > 0 && page < limit) {
executeSortedLuceneQuery(select, state, options, partitioner)
} else {
execute(select, state, options)
}
}
}
}

Expand Down Expand Up @@ -223,6 +228,7 @@ class IndexQueryHandler extends QueryHandler with Logging {
if (data != null) data.close()
}
}

}

/** Companion object for [[IndexQueryHandler]]. */
Expand Down
Expand Up @@ -300,7 +300,7 @@ abstract class IndexService(
tracer.trace(s"Lucene index searching for $count rows")
val partitions = partitioner.partitions(command)
val readers = afters.filter(a => partitions.contains(a._1))
val documents = lucene.search(readers, query, sort, count)
val documents = lucene.search(readers, query, sort, count, search.getSkip)
reader(documents, command, orderGroup)
}

Expand Down
Expand Up @@ -33,6 +33,7 @@ import org.apache.lucene.search._
* @param query the query to be satisfied by the documents
* @param limit the iteration page size
* @param fields the names of the document fields to be loaded
* @param skip the number of first results to skip
* @author Andres de la Pena `adelapena@stratio.com`
*/
class DocumentIterator(
Expand All @@ -41,7 +42,8 @@ class DocumentIterator(
querySort: Sort,
query: Query,
limit: Int,
fields: java.util.Set[String])
fields: java.util.Set[String],
skip: Int)
extends Iterator[(Document, ScoreDoc)] with AutoCloseable with Logging with Tracing {

private[this] val pageSize = Math.min(limit, MAX_PAGE_SIZE) + 1
Expand All @@ -53,6 +55,7 @@ class DocumentIterator(
private[this] val offsets = cursors.map(_ => 0).toArray
private[this] var finished = false
private[this] var closed = false
private[this] var remainingToSkip = skip

private[this] def releaseSearchers(): Unit =
indices.foreach(i => managers(i).release(searchers(i)))
Expand Down Expand Up @@ -111,10 +114,14 @@ class DocumentIterator(
finished = numFetched < pageSize

for (scoreDoc <- scoreDocs) {
val shard = scoreDoc.shardIndex
afters(shard) = Some(scoreDoc)
val document = searchers(shard).doc(scoreDoc.doc, fields)
documents.add((document, scoreDoc))
if (remainingToSkip <= 0) {
val shard = scoreDoc.shardIndex
afters(shard) = Some(scoreDoc)
val document = searchers(shard).doc(scoreDoc.doc, fields)
documents.add((document, scoreDoc))
} else {
remainingToSkip = remainingToSkip - 1
}
}

tracer.trace(s"Lucene index fetches $numFetched documents")
Expand Down
Expand Up @@ -131,6 +131,7 @@ class PartitionedIndex(
indexes.foreach(_.delete())
if (useLocalPath) localPaths.get.foreach((localPath: Path) => deleteRecursive(localPath.toFile))
} finally if (partitions > 1) if (!useLocalPath) deleteRecursive(globalPath.get.toFile)

logger.info(s"Deleted $name")
}

Expand Down Expand Up @@ -222,9 +223,10 @@ class PartitionedIndex(
* @param query the query to search for
* @param sort the sort to be applied
* @param count the max number of results to be collected
* @param skip the number of first results to skip
* @return the found documents, sorted first by `sort`, then by `query` relevance
*/
def search(partitions: List[(Int, Option[Term])], query: Query, sort: Sort, count: Int)
def search(partitions: List[(Int, Option[Term])], query: Query, sort: Sort, count: Int, skip: Int)
: DocumentIterator = {
logger.debug(
s"""Searching in $name
Expand All @@ -235,6 +237,6 @@ class PartitionedIndex(
| sort : $sort
""".stripMargin)
val cursors = partitions.map { case (p, a) => (indexes(p).searcherManager, a) }
new DocumentIterator(cursors, mergeSort, sort, query, count, fields)
new DocumentIterator(cursors, mergeSort, sort, query, count, fields, skip)
}
}
Expand Up @@ -72,6 +72,7 @@ case class PartitionerOnColumn(
s"The paths size must be equal to number of partitions")
}


/** @inheritdoc*/
override def partitions(command: ReadCommand): List[Int] = command match {
case c: SinglePartitionReadCommand => List(partition(c.partitionKey))
Expand Down
Expand Up @@ -65,7 +65,7 @@ case class PartitionerOnVirtualNode(
partitionPerBound(bound) = partition
}

/** @inheritdoc */
/** @inheritdoc*/
override def numPartitions: Int = (numTokens.toDouble / vnodes_per_partition.toDouble).ceil.toInt

partitionPerBound(new Bounds(tokens(numPartitions - 1), new LongToken(Long.MaxValue))) = partition
Expand All @@ -74,7 +74,7 @@ case class PartitionerOnVirtualNode(
partitionPerBound(new Bounds(new LongToken(Long.MinValue), tokens.head)) = partition
}

/** @inheritdoc */
/** @inheritdoc*/
override def partitions(command: ReadCommand): List[Int] = command match {
case c: SinglePartitionReadCommand => List(partition(c.partitionKey))
case c: PartitionRangeReadCommand =>
Expand Down Expand Up @@ -107,10 +107,10 @@ case class PartitionerOnVirtualNode(
}
}

/** @inheritdoc */
/** @inheritdoc*/
override def partition(key: DecoratedKey): Int = partition(key.getToken)

/** @inheritdoc */
/** @inheritdoc*/
private[this] def partition(token: Token): Int =
partitionPerBound.filter(_._1.contains(token)).toList.head._2

Expand Down
Expand Up @@ -16,9 +16,9 @@
package com.stratio.cassandra.lucene.search;

import com.stratio.cassandra.lucene.IndexException;
import com.stratio.cassandra.lucene.common.JsonSerializer;
import com.stratio.cassandra.lucene.search.condition.builder.ConditionBuilder;
import com.stratio.cassandra.lucene.search.sort.builder.SimpleSortFieldBuilder;
import com.stratio.cassandra.lucene.common.JsonSerializer;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -41,18 +41,32 @@ public void testBuild() throws IOException {
SimpleSortFieldBuilder sort2 = field("field4");
SearchBuilder builder = new SearchBuilder().filter(filter)
.query(query)
.sort(sort1, sort2);
String json = builder.toJson();
.sort(sort1, sort2)
.skip(25);
String json = "{" +
"filter:[{type:\"match\",field:\"field1\",value:\"value2\"}]," +
"query:[{type:\"match\",field:\"field2\",value:\"value2\"}]," +
"sort:[{type:\"simple\",field:\"field3\",reverse:false},{type:\"simple\",field:\"field4\",reverse:false}]," +
"refresh:false," +
"skip:25" +
"}";
assertEquals("JSON serialization is wrong", json, JsonSerializer.toString(builder));
}

@Test
public void testJson() {
SearchBuilder searchBuilder = search().filter(match("field1", "value1"))
.query(match("field2", "value2"))
.sort(field("field"));
String json = searchBuilder.toJson();
assertEquals("JSON serialization is wrong", json, SearchBuilder.fromJson(json).toJson());
.sort(field("field"))
.skip(10);
String json = "{" +
"filter:[{type:\"match\",field:\"field1\",value:\"value1\"}]," +
"query:[{type:\"match\",field:\"field2\",value:\"value2\"}]," +
"sort:[{type:\"simple\",field:\"field\",reverse:false}]," +
"refresh:false," +
"skip:10" +
"}";
assertEquals("JSON serialization is wrong", json, searchBuilder.toJson());
}

@Test(expected = IndexException.class)
Expand Down