Skip to content

Commit

Permalink
Added support for namespaces to RocksDb kvstore. (#2066)
Browse files Browse the repository at this point in the history
* Add new RocksNamespaceRef type and remove backups and readonly support from RocksDb KvStore.

* Bump nim-rocksdb to fc2ba4a836b6b47ae1b17d1c45801c7e06585e19

* Fix tests.

* Fix copyright notice.
  • Loading branch information
web3-developer committed Mar 12, 2024
1 parent 1159b01 commit 799acf3
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 69 deletions.
3 changes: 1 addition & 2 deletions nimbus/db/aristo/aristo_desc/desc_error.nim
Expand Up @@ -236,7 +236,6 @@ type

# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateBackupDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
Expand Down Expand Up @@ -268,7 +267,7 @@ type
AccVtxUnsupported
AccNodeUnsupported
PayloadTypeUnsupported

# Miscelaneous handy helpers
AccRootUnacceptable
MptContextMissing
Expand Down
4 changes: 0 additions & 4 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim
Expand Up @@ -41,7 +41,6 @@ type
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "aristo" # Legacy DB has "data"
BackupFolder* = "aristo-history" # Legacy DB has "backups"
SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # No `tmp` directory used with legacy DB

Expand All @@ -55,9 +54,6 @@ func baseDir*(rdb: RdbInst): string =
func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder

func backupsDir*(rdb: RdbInst): string =
rdb.basePath / BaseFolder / BackupFolder

func cacheDir*(rdb: RdbInst): string =
rdb.dataDir / TempFolder

Expand Down
7 changes: 1 addition & 6 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim
Expand Up @@ -47,16 +47,11 @@ proc init*(

let
dataDir = rdb.dataDir
backupsDir = rdb.backupsDir

try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
backupsDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateBackupDir, ""))
try:
rdb.cacheDir.createDir
except OSError, IOError:
Expand All @@ -68,7 +63,7 @@ proc init*(
let rc = openRocksDb(dataDir, dbOpts)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, backupsDir, openMax,
debug logTxt "driver failed", dataDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))

Expand Down
115 changes: 77 additions & 38 deletions nimbus/db/kvstore_rocksdb.nim
Expand Up @@ -11,7 +11,7 @@
{.push raises: [].}

import
std/os,
std/[os, sequtils],
stew/results,
rocksdb,
eth/db/kvstore
Expand All @@ -22,46 +22,43 @@ const maxOpenFiles = 512

type
RocksStoreRef* = ref object of RootObj
db: RocksDbRef
backupEngine: BackupEngineRef
readOnly: bool
db: RocksDbReadWriteRef

proc readOnly*(store: RocksStoreRef): bool =
store.readOnly
RocksNamespaceRef* = ref object of RootObj
colFamily: ColFamilyReadWrite

proc readOnlyDb*(store: RocksStoreRef): RocksDbReadOnlyRef =
doAssert store.readOnly
store.db.RocksDbReadOnlyRef
# ------------------------------------------------------------------------------
# RocksStoreRef procs
# ------------------------------------------------------------------------------

proc readWriteDb*(store: RocksStoreRef): RocksDbReadWriteRef =
doAssert not store.readOnly
store.db.RocksDbReadWriteRef
proc rocksDb*(store: RocksStoreRef): RocksDbReadWriteRef =
store.db

template validateCanWriteAndGet(store: RocksStoreRef): RocksDbReadWriteRef =
if store.readOnly:
raiseAssert "Unimplemented"
store.db.RocksDbReadWriteRef

proc get*(store: RocksStoreRef, key: openArray[byte], onData: kvstore.DataProc): KvResult[bool] =
proc get*(
store: RocksStoreRef,
key: openArray[byte],
onData: kvstore.DataProc): KvResult[bool] =
store.db.get(key, onData)

proc find*(store: RocksStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): KvResult[int] =
proc find*(
store: RocksStoreRef,
prefix: openArray[byte],
onFind: kvstore.KeyValueProc): KvResult[int] =
raiseAssert "Unimplemented"

proc put*(store: RocksStoreRef, key, value: openArray[byte]): KvResult[void] =
store.validateCanWriteAndGet().put(key, value)
store.db.put(key, value)

