Skip to content

Commit

Permalink
TEMP-- adapt existing test/code for no-shard-key case
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer committed Apr 8, 2022
1 parent 4be5cf9 commit 5aa7797
Showing 1 changed file with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package filodb.cassandra.columnstore
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import com.datastax.driver.core.{ConsistencyLevel, Metadata, Session, TokenRange}
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
import com.typesafe.config.Config
Expand All @@ -17,14 +15,14 @@ import kamon.metric.MeasurementUnit
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.binaryrecord2.RecordSchema
import filodb.core.metadata.Schemas
import filodb.core.store._
import filodb.memory.BinaryRegionLarge
import filodb.memory.format.UnsafeUtils
import filodb.core.ErrorResponse

/**
* Implementation of a column store using Apache Cassandra tables.
Expand Down Expand Up @@ -460,7 +458,15 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {

// TODO(a_theimer): should be private, but used in test-- generalize and move to util class?
def shardKeyFromPartKey(partKey: Array[Byte], schemas: Schemas): Array[Byte] = {
val nonMetricShardKeyCols = schemas(RecordSchema.schemaID(partKey)).options.nonMetricShardColumns
val schemaId = RecordSchema.schemaID(partKey)
if (schemaId == -1) {
return Array.emptyByteArray
}
val schema = schemas(schemaId)
if (schema == null) {
return Array.emptyByteArray
}
val nonMetricShardKeyCols = schema.options.nonMetricShardColumns
schemas.part.binSchema.colValues(partKey, UnsafeUtils.arayOffset, nonMetricShardKeyCols)
.flatMap(_.getBytes).toArray // TODO(a_theimer): this is wildly inefficient
}
Expand All @@ -485,7 +491,11 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
}
val writeSkToPkFut = {
val shardKey = shardKeyFromPartKey(pk.partKey, schemas)
skToPkTable.addMapping(shardKey, pk.partKey, diskTTLSeconds)
if (shardKey.nonEmpty) {
skToPkTable.addMapping(shardKey, pk.partKey, diskTTLSeconds)
} else {
Future(Success)
}
}
Task.fromFuture(Future.sequence(Seq(writePkFut, writeSkToPkFut))).map{ respSeq =>
sinkStats.partKeysWrite(1)
Expand Down Expand Up @@ -520,8 +530,14 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
val pkTable = getOrCreatePartitionKeysTable(ref, shard)
val skToPkTable = getOrCreateShardKeyToPartKeyTable(ref, shard)
val shardKey = shardKeyFromPartKey(pk, schemas)
pkTable.deletePartKeyNoAsync(pk)
skToPkTable.deleteMappingNoAsync(shardKey, pk)
// TODO(a_theimer): literally anything better than this
var resp = pkTable.deletePartKeyNoAsync(pk)
if (shardKey.nonEmpty) {
skToPkTable.deleteMappingNoAsync(shardKey, pk) match {
case er: ErrorResponse => resp = er
}
}
resp
}

def getPartKeysByUpdateHour(ref: DatasetRef,
Expand Down

0 comments on commit 5aa7797

Please sign in to comment.