-
Notifications
You must be signed in to change notification settings - Fork 224
/
TimeSeriesChunksTable.scala
254 lines (219 loc) · 10.4 KB
/
TimeSeriesChunksTable.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
package filodb.cassandra.columnstore
import java.nio.ByteBuffer
import scala.concurrent.Future
import com.datastax.driver.core.{ConsistencyLevel, ResultSet, Row}
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.store._
/**
* Represents the table which holds the actual columnar chunks, but
* with a time series rather than OLAP layout. The chunks for different columns are stored together grouped
* under the same chunkID. This reduces read latency when we read on-demand data because for time series
* we always read all the chunks together.
*/
sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
                                   val connector: FiloCassandraConnector,
                                   writeConsistencyLevel: ConsistencyLevel)
                                  (implicit sched: Scheduler) extends BaseDatasetTable {
  import collection.JavaConverters._
  import filodb.cassandra.Util._
  import filodb.core.Iterators._

  // Table-name suffix; the fully qualified table name comes from BaseDatasetTable ($tableString).
  val suffix = "tschunks"

  // When true, chunk blobs are compressed via compressChunk before writing
  // (config key "lz4-chunk-compress").  Reads always pass blobs through
  // decompressChunk -- presumably it detects uncompressed data; TODO confirm
  // against filodb.cassandra.Util.
  private val compressChunks = connector.config.getBoolean("lz4-chunk-compress")

  // One row per (partition, chunkid).  All columns' chunks for one chunkset are
  // stored together in a single frozen list, so a read of one chunkid fetches
  // every column at once (see class scaladoc).
  val createCql = s"""CREATE TABLE IF NOT EXISTS $tableString (
                     | partition blob,
                     | chunkid bigint,
                     | info blob,
                     | chunks frozen<list<blob>>,
                     | PRIMARY KEY (partition, chunkid)
                     |) WITH compression = {
'sstable_compression': '$sstableCompression'}""".stripMargin

  // --- Prepared statements (lazily prepared on first use). ---
  // Writes/deletes honor the caller-supplied writeConsistencyLevel; all reads
  // deliberately use ConsistencyLevel.ONE for lowest read latency.

  // Upsert of one chunkset row, with a per-write TTL (seconds).
  private lazy val writeChunksCql = session.prepare(
    s"INSERT INTO $tableString (partition, chunkid, info, chunks) " +
    s"VALUES (?, ?, ?, ?) USING TTL ?")
    .setConsistencyLevel(writeConsistencyLevel)

  // Bulk delete of specific chunkids within one partition.
  private lazy val deleteChunksCql = session.prepare(
    s"DELETE FROM $tableString WHERE partition=? AND chunkid IN ?")
    .setConsistencyLevel(writeConsistencyLevel)

  // Read info+chunks for specific chunkids of one partition (chunkid not returned).
  private lazy val readChunkInCql = session.prepare(
    s"SELECT info, chunks FROM $tableString " +
    s"WHERE partition = ? AND chunkid IN ?")
    .setConsistencyLevel(ConsistencyLevel.ONE)

  // Same as readChunkInCql but also returns the chunkid column (used by repair copy).
  private lazy val readChunksCql = session.prepare(
    s"SELECT chunkid, info, chunks FROM $tableString " +
    s"WHERE partition = ? AND chunkid IN ?")
    .setConsistencyLevel(ConsistencyLevel.ONE)

  // Full scan of one partition's chunksets (test/repair use).
  private lazy val readAllChunksCql = session.prepare(
    s"SELECT chunkid, info, chunks FROM $tableString " +
    s"WHERE partition = ?")
    .setConsistencyLevel(ConsistencyLevel.ONE)

  // Token-range scan across partitions, for split-based full-table scans.
  private lazy val scanBySplit = session.prepare(
    s"SELECT partition, info, chunks FROM $tableString " +
    s"WHERE TOKEN(partition) >= ? AND TOKEN(partition) < ?")
    .setConsistencyLevel(ConsistencyLevel.ONE)

  // Multi-partition read of a chunkid range [start, end).
  private lazy val readChunkRangeCql = session.prepare(
    s"SELECT partition, info, chunks FROM $tableString " +
    s"WHERE partition IN ? AND chunkid >= ? AND chunkid < ?")
    .setConsistencyLevel(ConsistencyLevel.ONE)

  /**
   * Writes one chunkset (all column chunks plus its serialized ChunkSetInfo) for a partition,
   * compressing each chunk first if lz4-chunk-compress is enabled.
   *
   * @param partition serialized partition key bytes
   * @param chunkInfo metadata for the chunkset; its id becomes the clustering key
   * @param chunks one buffer per column, in column order
   * @param stats sink stats; chunk counts/bytes/rows are recorded here as a side effect
   * @param diskTimeToLive Cassandra TTL in seconds for the written row
   * @return future Response from the (retried) write
   */
  def writeChunks(partition: Array[Byte],
                  chunkInfo: ChunkSetInfo,
                  chunks: Seq[ByteBuffer],
                  stats: ChunkSinkStats,
                  diskTimeToLive: Int): Future[Response] = {
    // Accumulate post-compression byte count while building the list for the bind.
    var chunkBytes = 0L
    val chunkList = chunks.map { bytes =>
      val finalBytes = compressChunk(bytes)
      chunkBytes += finalBytes.capacity.toLong
      finalBytes
    }.asJava
    val insert = writeChunksCql.bind().setBytes(0, toBuffer(partition))
                               .setLong(1, chunkInfo.id)
                               .setBytes(2, toBuffer(ChunkSetInfo.toBytes(chunkInfo)))
                               .setList(3, chunkList, classOf[ByteBuffer])
                               .setInt(4, diskTimeToLive)
    stats.addChunkWriteStats(chunks.length, chunkBytes, chunkInfo.numRows)
    connector.execStmtWithRetries(insert.setConsistencyLevel(writeConsistencyLevel))
  }

  /**
   * Writes a single record, exactly as-is from the readChunksNoAsync method. Is
   * used to copy records from one column store to another.
   *
   * The row layout must match readChunksNoAsync's SELECT: col 0 = chunkid,
   * col 1 = info, col 2 = chunks.  Chunks are written verbatim -- no
   * (re)compression -- since they were read back in stored form.
   *
   * NOTE(review): chunkBytes uses reduce, which throws on an empty chunks list;
   * presumably stored rows always have at least one chunk -- confirm.
   */
  def writeChunks(partKeyBytes: ByteBuffer,
                  row: Row,
                  stats: ChunkSinkStats,
                  diskTimeToLiveSeconds: Int): Future[Response] = {
    val info = row.getBytes(1)
    val chunks = row.getList(2, classOf[ByteBuffer])
    val chunkBytes = chunks.asScala.map(buf => buf.remaining()).reduce(_ + _)
    stats.addChunkWriteStats(chunks.size(), chunkBytes, ChunkSetInfo.getNumRows(info))
    connector.execStmtWithRetries(writeChunksCql.bind(
      partKeyBytes, // partition
      row.getLong(0): java.lang.Long, // chunkid
      info,
      chunks,
      diskTimeToLiveSeconds: java.lang.Integer)
    )
  }

  /**
   * Deletes raw chunk set rows.
   *
   * @param partKeyBytes serialized partition key
   * @param chunkInfos serialized ChunkSetInfos; only their chunkIDs are used for the delete
   */
  def deleteChunks(partKeyBytes: ByteBuffer,
                   chunkInfos: Seq[ByteBuffer]): Future[Response] = {
    val query = deleteChunksCql.bind().setBytes(0, partKeyBytes)
                                      .setList(1, chunkInfos.map(ChunkSetInfo.getChunkID).asJava)
    connector.execStmtWithRetries(query)
  }

  /**
   * Reads and returns a single RawPartData, raw data for a single partition/time series
   *
   * @param partKeyBytes serialized partition key
   * @param chunkInfos serialized ChunkSetInfos identifying which chunkIDs to fetch
   * @return a Task yielding the partition key plus all fetched chunksets, materialized into a buffer
   */
  def readRawPartitionData(partKeyBytes: Array[Byte],
                           chunkInfos: Seq[Array[Byte]]): Task[RawPartData] = {
    val query = readChunkInCql.bind().setBytes(0, toBuffer(partKeyBytes))
                                     .setList(1, chunkInfos.map(ChunkSetInfo.getChunkID).asJava)
    // toIterator/handleErrors come from filodb.cassandra.Util / core.Iterators:
    // async paging wrapped as a Future[Iterator[Row]] with driver errors surfaced.
    val futChunksets = session.executeAsync(query)
                              .toIterator.handleErrors
                              .map(_.map { row => chunkSetFromRow(row) })
    Task.fromFuture(futChunksets).map { chunkSetIt =>
      RawPartData(partKeyBytes, chunkSetIt.toBuffer)
    }
  }

  // Decodes one result row into a RawChunkSet.  infoIndex is the column index of
  // the info blob; chunks are expected at infoIndex + 1.  Every chunk buffer is
  // run through decompressChunk.
  // NOTE(review): .array assumes the driver returns a heap-backed ByteBuffer
  // with zero arrayOffset covering exactly the value -- confirm for this driver version.
  private def chunkSetFromRow(row: Row, infoIndex: Int = 0): RawChunkSet = {
    val chunks = row.getList(infoIndex + 1, classOf[ByteBuffer]).toArray.map {
      case b: ByteBuffer => decompressChunk(b)
    }
    RawChunkSet(row.getBytes(infoIndex).array, chunks)
  }

  /**
   * Reads raw chunk set Rows consisting of:
   *
   * chunkid: Long
   * info: ByteBuffer
   * chunks: List&lt;ByteBuffer&gt;
   *
   * Note: This method is intended for use by repair jobs and isn't async-friendly.
   * Rows are returned stored-form (no decompression) so they can be fed straight
   * into the Row-based writeChunks overload.
   */
  def readChunksNoAsync(partKeyBytes: ByteBuffer,
                        chunkInfos: Seq[ByteBuffer]): ResultSet = {
    val query = readChunksCql.bind().setBytes(0, partKeyBytes)
                                    .setList(1, chunkInfos.map(ChunkSetInfo.getChunkID).asJava)
    session.execute(query)
  }

  /**
   * Test method which returns the same results as the readChunks method. Not async-friendly.
   * Fetches every chunkset row of the given partition.
   */
  def readAllChunksNoAsync(partKeyBytes: ByteBuffer): ResultSet = {
    session.execute(readAllChunksCql.bind().setBytes(0, partKeyBytes))
  }

  /**
   * Reads and returns a stream of RawPartDatas given a range of chunkIDs from multiple partitions
   *
   * @param partitions serialized partition keys to read
   * @param startTime inclusive lower bound on the chunkID-encoded start time
   * @param endTimeExclusive exclusive upper bound
   */
  def readRawPartitionRange(partitions: Seq[Array[Byte]],
                            startTime: Long,
                            endTimeExclusive: Long): Observable[RawPartData] = {
    readRawPartitionRangeBB(partitions.map(toBuffer), startTime, endTimeExclusive)
  }

  /**
   * ByteBuffer-keyed variant of readRawPartitionRange.  Times are converted to
   * chunkID bounds via chunkID(time, 0) -- presumably the chunkid's high bits
   * encode the start time; confirm against filodb.core's chunkID implementation.
   * Rows arrive ordered by partition, so sortedGroupBy can regroup them into one
   * RawPartData per partition without buffering the whole result.
   */
  def readRawPartitionRangeBB(partitions: Seq[ByteBuffer],
                              startTime: Long,
                              endTimeExclusive: Long): Observable[RawPartData] = {
    val query = readChunkRangeCql.bind().setList(0, partitions.asJava, classOf[ByteBuffer])
                                        .setLong(1, chunkID(startTime, 0))
                                        .setLong(2, chunkID(endTimeExclusive, 0))
    val futRawParts: Future[Iterator[RawPartData]] = session.executeAsync(query)
      .toIterator.handleErrors
      .map { rowIt =>
        rowIt.map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) }
             .sortedGroupBy(_._1)
             .map { case (partKeyBuffer, chunkSetIt) =>
               RawPartData(partKeyBuffer.array, chunkSetIt.map(_._2).toBuffer)
             }
      }
    for {
      it <- Observable.fromFuture(futRawParts)
      rpd <- Observable.fromIterator(it)
    } yield rpd
  }

  /**
   * Not an async call - limit number of partitions queried at a time
   *
   * Synchronous version of readRawPartitionRangeBB: same query and grouping,
   * fully materialized into a Seq.
   */
  def readRawPartitionRangeBBNoAsync(partitions: Seq[ByteBuffer],
                                     startTime: Long,
                                     endTimeExclusive: Long): Seq[RawPartData] = {
    val query = readChunkRangeCql.bind().setList(0, partitions.asJava, classOf[ByteBuffer])
                                        .setLong(1, chunkID(startTime, 0))
                                        .setLong(2, chunkID(endTimeExclusive, 0))
    session.execute(query).iterator().asScala
      .map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) }
      .sortedGroupBy(_._1)
      .map { case (partKeyBuffer, chunkSetIt) =>
        RawPartData(partKeyBuffer.array, chunkSetIt.map(_._2).toBuffer)
      }.toSeq
  }

  /**
   * Streams RawPartData for every partition in the given token-range splits,
   * one async token-range query per (start, end) pair, flattened in order.
   *
   * @param tokens (startToken, endToken) string pairs; each is parsed as a Long
   */
  def scanPartitionsBySplit(tokens: Seq[(String, String)]): Observable[RawPartData] = {
    val res: Observable[Future[Iterator[RawPartData]]] = Observable.fromIterable(tokens).map { case (start, end) =>
      /*
       * FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
       * Long based tokens. If other partitioners are used, this can potentially break.
       * Correct way to bind tokens is to do stmt.bind().setPartitionKeyToken(token)
       */
      val stmt = scanBySplit.bind(start.toLong: java.lang.Long, end.toLong: java.lang.Long)
      session.executeAsync(stmt).toIterator.handleErrors
        .map { rowIt =>
          rowIt.map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) }
               .sortedGroupBy(_._1)
               .map { case (partKeyBuffer, chunkSetIt) =>
                 RawPartData(partKeyBuffer.array, chunkSetIt.map(_._2).toBuffer)
               }
        }
    }
    // Flatten: one Future of grouped results per split, emitted as a single stream.
    for {
      fut <- res
      it <- Observable.fromFuture(fut)
      rpd <- Observable.fromIterator(it)
    } yield rpd
  }

  // Identity when compression is disabled; otherwise LZ4-compresses via Util.compress.
  private def compressChunk(orig: ByteBuffer): ByteBuffer =
    if (compressChunks) compress(orig) else orig
}