maint(core): Extracting traits for Part Key Indexes to explore alternate implementations (#1767)
vishramachandran committed May 14, 2024
1 parent 64c49a3 commit 3e40873
Showing 6 changed files with 216 additions and 55 deletions.
@@ -99,7 +99,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
}
}

private val partKeyIndex = new PartKeyLuceneIndex(indexDataset, schemas.part, false,
private val partKeyIndex: PartKeyIndexDownsampled = new PartKeyLuceneIndex(indexDataset, schemas.part, false,
false, shardNum, indexTtlMs,
downsampleConfig.indexLocation.map(new java.io.File(_)),
indexMetadataStore
@@ -40,7 +40,7 @@ class RawIndexBootstrapper(colStore: ColumnStore) {
* @param assignPartId the function to invoke to get the partitionId to be used to populate the index record
* @return number of updated records
*/
def bootstrapIndexRaw(index: PartKeyLuceneIndex,
def bootstrapIndexRaw(index: PartKeyIndexRaw,
shardNum: Int,
ref: DatasetRef)
(assignPartId: PartKeyRecord => Int): Task[Long] = {
@@ -277,7 +277,7 @@ class DownsampleIndexBootstrapper(colStore: ColumnStore,
* creation requires more careful contention analysis. Bootstrap index operation
* builds entire index from scratch
*/
def bootstrapIndexDownsample(index: PartKeyLuceneIndex,
def bootstrapIndexDownsample(index: PartKeyIndexDownsampled,
shardNum: Int,
ref: DatasetRef,
ttlMs: Long,
@@ -321,7 +321,7 @@ class DownsampleIndexBootstrapper(colStore: ColumnStore,
* @return number of records refreshed
*/
def refreshWithDownsamplePartKeys(
index: PartKeyLuceneIndex,
index: PartKeyIndexDownsampled,
shardNum: Int,
ref: DatasetRef,
fromHour: Long,
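For reference, a hedged usage sketch of the re-typed raw bootstrap entry point: because bootstrapIndexRaw now accepts PartKeyIndexRaw, any index implementing the trait can be bootstrapped through it. This is not part of the commit; it assumes it compiles in the same package as RawIndexBootstrapper (whose package is not visible in this diff), and the import paths other than filodb.core.memstore.PartKeyIndexRaw are best guesses at existing FiloDB locations.

```scala
// Sketch only, not part of this commit. Assumes the same package as
// RawIndexBootstrapper; import locations below are assumptions.
import monix.eval.Task

import filodb.core.DatasetRef
import filodb.core.memstore.PartKeyIndexRaw
import filodb.core.store.{ColumnStore, PartKeyRecord}

object IndexBootstrapSketch {
  // Bootstrap any index implementing PartKeyIndexRaw, using the
  // bootstrapIndexRaw signature shown in the diff above.
  def bootstrapAnyRawIndex(colStore: ColumnStore,
                           index: PartKeyIndexRaw,
                           shardNum: Int,
                           ref: DatasetRef)
                          (assignPartId: PartKeyRecord => Int): Task[Long] =
    new RawIndexBootstrapper(colStore)
      .bootstrapIndexRaw(index, shardNum, ref)(assignPartId)
}
```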
207 changes: 207 additions & 0 deletions core/src/main/scala/filodb.core/memstore/PartKeyIndex.scala
@@ -0,0 +1,207 @@
package filodb.core.memstore

import org.apache.lucene.util.BytesRef

import filodb.core.memstore.ratelimit.CardinalityTracker
import filodb.core.metadata.PartitionSchema
import filodb.core.query.ColumnFilter

trait PartKeyIndexRaw {

/**
* Clear the index by deleting all documents and commit
*/
def reset(): Unit

/**
* Start the asynchronous thread to automatically flush
* new writes to readers at the given min and max delays
*/
def startFlushThread(flushDelayMinSeconds: Int, flushDelayMaxSeconds: Int): Unit

/**
* Find partitions that ended ingesting before a given timestamp. Used to identify partitions that can be purged.
* @return matching partIds
*/
def partIdsEndedBefore(endedBefore: Long): debox.Buffer[Int]

/**
* Method to delete documents from index that ended before the provided end time
*
* @param endedBefore the cutoff timestamp. All documents with time <= this time will be removed
* @param returnApproxDeletedCount a boolean flag that requests the return value to be an approximate count of the
* documents that were deleted; if set to false, 0 is returned
*/
def removePartitionsEndedBefore(endedBefore: Long, returnApproxDeletedCount: Boolean = true): Int

/**
* Delete partitions with given partIds
*/
def removePartKeys(partIds: debox.Buffer[Int]): Unit

/**
* Memory used by the index, especially for unflushed data
*/
def indexRamBytes: Long

/**
* Number of documents in flushed index, excludes tombstones for deletes
*/
def indexNumEntries: Long

/**
* Number of documents in flushed index, includes tombstones for deletes
*/
def indexNumEntriesWithTombstones: Long

/**
* Closes the index for reads by other clients. Check the implementation to see whether a commit
* is performed automatically.
*/
def closeIndex(): Unit

/**
* Return user field/dimension names in index, except those that are created internally
*/
def indexNames(limit: Int): Seq[String]

/**
* Fetch values/terms for a specific column/key/field, in order from most frequent on down.
* Note that it iterates through all docs up to a certain limit only, so if there are too many terms
* it will not report an accurate top k in exchange for not running too long.
* @param fieldName the name of the column/field/key to get terms for
* @param topK the number of top k results to fetch
*/
def indexValues(fieldName: String, topK: Int = 100): Seq[TermInfo]

/**
* Use faceting to get field/index names given a column filter and time range
*/
def labelNamesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long): Seq[String]

/**
* Use faceting to get field/index values given a column filter and time range
*/
def labelValuesEfficient(colFilters: Seq[ColumnFilter], startTime: Long, endTime: Long,
colName: String, limit: Int = 100): Seq[String]

/**
* Add new part key to index
*/
def addPartKey(partKeyOnHeapBytes: Array[Byte],
partId: Int,
startTime: Long,
endTime: Long = Long.MaxValue,
partKeyBytesRefOffset: Int = 0)
(partKeyNumBytes: Int = partKeyOnHeapBytes.length,
documentId: String = partId.toString): Unit

/**
* Update the part key in the index, or create it if it does not exist
*/
def upsertPartKey(partKeyOnHeapBytes: Array[Byte],
partId: Int,
startTime: Long,
endTime: Long = Long.MaxValue,
partKeyBytesRefOffset: Int = 0)
(partKeyNumBytes: Int = partKeyOnHeapBytes.length,
documentId: String = partId.toString): Unit

/**
* Called when TSPartition needs to be created when on-demand-paging from a
* partId that does not exist on heap
*/
def partKeyFromPartId(partId: Int): Option[BytesRef]

/**
* Called when a document is updated with new endTime
*/
def startTimeFromPartId(partId: Int): Long

/**
* Called when a document is updated with new endTime
*/
def endTimeFromPartId(partId: Int): Long

/**
* Fetch start time for given set of partIds. Used to check if ODP is needed for
* queries.
*/
def startTimeFromPartIds(partIds: Iterator[Int]): debox.Map[Int, Long]

/**
* Commit index contents to disk
*/
def commit(): Long

/**
* Update existing part key document with new endTime.
*/
def updatePartKeyWithEndTime(partKeyOnHeapBytes: Array[Byte],
partId: Int,
endTime: Long = Long.MaxValue,
partKeyBytesRefOffset: Int = 0)
(partKeyNumBytes: Int = partKeyOnHeapBytes.length,
documentId: String = partId.toString): Unit

/**
* Refresh readers with updates to index. May be expensive - use carefully.
*/
def refreshReadersBlocking(): Unit

/**
* Fetch list of partIds for given column filters
*/
def partIdsFromFilters(columnFilters: Seq[ColumnFilter],
startTime: Long,
endTime: Long,
limit: Int = Int.MaxValue): debox.Buffer[Int]

/**
* Fetch list of part key records for given column filters
*/
def partKeyRecordsFromFilters(columnFilters: Seq[ColumnFilter],
startTime: Long,
endTime: Long,
limit: Int = Int.MaxValue): Seq[PartKeyLuceneIndexRecord]

/**
* Fetch partId given partKey. This is slower since it would do an index search
* instead of a key-lookup.
*/
def partIdFromPartKeySlow(partKeyBase: Any,
partKeyOffset: Long): Option[Int]

/**
* Fetch one partKey matching filters
*/
def singlePartKeyFromFilters(columnFilters: Seq[ColumnFilter],
startTime: Long,
endTime: Long): Option[Array[Byte]]

}

trait PartKeyIndexDownsampled extends PartKeyIndexRaw {

def getCurrentIndexState(): (IndexState.Value, Option[Long])

def notifyLifecycleListener(state: IndexState.Value, time: Long): Unit

/**
* Iterate through the LuceneIndex and calculate cardinality count
*/
def calculateCardinality(partSchema: PartitionSchema, cardTracker: CardinalityTracker): Unit

/**
* Run some code for each part key that is still ingesting, i.e. has endTime == Long.MaxValue
*/
def foreachPartKeyStillIngesting(func: (Int, BytesRef) => Unit): Int

/**
* Run some code for each partKey matching the column filters
*/
def foreachPartKeyMatchingFilter(columnFilters: Seq[ColumnFilter],
startTime: Long,
endTime: Long, func: (BytesRef) => Unit): Int

}
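The two traits above are the seam this commit creates: callers can be compiled against PartKeyIndexRaw (or PartKeyIndexDownsampled) instead of PartKeyLuceneIndex, so an alternate index implementation can later be slotted in behind the same interface. A minimal caller sketch follows; it is illustrative only, the object and method names are hypothetical, and it uses only members declared on the trait above.

```scala
import filodb.core.memstore.PartKeyIndexRaw
import filodb.core.query.ColumnFilter

// Hypothetical caller, not part of this commit: it compiles against the trait,
// so it works with PartKeyLuceneIndex today and with any future implementation.
object PartKeyIndexCallerSketch {
  def lookupPartIds(index: PartKeyIndexRaw,
                    filters: Seq[ColumnFilter],
                    startTime: Long,
                    endTime: Long): debox.Buffer[Int] =
    // partIdsFromFilters is declared on PartKeyIndexRaw (see the trait above)
    index.partIdsFromFilters(filters, startTime, endTime)
}
```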
50 changes: 2 additions & 48 deletions core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -135,7 +135,7 @@ class PartKeyLuceneIndex(ref: DatasetRef,
val lifecycleManager: Option[IndexMetadataStore] = None,
useMemoryMappedImpl: Boolean = true,
disableIndexCaching: Boolean = false
) extends StrictLogging {
) extends StrictLogging with PartKeyIndexDownsampled {

import PartKeyLuceneIndex._

@@ -445,10 +445,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
logger.info(s"Started flush thread for lucene index on dataset=$ref shard=$shardNum")
}

/**
* Find partitions that ended ingesting before a given timestamp. Used to identify partitions that can be purged.
* @return matching partIds
*/
def partIdsEndedBefore(endedBefore: Long): debox.Buffer[Int] = {
val collector = new PartIdCollector(Int.MaxValue)
val deleteQuery = LongPoint.newRangeQuery(PartKeyLuceneIndex.END_TIME, 0, endedBefore)
@@ -457,13 +453,6 @@
collector.result
}

/**
* Method to delete documents from index that ended before the provided end time
*
* @param endedBefore the cutoff timestamp. All documents with time <= this time will be removed
* @param returnApproxDeletedCount a boolean flag that requests the return value to be an approximate count of the
* documents that got deleted, if value is set to false, 0 is returned
*/
def removePartitionsEndedBefore(endedBefore: Long, returnApproxDeletedCount: Boolean = true): Int = {
val deleteQuery = LongPoint.newRangeQuery(PartKeyLuceneIndex.END_TIME, 0, endedBefore)
// Since delete does not return the deleted document count, we query to get the count of documents that match the filter criteria
@@ -491,9 +480,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
}
}

/**
* Delete partitions with given partIds
*/
def removePartKeys(partIds: debox.Buffer[Int]): Unit = {
if (!partIds.isEmpty) {
val terms = new util.ArrayList[BytesRef]()
@@ -506,14 +492,8 @@

def indexRamBytes: Long = indexWriter.ramBytesUsed()

/**
* Number of documents in flushed index, excludes tombstones for deletes
*/
def indexNumEntries: Long = indexWriter.getDocStats().numDocs

/**
* Number of documents in flushed index, includes tombstones for deletes
*/
def indexNumEntriesWithTombstones: Long = indexWriter.getDocStats().maxDoc

def closeIndex(): Unit = {
@@ -588,13 +568,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
labelValues
}

/**
* Fetch values/terms for a specific column/key/field, in order from most frequent on down.
* Note that it iterates through all docs up to a certain limit only, so if there are too many terms
* it will not report an accurate top k in exchange for not running too long.
* @param fieldName the name of the column/field/key to get terms for
* @param topK the number of top k results to fetch
*/
def indexValues(fieldName: String, topK: Int = 100): Seq[TermInfo] = {
// FIXME this API returns duplicate values because same value can be present in multiple lucene segments

@@ -737,28 +710,18 @@ class PartKeyLuceneIndex(ref: DatasetRef,
})
}

/**
* Called when TSPartition needs to be created when on-demand-paging from a
* partId that does not exist on heap
*/
def partKeyFromPartId(partId: Int): Option[BytesRef] = {
val collector = new SinglePartKeyCollector()
withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID_FIELD, partId.toString)), collector) )
Option(collector.singleResult)
}

/**
* Called when a document is updated with new endTime
*/
def startTimeFromPartId(partId: Int): Long = {
val collector = new NumericDocValueCollector(PartKeyLuceneIndex.START_TIME)
withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID_FIELD, partId.toString)), collector))
collector.singleResult
}

/**
* Called when a document is updated with new endTime
*/
def startTimeFromPartIds(partIds: Iterator[Int]): debox.Map[Int, Long] = {

val startExecute = System.nanoTime()
@@ -780,9 +743,6 @@

def commit(): Long = indexWriter.commit()

/**
* Called when a document is updated with new endTime
*/
def endTimeFromPartId(partId: Int): Long = {
val collector = new NumericDocValueCollector(PartKeyLuceneIndex.END_TIME)
withNewSearcher(s => s.search(new TermQuery(new Term(PART_ID_FIELD, partId.toString)), collector))
@@ -839,10 +799,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
indexWriter.updateDocument(new Term(PART_ID_FIELD, partId.toString), docToAdd)
}

/**
* Refresh readers with updates to index. May be expensive - use carefully.
* @return
*/
def refreshReadersBlocking(): Unit = {
searcherManager.maybeRefreshBlocking()
logger.info(s"Refreshed index searchers to make reads consistent for dataset=$ref shard=$shardNum")
@@ -919,6 +875,7 @@ class PartKeyLuceneIndex(ref: DatasetRef,
}
}
//scalastyle:on method.length

def partIdsFromFilters(columnFilters: Seq[ColumnFilter],
startTime: Long,
endTime: Long,
@@ -1017,9 +974,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
chosenPartId
}

/**
* Iterate through the LuceneIndex and calculate cardinality count
*/
def calculateCardinality(partSchema: PartitionSchema, cardTracker: CardinalityTracker): Unit = {
val coll = new CardinalityCountBuilder(partSchema, cardTracker)
withNewSearcher(s => s.search(new MatchAllDocsQuery(), coll))
@@ -311,7 +311,7 @@ class TimeSeriesShard(val ref: DatasetRef,
* Used to answer queries not involving the full partition key.
* Maintained using a high-performance bitmap index.
*/
private[memstore] final val partKeyIndex = new PartKeyLuceneIndex(ref, schemas.part,
private[memstore] final val partKeyIndex: PartKeyIndexRaw = new PartKeyLuceneIndex(ref, schemas.part,
indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum,
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching)

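Because the shard-level fields are now typed as the traits (PartKeyIndexRaw here, PartKeyIndexDownsampled in DownsampledTimeSeriesShard), shared utilities can accept the trait as well. A small illustrative sketch, not part of this commit; the helper object is hypothetical and uses only the stat accessors declared on PartKeyIndexRaw.

```scala
import filodb.core.memstore.PartKeyIndexRaw

// Hypothetical helper, not part of this commit: formats index stats for any
// backend implementing PartKeyIndexRaw, regardless of the concrete class.
object IndexStatsSketch {
  def indexStatsLine(index: PartKeyIndexRaw, shardNum: Int): String =
    s"shard=$shardNum entries=${index.indexNumEntries} " +
      s"withTombstones=${index.indexNumEntriesWithTombstones} " +
      s"ramBytes=${index.indexRamBytes}"
}
```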