proc contains*(store: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
store.db.keyExists(key)

proc del*(store: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
let db = store.validateCanWriteAndGet()

let exists = ? db.keyExists(key)
let exists = ? store.db.keyExists(key)
if not exists:
return ok(false)

let res = db.delete(key)
let res = store.db.delete(key)
if res.isErr():
return err(res.error())

Expand All @@ -72,32 +69,74 @@ proc clear*(store: RocksStoreRef): KvResult[bool] =

proc close*(store: RocksStoreRef) =
store.db.close()
store.backupEngine.close()

proc init*(
T: type RocksStoreRef,
basePath: string,
name: string,
readOnly = false): KvResult[T] =
namespaces = @["default"]): KvResult[T] =

let
dataDir = basePath / name / "data"
backupsDir = basePath / name / "backups"
let dataDir = basePath / name / "data"

try:
createDir(dataDir)
createDir(backupsDir)
except OSError, IOError:
return err("rocksdb: cannot create database directory")

let backupEngine = ? openBackupEngine(backupsDir)
return err("RocksStoreRef: cannot create database directory")

let dbOpts = defaultDbOptions()
dbOpts.setMaxOpenFiles(maxOpenFiles)

if readOnly:
let readOnlyDb = ? openRocksDbReadOnly(dataDir, dbOpts)
ok(T(db: readOnlyDb, backupEngine: backupEngine, readOnly: true))
else:
let readWriteDb = ? openRocksDb(dataDir, dbOpts)
ok(T(db: readWriteDb, backupEngine: backupEngine, readOnly: false))
let db = ? openRocksDb(dataDir, dbOpts,
columnFamilies = namespaces.mapIt(initColFamilyDescriptor(it)))
ok(T(db: db))

# ------------------------------------------------------------------------------
# RocksNamespaceRef procs
# ------------------------------------------------------------------------------

proc name*(store: RocksNamespaceRef): string =
store.colFamily.name

proc get*(
ns: RocksNamespaceRef,
key: openArray[byte],
onData: kvstore.DataProc): KvResult[bool] =
ns.colFamily.get(key, onData)

proc find*(
ns: RocksNamespaceRef,
prefix: openArray[byte],
onFind: kvstore.KeyValueProc): KvResult[int] =
raiseAssert "Unimplemented"

proc put*(ns: RocksNamespaceRef, key, value: openArray[byte]): KvResult[void] =
ns.colFamily.put(key, value)

proc contains*(ns: RocksNamespaceRef, key: openArray[byte]): KvResult[bool] =
ns.colFamily.keyExists(key)

proc del*(ns: RocksNamespaceRef, key: openArray[byte]): KvResult[bool] =
let exists = ? ns.colFamily.keyExists(key)
if not exists:
return ok(false)

let res = ns.colFamily.delete(key)
if res.isErr():
return err(res.error())

ok(true)

proc clear*(ns: RocksNamespaceRef): KvResult[bool] =
raiseAssert "Unimplemented"

proc close*(ns: RocksNamespaceRef) =
# To close the database, call close on RocksStoreRef.
raiseAssert "Unimplemented"

proc openNamespace*(
store: RocksStoreRef,
name: string): KvResult[RocksNamespaceRef] =
doAssert not store.db.isClosed()

let cf = ? store.db.withColFamily(name)
ok(RocksNamespaceRef(colFamily: cf))
3 changes: 1 addition & 2 deletions nimbus/db/kvt/kvt_desc/desc_error.nim
@@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
Expand All @@ -19,7 +19,6 @@ type

# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateBackupDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
Expand Down
4 changes: 0 additions & 4 deletions nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim
Expand Up @@ -31,7 +31,6 @@ type
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "kvt" # Legacy DB has "data"
BackupFolder* = "kvt-history" # Legacy DB has "backups"
SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # No `tmp` directory used with legacy DB

Expand All @@ -45,9 +44,6 @@ func baseDir*(rdb: RdbInst): string =
func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder

func backupsDir*(rdb: RdbInst): string =
rdb.basePath / BaseFolder / BackupFolder

func cacheDir*(rdb: RdbInst): string =
rdb.dataDir / TempFolder

Expand Down
7 changes: 1 addition & 6 deletions nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim
Expand Up @@ -47,16 +47,11 @@ proc init*(

let
dataDir = rdb.dataDir
backupsDir = rdb.backupsDir

try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
backupsDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateBackupDir, ""))
try:
rdb.cacheDir.createDir
except OSError, IOError:
Expand All @@ -68,7 +63,7 @@ proc init*(
let rc = openRocksDb(dataDir, dbOpts)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, backupsDir, openMax,
debug logTxt "driver failed", dataDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))

