**WIP** feat(cass): add shardKey to partKey table #1361
base: develop
Changes from all commits: f947fd0, 7406f34, f6ba144, 4d8bc58, 30d286c, 0c18792, bdd9dfa, 7f4393f, 6604f5d
CassandraColumnStore.scala
@@ -20,6 +20,8 @@ import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.ErrorResponse
import filodb.core.metadata.Schemas
import filodb.core.store._
import filodb.memory.BinaryRegionLarge
@@ -84,6 +86,9 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
    val partKeyTablesInit = Observable.fromIterable(0.until(numShards)).map { s =>
      getOrCreatePartitionKeysTable(dataset, s)
    }.mapEval(t => Task.fromFuture(t.initialize())).toListL
    val shardKeyToPartKeyTableInit = Observable.fromIterable(0.until(numShards)).map { s =>
      getOrCreateShardKeyToPartKeyTable(dataset, s)
    }.mapEval(t => Task.fromFuture(t.initialize())).toListL
    clusterConnector.createKeyspace(chunkTable.keyspace)
    val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
    // Important: make sure nodes are in agreement before any schema changes
@@ -92,10 +97,15 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
           ixResp <- indexTable.initialize() if ixResp == Success
           pkutResp <- partitionKeysByUpdateTimeTable.initialize() if pkutResp == Success
           partKeyTablesResp <- partKeyTablesInit.runToFuture if partKeyTablesResp.forall(_ == Success)
           shardKeyToPartKeyTableResp <-
             shardKeyToPartKeyTableInit.runToFuture if shardKeyToPartKeyTableResp.forall(_ == Success)
      } yield Success
    } else {
      // ensure the table handles are eagerly created
      0.until(numShards).foreach { s =>
        getOrCreatePartitionKeysTable(dataset, s)
        getOrCreateShardKeyToPartKeyTable(dataset, s)
      }
      Future.successful(Success)
    }
  }
@@ -107,12 +117,17 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
    val partKeyTablesTrunc = Observable.fromIterable(0.until(numShards)).map { s =>
      getOrCreatePartitionKeysTable(dataset, s)
    }.mapEval(t => Task.fromFuture(t.clearAll())).toListL
    val shardKeyToPartKeyTableTrunc = Observable.fromIterable(0.until(numShards)).map { s =>
      getOrCreateShardKeyToPartKeyTable(dataset, s)
    }.mapEval(t => Task.fromFuture(t.clearAll())).toListL
    val indexTable = getOrCreateIngestionTimeIndexTable(dataset)
    clusterMeta.checkSchemaAgreement()
    for { ctResp <- chunkTable.clearAll() if ctResp == Success
          ixResp <- indexTable.clearAll() if ixResp == Success
          pkutResp <- partitionKeysByUpdateTimeTable.clearAll() if pkutResp == Success
          partKeyTablesResp <- partKeyTablesTrunc.runToFuture if partKeyTablesResp.forall(_ == Success)
          shardKeyToPartKeyTableTruncResp <-
            shardKeyToPartKeyTableTrunc.runToFuture if shardKeyToPartKeyTableTruncResp.forall(_ == Success)
    } yield Success
  }
@@ -123,15 +138,21 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
    val partKeyTablesDrop = Observable.fromIterable(0.until(numShards)).map { s =>
      getOrCreatePartitionKeysTable(dataset, s)
    }.mapEval(t => Task.fromFuture(t.drop())).toListL
    val shardKeyToPartKeyTablesDrop = Observable.fromIterable(0.until(numShards)).map { s =>
      getOrCreateShardKeyToPartKeyTable(dataset, s)
    }.mapEval(t => Task.fromFuture(t.drop())).toListL
    clusterMeta.checkSchemaAgreement()
    for { ctResp <- chunkTable.drop() if ctResp == Success
          ixResp <- indexTable.drop() if ixResp == Success
          pkutResp <- partitionKeysByUpdateTimeTable.drop() if pkutResp == Success
          partKeyTablesResp <- partKeyTablesDrop.runToFuture if partKeyTablesResp.forall(_ == Success)
          shardKeyToPartKeyTablesResp <-
            shardKeyToPartKeyTablesDrop.runToFuture if shardKeyToPartKeyTablesResp.forall(_ == Success)
    } yield {
      chunkTableCache.remove(dataset)
      indexTableCache.remove(dataset)
      partitionKeysTableCache.remove(dataset)
      shardKeyToPartKeyTableCache.remove(dataset)
      Success
    }
  }
@@ -232,14 +253,11 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
   *
   * @param diskTimeToLiveSeconds ttl
   */
  def copyPartitionKeysByTimeRange(datasetRef: DatasetRef, numOfShards: Int, splits: Iterator[ScanSplit],
                                   repairStartTime: Long, repairEndTime: Long,
                                   target: CassandraColumnStore,
                                   partKeyHashFn: PartKeyRecord => Option[Int],
                                   diskTimeToLiveSeconds: Int, schemas: Schemas): Unit = {
    def pkRecordWithHash(pkRecord: PartKeyRecord) = {
      PartKeyRecord(pkRecord.partKey, pkRecord.startTime, pkRecord.endTime, partKeyHashFn(pkRecord))
    }
@@ -261,8 +279,8 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
      )
      val updateHour = System.currentTimeMillis() / 1000 / 60 / 60
      Await.result(
        target.writePartKeys(datasetRef, shard, Observable.fromIterable(partKeys),
          diskTimeToLiveSeconds, updateHour, schemas, !downsampledData),
        5.minutes
      )
    }
@@ -430,12 +448,20 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
      }
    }

  def scanPartKeysByShardKey(ref: DatasetRef, shard: Int, shardKey: Array[Byte]): Observable[Array[Byte]] = {
    val table = getOrCreateShardKeyToPartKeyTable(ref, shard)
    table.scanPartKeys(shardKey)
    // TODO(a_theimer): figure out if token ranges apply here
    // Observable.fromIterable(getScanSplits(ref)).flatMap { tokenRange =>
    //   table.scanPartKeys(tokenRange.asInstanceOf[CassandraTokenRangeSplit].tokens, indexScanParallelismPerShard)
    // }
  }

Review comment (on the token-ranges TODO): These will only apply if we add extra values to the PK to prevent hot-spotting, right?
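For context on that TODO and the review question: the prepared query behind scanPartKeys hits a single Cassandra partition (one shardKey value), so token-range splits only become relevant if extra fields are added to the partition key to spread it across nodes. A hedged CQL sketch of the two access patterns (bare table name for shard 0 shown for illustration; the real name goes through tableString):

    -- single-partition lookup, as prepared by scanCql: no token splits involved
    SELECT partKey FROM shardKeyToPartKey_0 WHERE shardKey = ?;

    -- ring-scan style, where token ranges do apply (what the commented-out code would need)
    SELECT partKey FROM shardKeyToPartKey_0 WHERE TOKEN(shardKey) > ? AND TOKEN(shardKey) <= ?;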

  def writePartKeys(ref: DatasetRef, shard: Int, partKeys: Observable[PartKeyRecord],
                    diskTTLSeconds: Long, updateHour: Long, schemas: Schemas,
                    writeToPkUTTable: Boolean = true): Future[Response] = {
    val pkTable = getOrCreatePartitionKeysTable(ref, shard)
    val skToPkTable = getOrCreateShardKeyToPartKeyTable(ref, shard)
    val pkByUTTable = getOrCreatePartitionKeysByUpdateTimeTable(ref)
    val start = System.currentTimeMillis()
    val ret = partKeys.mapParallelUnordered(writeParallelism) { pk =>
@@ -449,11 +475,21 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
        case resp =>
          Future.successful(resp)
      }
      val writeSkToPkFut = {
        val shardKey = schemas.shardKeyFromPartKey(pk.partKey)
        if (shardKey.nonEmpty)
          skToPkTable.addMapping(shardKey, pk.partKey, diskTTLSeconds)
        else
          Future(Success)
      }
      Task.fromFuture(Future.sequence(Seq(writePkFut, writeSkToPkFut))).map { respSeq =>
        sinkStats.partKeysWrite(1)
        // TODO(a_theimer): why won't flatten work?
        respSeq.find(_.isInstanceOf[ErrorResponse])
      }
    }.findL(respOpt => respOpt.isDefined).map { errorOpt =>
      errorOpt.getOrElse(Some(Success)).get
    }.runToFuture
    ret.onComplete { _ =>
      writePksLatency.record(System.currentTimeMillis() - start)
    }
@@ -473,9 +509,27 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {

  def deletePartKeyNoAsync(ref: DatasetRef,
                           shard: Int,
                           pk: Array[Byte],
                           schemas: Schemas): Response = {
    val pkTable = getOrCreatePartitionKeysTable(ref, shard)
    val skToPkTable = getOrCreateShardKeyToPartKeyTable(ref, shard)
    val shardKey = schemas.shardKeyFromPartKey(pk)

    def deletePartKey = {
      Some(pkTable.deletePartKeyNoAsync(pk))
    }
    def deleteShardKeyMapping = {
      if (shardKey.nonEmpty) {
        Some(skToPkTable.deleteMappingNoAsync(shardKey, pk))
      } else None
    }

    val deleteResps = Seq(deletePartKey, deleteShardKeyMapping).flatten

    // return the first ErrorResponse; otherwise just the first Response
    deleteResps.find(_.isInstanceOf[ErrorResponse]).getOrElse(deleteResps.head)
  }

  def getPartKeysByUpdateHour(ref: DatasetRef,
@@ -519,6 +573,8 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
  val partKeysByUTTableCache = concurrentCache[DatasetRef, PartitionKeysByUpdateTimeTable](tableCacheSize)
  val partitionKeysTableCache = concurrentCache[DatasetRef,
    ConcurrentLinkedHashMap[Int, PartitionKeysTable]](tableCacheSize)
  val shardKeyToPartKeyTableCache = concurrentCache[DatasetRef,
    ConcurrentLinkedHashMap[Int, ShardKeyToPartKeyTable]](tableCacheSize)

Review comment (on lines +576 to +577): Should there be a table per shard (like …

  protected val clusterConnector = new FiloCassandraConnector {
    def config: Config = cassandraConfig
@@ -597,6 +653,16 @@ trait CassandraChunkSource extends RawChunkSource with StrictLogging {
    })
  }

  def getOrCreateShardKeyToPartKeyTable(dataset: DatasetRef, shard: Int): ShardKeyToPartKeyTable = {
    // TODO(a_theimer): should this be on a per-shard basis?
    val map = shardKeyToPartKeyTableCache.getOrElseUpdate(dataset, { _ =>
      concurrentCache[Int, ShardKeyToPartKeyTable](tableCacheSize)
    })
    map.getOrElseUpdate(shard, { shard: Int =>
      new ShardKeyToPartKeyTable(dataset, shard, clusterConnector, ingestionConsistencyLevel)(readEc)
    })
  }

  def reset(): Unit = {}

}
ShardKeyToPartKeyTable.scala (new file)
@@ -0,0 +1,63 @@
package filodb.cassandra.columnstore

import scala.concurrent.{ExecutionContext, Future}

import com.datastax.driver.core.ConsistencyLevel
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core.{DatasetRef, Response}

sealed class ShardKeyToPartKeyTable(val dataset: DatasetRef,
                                    val shard: Int,
                                    val connector: FiloCassandraConnector,
                                    writeConsistencyLevel: ConsistencyLevel)
                                   (implicit ec: ExecutionContext) extends BaseDatasetTable {

  import filodb.cassandra.Util._

  val suffix = s"shardKeyToPartKey_$shard"

  // TODO(a_theimer): compression settings okay?

Review comment (on the compression TODO): Left the same as …
  // TODO(a_theimer): probably need to prevent hot-spotting on a single node
  //   (i.e. add more fields to [Cassandra's] partition key)

Review comment (on lines +22 to +23): I won't do this until benchmark tests indicate we need it (or one of you already knows we'll need it).
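If benchmarks ever do show hot-spotting, one common Cassandra mitigation is a small bucket column folded into the partition key, so a popular shard key spreads across several nodes and reads fan out over all buckets. Purely a hedged sketch; the bucket column and bucket count are hypothetical, not part of this PR:

    CREATE TABLE IF NOT EXISTS shardKeyToPartKey_0 (
        shardKey blob,
        bucket int,        -- hypothetical, e.g. hash(partKey) % 16 chosen at write time
        partKey blob,
        PRIMARY KEY ((shardKey, bucket), partKey)
    );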
  val createCql =
    s"""CREATE TABLE IF NOT EXISTS $tableString (
       | shardKey blob,
       | partKey blob,
       | PRIMARY KEY (shardKey, partKey)
       |) WITH compression = {'chunk_length_in_kb': '16', 'sstable_compression': '$sstableCompression'}""".stripMargin

  private lazy val writePartitionCql = session.prepare(
    s"INSERT INTO ${tableString} (shardKey, partKey) " +
    s"VALUES (?, ?) USING TTL ?")
    .setConsistencyLevel(writeConsistencyLevel)

  private lazy val deleteCql = session.prepare(
    s"DELETE FROM $tableString " +
    s"WHERE shardKey = ? AND partKey = ?"
  )

  private lazy val scanCql = session.prepare(
    s"SELECT * FROM $tableString " +
    s"WHERE shardKey = ?")
    .setConsistencyLevel(ConsistencyLevel.ONE)
  def addMapping(shardKey: Array[Byte], partKey: Array[Byte], diskTtlSeconds: Long): Future[Response] = {
    connector.execStmt(writePartitionCql.bind(toBuffer(shardKey), toBuffer(partKey),
                       diskTtlSeconds.toInt: java.lang.Integer))
  }

  def deleteMappingNoAsync(shardKey: Array[Byte], partKey: Array[Byte]): Response = {
    val stmt = deleteCql.bind(toBuffer(shardKey), toBuffer(partKey))
      .setConsistencyLevel(writeConsistencyLevel)
    connector.execCqlNoAsync(stmt)
  }

  def scanPartKeys(shardKey: Array[Byte]): Observable[Array[Byte]] = {
    val fut = session.executeAsync(scanCql.bind(toBuffer(shardKey))).toIterator.handleErrors
    Observable.fromFuture(fut)
      .flatMap(it => Observable.fromIterable(it.toSeq))
      .map(row => row.getBytes("partKey").array())
  }
}
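A hedged usage sketch of the new table (connector wiring, dataset name, and byte values are placeholders, not from the PR):

    import scala.concurrent.ExecutionContext
    import com.datastax.driver.core.ConsistencyLevel
    import monix.execution.Scheduler
    import filodb.cassandra.FiloCassandraConnector
    import filodb.core.DatasetRef

    implicit val ec: ExecutionContext = ExecutionContext.global
    implicit val sched: Scheduler = Scheduler.global
    val connector: FiloCassandraConnector = ???   // hypothetical, already-configured connector

    val table = new ShardKeyToPartKeyTable(DatasetRef("prometheus"), 0, connector, ConsistencyLevel.LOCAL_QUORUM)
    table.initialize()                            // create the table if it does not exist
    // write one mapping with a one-week TTL, then read back every partKey under the shard key
    val shardKey = Array[Byte](1, 2, 3)           // illustrative bytes only
    val partKey = Array[Byte](4, 5, 6)
    table.addMapping(shardKey, partKey, diskTtlSeconds = 7L * 24 * 3600)
    val partKeysFut = table.scanPartKeys(shardKey).toListL.runToFuture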
Review comment: Every PartKeyTable init/insert/delete has a counterpart ShardKeyToPartKeyTable call.