Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Merge pull request #36 from cleishm/kv-blocking
Browse files Browse the repository at this point in the history
Fix blocking in several KV store implementations
  • Loading branch information
atoulme committed Aug 10, 2018
2 parents a1172e0 + 1834a3e commit 9becebb
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 13 deletions.
5 changes: 4 additions & 1 deletion dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ dependencyManagement {
entry 'kotlin-stdlib'
entry 'kotlin-stdlib-jdk8'
}
dependency('org.jetbrains.kotlinx:kotlinx-coroutines-core:0.24.0')
dependencySet(group: 'org.jetbrains.kotlinx', version: '0.24.0') {
entry 'kotlinx-coroutines-core'
entry 'kotlinx-coroutines-jdk8'
}
dependencySet(group: 'org.jetbrains.spek', version: '1.1.5') {
entry 'spek-api'
entry 'spek-junit-platform-engine'
Expand Down
1 change: 1 addition & 0 deletions kv/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ description = 'Key value store implementations.'
dependencies {
compile project(':bytes')
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
compileOnly project(':concurrent')
compileOnly 'io.lettuce:lettuce-core'
compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
Expand Down
11 changes: 7 additions & 4 deletions kv/src/main/kotlin/net/consensys/cava/kv/LevelDBKeyValueStore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package net.consensys.cava.kv

import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
import kotlinx.coroutines.experimental.withContext
import net.consensys.cava.bytes.Bytes
import org.fusesource.leveldbjni.JniDBFactory
Expand All @@ -21,6 +21,7 @@ import org.iq80.leveldb.Options
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import kotlin.coroutines.experimental.CoroutineContext

/**
* A key-value store backed by LevelDB.
Expand All @@ -30,7 +31,9 @@ class LevelDBKeyValueStore
@JvmOverloads
constructor(
databasePath: Path,
options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong())
options: Options = Options().createIfMissing(true).cacheSize((100 * 1048576).toLong()),
// TODO: replace with IO context when https://github.com/Kotlin/kotlinx.coroutines/issues/79 is resolved
private val context: CoroutineContext = newFixedThreadPoolContext(4, "LevelDBKeyValueStore")
) : KeyValueStore {

private val db: DB
Expand All @@ -40,7 +43,7 @@ constructor(
db = JniDBFactory.factory.open(databasePath.toFile(), options)
}

override suspend fun get(key: Bytes): Bytes? = withContext(Unconfined) {
override suspend fun get(key: Bytes): Bytes? = withContext(context) {
val rawValue = db[key.toArrayUnsafe()]
if (rawValue == null) {
null
Expand All @@ -49,7 +52,7 @@ constructor(
}
}

override suspend fun put(key: Bytes, value: Bytes) = withContext(Unconfined) {
override suspend fun put(key: Bytes, value: Bytes) = withContext(context) {
db.put(key.toArrayUnsafe(), value.toArrayUnsafe())
}

Expand Down
13 changes: 9 additions & 4 deletions kv/src/main/kotlin/net/consensys/cava/kv/MapDBKeyValueStore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package net.consensys.cava.kv

import kotlinx.coroutines.experimental.newFixedThreadPoolContext
import kotlinx.coroutines.experimental.withContext
import net.consensys.cava.bytes.Bytes
import org.mapdb.DB
import org.mapdb.DBMaker
Expand All @@ -21,14 +23,17 @@ import org.mapdb.HTreeMap
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import kotlin.coroutines.experimental.CoroutineContext

/**
* A key-value store backed by a MapDB instance.
*/
class MapDBKeyValueStore
@Throws(IOException::class)
constructor(
databasePath: Path
databasePath: Path,
// TODO: replace with IO context when https://github.com/Kotlin/kotlinx.coroutines/issues/79 is resolved
private val context: CoroutineContext = newFixedThreadPoolContext(4, "MapDBKeyValueStore")
) : KeyValueStore {

private val db: DB
Expand All @@ -40,11 +45,11 @@ constructor(
storageData = db.hashMap("storageData", BytesSerializer(), BytesSerializer()).createOrOpen()
}

override suspend fun get(key: Bytes): Bytes? {
return storageData[key]
override suspend fun get(key: Bytes): Bytes? = withContext(context) {
storageData[key]
}

override suspend fun put(key: Bytes, value: Bytes) {
override suspend fun put(key: Bytes, value: Bytes) = withContext(context) {
storageData[key] = value
db.commit()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import io.lettuce.core.RedisURI
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.codec.RedisCodec
import kotlinx.coroutines.experimental.future.await
import net.consensys.cava.bytes.Bytes
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.CompletionStage

class RedisKeyValueStore(uri: String)
: KeyValueStore {
Expand All @@ -39,12 +41,11 @@ class RedisKeyValueStore(uri: String)
asyncCommands = conn.async()
}

override suspend fun get(key: Bytes): Bytes? {
return asyncCommands.get(key).get()
}
override suspend fun get(key: Bytes): Bytes? = asyncCommands.get(key).await()

override suspend fun put(key: Bytes, value: Bytes) {
asyncCommands.set(key, value).get()
val future: CompletionStage<String> = asyncCommands.set(key, value)
future.await()
}

override fun close() {
Expand Down

0 comments on commit 9becebb

Please sign in to comment.