Expand Down
4 changes: 2 additions & 2 deletions nimbus/sync/snap/worker/db/rocky_bulk_load.nim
Expand Up @@ -91,7 +91,7 @@ proc lastError*(rbl: RockyBulkLoadRef): string =

proc store*(rbl: RockyBulkLoadRef): RocksDbReadWriteRef =
## Provide the diecriptor for backend functions as defined in `rocksdb`.
rbl.db.readWriteDb()
rbl.db.rocksDb()

# ------------------------------------------------------------------------------
# Public functions
Expand Down Expand Up @@ -154,7 +154,7 @@ proc finish*(

var filePath = rbl.filePath.cstring
if csError.isNil:
rbl.db.readWriteDb().cPtr.rocksdb_ingest_external_file(
rbl.db.rocksDb.cPtr.rocksdb_ingest_external_file(
cast[cstringArray](filePath.addr), 1,
rbl.importOption,
cast[cstringArray](csError.addr))
Expand Down
34 changes: 32 additions & 2 deletions tests/db/test_kvstore_rocksdb.nim
Expand Up @@ -18,8 +18,12 @@ import
../../nimbus/db/kvstore_rocksdb,
eth/../tests/db/test_kvstore

suite "RocksStoreRef":
test "KvStore interface":
suite "KvStore RocksDb Tests":
const
NS_DEFAULT = "default"
NS_OTHER = "other"

test "RocksStoreRef KvStore interface":
let tmp = getTempDir() / "nimbus-test-db"
removeDir(tmp)

Expand All @@ -28,3 +32,29 @@ suite "RocksStoreRef":
db.close()

testKvStore(kvStore db, false, false)

test "RocksNamespaceRef KvStore interface - default namespace":
let tmp = getTempDir() / "nimbus-test-db"
removeDir(tmp)

let db = RocksStoreRef.init(tmp, "test")[]
defer:
db.close()

let defaultNs = db.openNamespace(NS_DEFAULT)[]
testKvStore(kvStore defaultNs, false, false)

test "RocksNamespaceRef KvStore interface - multiple namespace":
let tmp = getTempDir() / "nimbus-test-db"
removeDir(tmp)

let db = RocksStoreRef.init(tmp, "test",
namespaces = @[NS_DEFAULT, NS_OTHER])[]
defer:
db.close()

let defaultNs = db.openNamespace(NS_DEFAULT)[]
testKvStore(kvStore defaultNs, false, false)

let otherNs = db.openNamespace(NS_OTHER)[]
testKvStore(kvStore otherNs, false, false)
2 changes: 1 addition & 1 deletion tests/replay/undump_kvp.nim
Expand Up @@ -57,7 +57,7 @@ proc walkAllDb(
## Walk over all key-value pairs of the database (`RocksDB` only.)
let
rop = rocksdb_readoptions_create()
rit = rocky.readWriteDb.cPtr.rocksdb_create_iterator(rop)
rit = rocky.rocksdb.cPtr.rocksdb_create_iterator(rop)

rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_rocksdb_timing/test_db_timing.nim
Expand Up @@ -137,7 +137,7 @@ proc test_dbTimingRockySetup*(
let
rdb = cdb.backend.toRocksStoreRef
rop = rocksdb_readoptions_create()
rit = rdb.readWriteDb().cPtr.rocksdb_create_iterator(rop)
rit = rdb.rocksDb.cPtr.rocksdb_create_iterator(rop)
check not rit.isNil

var
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-rocksdb

0 comments on commit 799acf3

Please sign in to comment.