**WIP** feat(cass): add shardKey to partKey table #1361

Draft · wants to merge 9 commits into base: develop
@@ -20,6 +20,8 @@ import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.ErrorResponse
import filodb.core.metadata.Schemas
import filodb.core.store._
import filodb.memory.BinaryRegionLarge

@@ -84,6 +86,9 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
val partKeyTablesInit = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreatePartitionKeysTable(dataset, s)
}.mapEval(t => Task.fromFuture(t.initialize())).toListL
val shardKeyToPartKeyTableInit = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreateShardKeyToPartKeyTable(dataset, s)
}.mapEval(t => Task.fromFuture(t.initialize())).toListL
Comment on lines 86 to +91 (Contributor Author):

Every PartKeyTable init/insert/delete has a counterpart ShardKeyToPartKeyTable call.

clusterConnector.createKeyspace(chunkTable.keyspace)
val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
// Important: make sure nodes are in agreement before any schema changes
@@ -92,10 +97,15 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
ixResp <- indexTable.initialize() if ixResp == Success
pkutResp <- partitionKeysByUpdateTimeTable.initialize() if pkutResp == Success
partKeyTablesResp <- partKeyTablesInit.runToFuture if partKeyTablesResp.forall(_ == Success)
shardKeyToPartkeyTableResp <-
shardKeyToPartKeyTableInit.runToFuture if shardKeyToPartkeyTableResp.forall(_ == Success)
} yield Success
} else {
// ensure the table handles are eagerly created
0.until(numShards).foreach(getOrCreatePartitionKeysTable(dataset, _))
0.until(numShards).foreach { s =>
getOrCreatePartitionKeysTable(dataset, s)
getOrCreateShardKeyToPartKeyTable(dataset, s)
}
Future.successful(Success)
}
}
@@ -107,12 +117,17 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
val partKeyTablesTrunc = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreatePartitionKeysTable(dataset, s)
}.mapEval(t => Task.fromFuture(t.clearAll())).toListL
val shardKeyToPartKeyTableTrunc = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreateShardKeyToPartKeyTable(dataset, s)
}.mapEval(t => Task.fromFuture(t.clearAll())).toListL
val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.clearAll() if ctResp == Success
ixResp <- indexTable.clearAll() if ixResp == Success
pkutResp <- partitionKeysByUpdateTimeTable.clearAll() if pkutResp == Success
partKeyTablesResp <- partKeyTablesTrunc.runToFuture if partKeyTablesResp.forall( _ == Success)
for {ctResp <- chunkTable.clearAll() if ctResp == Success
ixResp <- indexTable.clearAll() if ixResp == Success
pkutResp <- partitionKeysByUpdateTimeTable.clearAll() if pkutResp == Success
partKeyTablesResp <- partKeyTablesTrunc.runToFuture if partKeyTablesResp.forall( _ == Success)
shardKeyToPartKeyTableTruncResp <-
shardKeyToPartKeyTableTrunc.runToFuture if shardKeyToPartKeyTableTruncResp.forall( _ == Success)
} yield Success
}

@@ -123,15 +138,21 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
val partKeyTablesDrop = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreatePartitionKeysTable(dataset, s)
}.mapEval(t => Task.fromFuture(t.drop())).toListL
val shardKeyToPartKeyTablesDrop = Observable.fromIterable(0.until(numShards)).map { s =>
getOrCreateShardKeyToPartKeyTable(dataset, s)
}.mapEval(t => Task.fromFuture(t.drop())).toListL
clusterMeta.checkSchemaAgreement()
for {ctResp <- chunkTable.drop() if ctResp == Success
ixResp <- indexTable.drop() if ixResp == Success
pkutResp <- partitionKeysByUpdateTimeTable.drop() if pkutResp == Success
partKeyTablesResp <- partKeyTablesDrop.runToFuture if partKeyTablesResp.forall(_ == Success)
shardKeyToPartKeyTablesResp <-
shardKeyToPartKeyTablesDrop.runToFuture if shardKeyToPartKeyTablesResp.forall(_ == Success)
} yield {
chunkTableCache.remove(dataset)
indexTableCache.remove(dataset)
partitionKeysTableCache.remove(dataset)
shardKeyToPartKeyTableCache.remove(dataset)
Success
}
}
@@ -232,14 +253,11 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
*
* @param diskTimeToLiveSeconds ttl
*/
def copyPartitionKeysByTimeRange(datasetRef: DatasetRef,
numOfShards: Int,
splits: Iterator[ScanSplit],
repairStartTime: Long,
repairEndTime: Long,
def copyPartitionKeysByTimeRange(datasetRef: DatasetRef, numOfShards: Int, splits: Iterator[ScanSplit],
repairStartTime: Long, repairEndTime: Long,
target: CassandraColumnStore,
partKeyHashFn: PartKeyRecord => Option[Int],
diskTimeToLiveSeconds: Int): Unit = {
diskTimeToLiveSeconds: Int, schemas: Schemas): Unit = {
def pkRecordWithHash(pkRecord: PartKeyRecord) = {
PartKeyRecord(pkRecord.partKey, pkRecord.startTime, pkRecord.endTime, partKeyHashFn(pkRecord))
}
@@ -261,8 +279,8 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
)
val updateHour = System.currentTimeMillis() / 1000 / 60 / 60
Await.result(
target.writePartKeys(datasetRef,
shard, Observable.fromIterable(partKeys), diskTimeToLiveSeconds, updateHour, !downsampledData),
target.writePartKeys(datasetRef, shard, Observable.fromIterable(partKeys),
diskTimeToLiveSeconds, updateHour, schemas, !downsampledData),
5.minutes
)
}
@@ -430,12 +448,20 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
}
}

def writePartKeys(ref: DatasetRef,
shard: Int,
partKeys: Observable[PartKeyRecord],
diskTTLSeconds: Long, updateHour: Long,
def scanPartKeysByShardKey(ref: DatasetRef, shard: Int, shardKey: Array[Byte]): Observable[Array[Byte]] = {
val table = getOrCreateShardKeyToPartKeyTable(ref, shard)
table.scanPartKeys(shardKey)
// TODO(a_theimer): figure out if token ranges apply here
Comment (Contributor Author):

These will only apply if we add extra values to the PK to prevent hot-spotting, right?

// Observable.fromIterable(getScanSplits(ref)).flatMap { tokenRange =>
// table.scanPartKeys(tokenRange.asInstanceOf[CassandraTokenRangeSplit].tokens, indexScanParallelismPerShard)
// }
}
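
A minimal call-site sketch for the new read path (colStore, dataset, and shardKeyBytes are placeholder names, not part of this diff):

// Hypothetical call site: stream every part key stored under one shard key in shard 0.
val partKeysForShardKey: Observable[Array[Byte]] =
  colStore.scanPartKeysByShardKey(dataset.ref, 0, shardKeyBytes)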

def writePartKeys(ref: DatasetRef, shard: Int, partKeys: Observable[PartKeyRecord],
diskTTLSeconds: Long, updateHour: Long, schemas: Schemas,
writeToPkUTTable: Boolean = true): Future[Response] = {
val pkTable = getOrCreatePartitionKeysTable(ref, shard)
val skToPkTable = getOrCreateShardKeyToPartKeyTable(ref, shard)
val pkByUTTable = getOrCreatePartitionKeysByUpdateTimeTable(ref)
val start = System.currentTimeMillis()
val ret = partKeys.mapParallelUnordered(writeParallelism) { pk =>
@@ -449,11 +475,21 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
case resp =>
Future.successful(resp)
}
Task.fromFuture(writePkFut).map{ resp =>
val writeSkToPkFut = {
val shardKey = schemas.shardKeyFromPartKey(pk.partKey)
if (shardKey.nonEmpty)
skToPkTable.addMapping(shardKey, pk.partKey, diskTTLSeconds)
else
Future(Success)
}
Task.fromFuture(Future.sequence(Seq(writePkFut, writeSkToPkFut))).map{ respSeq =>
sinkStats.partKeysWrite(1)
resp
// TODO(a_theimer): why won't flatten work?
respSeq.find(_.isInstanceOf[ErrorResponse])
}
}.findL(_.isInstanceOf[ErrorResponse]).map(_.getOrElse(Success)).runToFuture
}.findL(respOpt => respOpt.isDefined).map{errorOpt =>
errorOpt.getOrElse(Some(Success)).get
}.runToFuture
ret.onComplete { _ =>
writePksLatency.record(System.currentTimeMillis() - start)
}
@@ -473,9 +509,27 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {

def deletePartKeyNoAsync(ref: DatasetRef,
shard: Int,
pk: Array[Byte]): Response = {
pk: Array[Byte],
schemas: Schemas): Response = {
val pkTable = getOrCreatePartitionKeysTable(ref, shard)
pkTable.deletePartKeyNoAsync(pk)
val skToPkTable = getOrCreateShardKeyToPartKeyTable(ref, shard)
val shardKey = schemas.shardKeyFromPartKey(pk)

def deletePartKey = {
Some(pkTable.deletePartKeyNoAsync(pk))
}
def deleteShardKeyMapping = {
if (shardKey.nonEmpty) {
Some(skToPkTable.deleteMappingNoAsync(shardKey, pk))
} else None
}

val deleteResps = Seq(deletePartKey, deleteShardKeyMapping)
.filter(_.isDefined)
.map(_.get)

// return the first ErrorResponse; otherwise just the first Response
deleteResps.find(_.isInstanceOf[ErrorResponse]).getOrElse(deleteResps.head)
}
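
A minimal sketch that names the error-folding rule used here; the same rule, with a Success fallback, appears in writePartKeys. The helper itself is hypothetical and not part of this diff:

// Return the first ErrorResponse if any step failed; otherwise return the first Response.
private def firstErrorOr(responses: Seq[Response]): Response =
  responses.find(_.isInstanceOf[ErrorResponse]).getOrElse(responses.head)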

def getPartKeysByUpdateHour(ref: DatasetRef,
@@ -519,6 +573,8 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
val partKeysByUTTableCache = concurrentCache[DatasetRef, PartitionKeysByUpdateTimeTable](tableCacheSize)
val partitionKeysTableCache = concurrentCache[DatasetRef,
ConcurrentLinkedHashMap[Int, PartitionKeysTable]](tableCacheSize)
val shardKeyToPartKeyTableCache = concurrentCache[DatasetRef,
ConcurrentLinkedHashMap[Int, ShardKeyToPartKeyTable]](tableCacheSize)
Comment on lines +576 to +577 (Contributor Author):

Should there be a table per shard (like PartitionKeysTable)?


protected val clusterConnector = new FiloCassandraConnector {
def config: Config = cassandraConfig
@@ -597,6 +653,16 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
})
}

def getOrCreateShardKeyToPartKeyTable(dataset: DatasetRef, shard: Int): ShardKeyToPartKeyTable = {
// TODO(a_theimer): should this be on a per-shard basis?
val map = shardKeyToPartKeyTableCache.getOrElseUpdate(dataset, { _ =>
concurrentCache[Int, ShardKeyToPartKeyTable](tableCacheSize)
})
map.getOrElseUpdate(shard, { shard: Int =>
new ShardKeyToPartKeyTable(dataset, shard, clusterConnector, ingestionConsistencyLevel)(readEc)
})
}

def reset(): Unit = {}

}
@@ -0,0 +1,63 @@
package filodb.cassandra.columnstore

import scala.concurrent.{ExecutionContext, Future}

import com.datastax.driver.core.ConsistencyLevel
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core.{DatasetRef, Response}

sealed class ShardKeyToPartKeyTable(val dataset: DatasetRef,
val shard: Int,
val connector: FiloCassandraConnector,
writeConsistencyLevel: ConsistencyLevel)
(implicit ec: ExecutionContext) extends BaseDatasetTable {

import filodb.cassandra.Util._

val suffix = s"shardKeyToPartKey_$shard"

// TODO(a_theimer): compression settings okay?
Comment (Contributor Author):

Left the same as PartitionKeyTable.

// TODO(a_theimer): probably need to prevent hot-spotting on a single node
// (i.e. add more fields to [Cassandra's] partition key)
Comment on lines +22 to +23 (Contributor Author):

I won't do this until benchmark tests indicate we need it (or one of you already knows we'll need it).
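
For illustration only, a sketch of the salting these comments describe, assuming a small hash-derived bucket column were added to the primary key; this is not part of the PR, and reads would then have to fan out across all buckets:

// Hypothetical salted layout: (shardKey, bucket) spreads a hot shard key across several
// Cassandra partitions; bucket could be, e.g., a hash of partKey modulo a small constant.
val saltedCreateCql =
  s"""CREATE TABLE IF NOT EXISTS $tableString (
     |    shardKey blob,
     |    bucket int,
     |    partKey blob,
     |    PRIMARY KEY ((shardKey, bucket), partKey)
     |) WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin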

val createCql =
s"""CREATE TABLE IF NOT EXISTS $tableString (
| shardKey blob,
| partKey blob,
| PRIMARY KEY (shardKey, partKey)
|) WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin

private lazy val writePartitionCql = session.prepare(
s"INSERT INTO ${tableString} (shardKey, partKey) " +
s"VALUES (?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

private lazy val deleteCql = session.prepare(
s"DELETE FROM $tableString " +
s"WHERE shardKey = ? AND partKey = ?"
)

private lazy val scanCql = session.prepare(
s"SELECT * FROM $tableString " +
s"WHERE shardKey = ?")
.setConsistencyLevel(ConsistencyLevel.ONE)

def addMapping(shardKey: Array[Byte], partKey: Array[Byte], diskTtlSeconds: Long): Future[Response] = {
connector.execStmt(writePartitionCql.bind(toBuffer(shardKey), toBuffer(partKey),
diskTtlSeconds.toInt: java.lang.Integer))
}

def deleteMappingNoAsync(shardKey: Array[Byte], partKey: Array[Byte]): Response = {
val stmt = deleteCql.bind(toBuffer(shardKey), toBuffer(partKey))
.setConsistencyLevel(writeConsistencyLevel)
connector.execCqlNoAsync(stmt)
}

def scanPartKeys(shardKey: Array[Byte]): Observable[Array[Byte]] = {
val fut = session.executeAsync(scanCql.bind(toBuffer(shardKey))).toIterator.handleErrors
Observable.fromFuture(fut)
.flatMap(it => Observable.fromIterable(it.toSeq))
.map(row => row.getBytes("partKey").array())
}
}
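
A minimal usage sketch of the new table's API; datasetRef, connector, shardKey, partKey, and the implicit ExecutionContext are assumed to exist, and initialize() is taken to be inherited from BaseDatasetTable:

// Hypothetical round trip against shard 0 of the new table.
val table = new ShardKeyToPartKeyTable(datasetRef, 0, connector, ConsistencyLevel.ONE)
table.initialize()                             // CREATE TABLE IF NOT EXISTS ...
table.addMapping(shardKey, partKey, 86400L)    // upsert one mapping with a one-day TTL
val found = table.scanPartKeys(shardKey)       // Observable[Array[Byte]] of matching part keys
table.deleteMappingNoAsync(shardKey, partKey)  // synchronous delete of a single mapping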
@@ -14,9 +14,10 @@ import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.store.{ChunkSet, ChunkSetInfo, ColumnStoreSpec, PartKeyRecord}
import filodb.memory.{BinaryRegionLarge, NativeMemoryManager}
import filodb.memory.format.{TupleRowReader, UnsafeUtils}
import filodb.memory.format.{SeqRowReader, TupleRowReader, UnsafeUtils}
import filodb.memory.format.ZeroCopyUTF8String._


import java.nio.charset.StandardCharsets

class CassandraColumnStoreSpec extends ColumnStoreSpec {
@@ -70,7 +71,7 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
.zipWithIndex.map { case (pk, i) => PartKeyRecord(pk, 5, 10, Some(i))}.toSet

val updateHour = 10
colStore.writePartKeys(dataset, 0, Observable.fromIterable(pks), 1.hour.toSeconds.toInt, 10, true )
colStore.writePartKeys(dataset, 0, Observable.fromIterable(pks), 1.hour.toSeconds.toInt, 10, schemas, true)
.futureValue shouldEqual Success

val expectedKeys = pks.map(pk => new String(pk.partKey, StandardCharsets.UTF_8).toInt)
@@ -81,12 +82,79 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
val readData2 = colStore.scanPartKeys(dataset, 0).toListL.runToFuture.futureValue.toSet
readData2.map(pk => new String(pk.partKey, StandardCharsets.UTF_8).toInt) shouldEqual expectedKeys

readData2.map(_.partKey).foreach(pk => colStore.deletePartKeyNoAsync(dataset, 0, pk))
readData2.map(_.partKey).foreach(pk => colStore.deletePartKeyNoAsync(dataset, 0, pk, schemas))

val readData3 = colStore.scanPartKeys(dataset, 0).toListL.runToFuture.futureValue
readData3.isEmpty shouldEqual true
}

"ShardKeyToPartKeyTable Reads, Writes, and Deletes" should "work" in {
import MetricsTestData._

val dataset = timeseriesDatasetMultipleShardKeys
val schemas = Schemas(dataset.schema.partition, Map("timeseries" -> dataset.schema))

// ws, ns, metric triplets
val shardKeys = Seq(
Seq("ws1", "ns1", "metric1"),
Seq("ws1", "ns1", "metric1"), // same as above
Seq("ws1", "ns1", "metric2"), // same as above, different metric
Seq("ws1", "ns2", "metric1"),
Seq("ws2", "ns1", "metric1"),
Seq("ws2", "ns2", "metric1"),
)

val rows = shardKeys.zipWithIndex.map{ case (key, i) =>
val keyMap = Seq("_ws_", "_ns_", "_metric_")
.zip(key)
.map{ case (k, v) => (k.utf8, v.utf8)}
.toMap
SeqRowReader(Seq(i.toLong, i.toDouble, keyMap))
}

val pks = partKeyFromRecords(dataset, records(dataset, rows)).zipWithIndex.map { case (pk, i) =>
val bytes = dataset.schema.partKeySchema.asByteArray(pk)
PartKeyRecord(bytes, 0, 0, Some(i))
}

// map shardKeys to their corresponding sets of partKeys
val shardKeyToPartKeySet = pks.foldLeft(Map[Seq[Byte], Set[Seq[Byte]]]()){ case (map, pk) =>
val shardKey = schemas.shardKeyFromPartKey(pk.partKey).toSeq
val partKey = pk.partKey.toSeq
val seenKeys: Set[Seq[Byte]] = map.get(shardKey).getOrElse(Set())
if (!seenKeys.contains(partKey)) {
val mapWithNewKey = Map(shardKey -> seenKeys.union(Set(partKey)))
map.filterKeys(k => k != shardKey) ++ mapWithNewKey
} else {
map
}
}
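
Equivalently, the same mapping can be built with a groupBy; shown only to clarify what the fold above produces, not as part of the test:

// shard key bytes -> set of distinct part key bytes, same contents as shardKeyToPartKeySet
val expectedMapping: Map[Seq[Byte], Set[Seq[Byte]]] =
  pks.groupBy(pk => schemas.shardKeyFromPartKey(pk.partKey).toSeq)
    .mapValues(_.map(_.partKey.toSeq).toSet)
    .toMap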

colStore.initialize(dataset.ref, 1).futureValue
colStore.truncate(dataset.ref, 1).futureValue

colStore.writePartKeys(dataset.ref, 0, Observable.fromIterable(pks), 1.hour.toSeconds.toInt, 10, schemas, true)
.futureValue shouldEqual Success

// scan for each shard key and compare against expected result set
shardKeyToPartKeySet.map{ case (shardKey, partKeySet) =>
val scannedPks = colStore.scanPartKeysByShardKey(dataset.ref, 0, shardKey.toArray).toListL.runToFuture.futureValue
scannedPks.size shouldEqual partKeySet.size
scannedPks.map(_.toSeq).toSet shouldEqual partKeySet
}

// delete all partkeys
shardKeyToPartKeySet.values.flatten.foreach{pk =>
colStore.deletePartKeyNoAsync(dataset.ref, 0, pk.toArray, schemas)
}

// make sure shardKey scans all return empty result
shardKeyToPartKeySet.keys.foreach{ shardKey =>
val scannedPks = colStore.scanPartKeysByShardKey(dataset.ref, 0, shardKey.toArray).toListL.runToFuture.futureValue
scannedPks.isEmpty shouldEqual true
}
}

"copyOrDeleteChunksByIngestionTimeRange" should "actually work" in {
val dataset = Dataset("source", Schemas.gauge)

@@ -89,7 +89,7 @@ class OdpSpec extends AnyFunSpec with Matchers with BeforeAndAfterAll with Scala

colStore.write(dataset.ref, Observable.fromIteratorUnsafe(chunks)).futureValue
val pk = PartKeyRecord(gaugePartKeyBytes, firstSampleTime, firstSampleTime + numSamples, Some(150))
colStore.writePartKeys(dataset.ref, 0, Observable.now(pk), 259200, 34).futureValue
colStore.writePartKeys(dataset.ref, 0, Observable.now(pk), 259200, 34, schemas).futureValue
}

it ("should be able to do full ODP for non concurrent queries") {