Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for namespaces to RocksDb kvstore. #2066

Merged
merged 4 commits into from Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions nimbus/db/aristo/aristo_desc/desc_error.nim
Expand Up @@ -236,7 +236,6 @@ type

# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateBackupDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
Expand Down Expand Up @@ -268,7 +267,7 @@ type
AccVtxUnsupported
AccNodeUnsupported
PayloadTypeUnsupported

# Miscelaneous handy helpers
AccRootUnacceptable
MptContextMissing
Expand Down
4 changes: 0 additions & 4 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim
Expand Up @@ -41,7 +41,6 @@ type
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "aristo" # Legacy DB has "data"
BackupFolder* = "aristo-history" # Legacy DB has "backups"
SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # No `tmp` directory used with legacy DB

Expand All @@ -55,9 +54,6 @@ func baseDir*(rdb: RdbInst): string =
func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder

func backupsDir*(rdb: RdbInst): string =
rdb.basePath / BaseFolder / BackupFolder

func cacheDir*(rdb: RdbInst): string =
rdb.dataDir / TempFolder

Expand Down
7 changes: 1 addition & 6 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_init.nim
Expand Up @@ -47,16 +47,11 @@ proc init*(

let
dataDir = rdb.dataDir
backupsDir = rdb.backupsDir

try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
backupsDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateBackupDir, ""))
try:
rdb.cacheDir.createDir
except OSError, IOError:
Expand All @@ -68,7 +63,7 @@ proc init*(
let rc = openRocksDb(dataDir, dbOpts)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, backupsDir, openMax,
debug logTxt "driver failed", dataDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))

Expand Down
115 changes: 77 additions & 38 deletions nimbus/db/kvstore_rocksdb.nim
Expand Up @@ -11,7 +11,7 @@
{.push raises: [].}

import
std/os,
std/[os, sequtils],
stew/results,
rocksdb,
eth/db/kvstore
Expand All @@ -22,46 +22,43 @@ const maxOpenFiles = 512

type
RocksStoreRef* = ref object of RootObj
db: RocksDbRef
backupEngine: BackupEngineRef
readOnly: bool
db: RocksDbReadWriteRef

proc readOnly*(store: RocksStoreRef): bool =
store.readOnly
RocksNamespaceRef* = ref object of RootObj
colFamily: ColFamilyReadWrite

proc readOnlyDb*(store: RocksStoreRef): RocksDbReadOnlyRef =
doAssert store.readOnly
store.db.RocksDbReadOnlyRef
# ------------------------------------------------------------------------------
# RocksStoreRef procs
# ------------------------------------------------------------------------------

proc readWriteDb*(store: RocksStoreRef): RocksDbReadWriteRef =
doAssert not store.readOnly
store.db.RocksDbReadWriteRef
proc rocksDb*(store: RocksStoreRef): RocksDbReadWriteRef =
store.db

template validateCanWriteAndGet(store: RocksStoreRef): RocksDbReadWriteRef =
if store.readOnly:
raiseAssert "Unimplemented"
store.db.RocksDbReadWriteRef

proc get*(store: RocksStoreRef, key: openArray[byte], onData: kvstore.DataProc): KvResult[bool] =
proc get*(
store: RocksStoreRef,
key: openArray[byte],
onData: kvstore.DataProc): KvResult[bool] =
store.db.get(key, onData)

proc find*(store: RocksStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): KvResult[int] =
proc find*(
store: RocksStoreRef,
prefix: openArray[byte],
onFind: kvstore.KeyValueProc): KvResult[int] =
raiseAssert "Unimplemented"

proc put*(store: RocksStoreRef, key, value: openArray[byte]): KvResult[void] =
store.validateCanWriteAndGet().put(key, value)
store.db.put(key, value)

proc contains*(store: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
store.db.keyExists(key)

proc del*(store: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
let db = store.validateCanWriteAndGet()

let exists = ? db.keyExists(key)
let exists = ? store.db.keyExists(key)
if not exists:
return ok(false)

let res = db.delete(key)
let res = store.db.delete(key)
if res.isErr():
return err(res.error())

Expand All @@ -72,32 +69,74 @@ proc clear*(store: RocksStoreRef): KvResult[bool] =

proc close*(store: RocksStoreRef) =
store.db.close()
store.backupEngine.close()

proc init*(
T: type RocksStoreRef,
basePath: string,
name: string,
readOnly = false): KvResult[T] =
namespaces = @["default"]): KvResult[T] =

let
dataDir = basePath / name / "data"
backupsDir = basePath / name / "backups"
let dataDir = basePath / name / "data"

try:
createDir(dataDir)
createDir(backupsDir)
except OSError, IOError:
return err("rocksdb: cannot create database directory")

let backupEngine = ? openBackupEngine(backupsDir)
return err("RocksStoreRef: cannot create database directory")

let dbOpts = defaultDbOptions()
dbOpts.setMaxOpenFiles(maxOpenFiles)

if readOnly:
let readOnlyDb = ? openRocksDbReadOnly(dataDir, dbOpts)
ok(T(db: readOnlyDb, backupEngine: backupEngine, readOnly: true))
else:
let readWriteDb = ? openRocksDb(dataDir, dbOpts)
ok(T(db: readWriteDb, backupEngine: backupEngine, readOnly: false))
let db = ? openRocksDb(dataDir, dbOpts,
columnFamilies = namespaces.mapIt(initColFamilyDescriptor(it)))
ok(T(db: db))

# ------------------------------------------------------------------------------
# RocksNamespaceRef procs
# ------------------------------------------------------------------------------

proc name*(store: RocksNamespaceRef): string =
store.colFamily.name

proc get*(
ns: RocksNamespaceRef,
key: openArray[byte],
onData: kvstore.DataProc): KvResult[bool] =
ns.colFamily.get(key, onData)

proc find*(
ns: RocksNamespaceRef,
prefix: openArray[byte],
onFind: kvstore.KeyValueProc): KvResult[int] =
raiseAssert "Unimplemented"

proc put*(ns: RocksNamespaceRef, key, value: openArray[byte]): KvResult[void] =
ns.colFamily.put(key, value)

proc contains*(ns: RocksNamespaceRef, key: openArray[byte]): KvResult[bool] =
ns.colFamily.keyExists(key)

proc del*(ns: RocksNamespaceRef, key: openArray[byte]): KvResult[bool] =
let exists = ? ns.colFamily.keyExists(key)
if not exists:
return ok(false)

let res = ns.colFamily.delete(key)
if res.isErr():
return err(res.error())

ok(true)

proc clear*(ns: RocksNamespaceRef): KvResult[bool] =
raiseAssert "Unimplemented"

proc close*(ns: RocksNamespaceRef) =
# To close the database, call close on RocksStoreRef.
raiseAssert "Unimplemented"

proc openNamespace*(
store: RocksStoreRef,
name: string): KvResult[RocksNamespaceRef] =
doAssert not store.db.isClosed()

let cf = ? store.db.withColFamily(name)
ok(RocksNamespaceRef(colFamily: cf))
3 changes: 1 addition & 2 deletions nimbus/db/kvt/kvt_desc/desc_error.nim
@@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
Expand All @@ -19,7 +19,6 @@ type

# RocksDB backend
RdbBeCantCreateDataDir
RdbBeCantCreateBackupDir
RdbBeCantCreateTmpDir
RdbBeDriverInitError
RdbBeDriverGetError
Expand Down
4 changes: 0 additions & 4 deletions nimbus/db/kvt/kvt_init/rocks_db/rdb_desc.nim
Expand Up @@ -31,7 +31,6 @@ type
const
BaseFolder* = "nimbus" # Same as for Legacy DB
DataFolder* = "kvt" # Legacy DB has "data"
BackupFolder* = "kvt-history" # Legacy DB has "backups"
SstCache* = "bulkput" # Rocks DB bulk load file name in temp folder
TempFolder* = "tmp" # No `tmp` directory used with legacy DB

Expand All @@ -45,9 +44,6 @@ func baseDir*(rdb: RdbInst): string =
func dataDir*(rdb: RdbInst): string =
rdb.baseDir / DataFolder

func backupsDir*(rdb: RdbInst): string =
rdb.basePath / BaseFolder / BackupFolder

func cacheDir*(rdb: RdbInst): string =
rdb.dataDir / TempFolder

Expand Down
7 changes: 1 addition & 6 deletions nimbus/db/kvt/kvt_init/rocks_db/rdb_init.nim
Expand Up @@ -47,16 +47,11 @@ proc init*(

let
dataDir = rdb.dataDir
backupsDir = rdb.backupsDir

try:
dataDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateDataDir, ""))
try:
backupsDir.createDir
except OSError, IOError:
return err((RdbBeCantCreateBackupDir, ""))
try:
rdb.cacheDir.createDir
except OSError, IOError:
Expand All @@ -68,7 +63,7 @@ proc init*(
let rc = openRocksDb(dataDir, dbOpts)
if rc.isErr:
let error = RdbBeDriverInitError
debug logTxt "driver failed", dataDir, backupsDir, openMax,
debug logTxt "driver failed", dataDir, openMax,
error, info=rc.error
return err((RdbBeDriverInitError, rc.error))

Expand Down
4 changes: 2 additions & 2 deletions nimbus/sync/snap/worker/db/rocky_bulk_load.nim
Expand Up @@ -91,7 +91,7 @@ proc lastError*(rbl: RockyBulkLoadRef): string =

proc store*(rbl: RockyBulkLoadRef): RocksDbReadWriteRef =
## Provide the diecriptor for backend functions as defined in `rocksdb`.
rbl.db.readWriteDb()
rbl.db.rocksDb()

# ------------------------------------------------------------------------------
# Public functions
Expand Down Expand Up @@ -154,7 +154,7 @@ proc finish*(

var filePath = rbl.filePath.cstring
if csError.isNil:
rbl.db.readWriteDb().cPtr.rocksdb_ingest_external_file(
rbl.db.rocksDb.cPtr.rocksdb_ingest_external_file(
cast[cstringArray](filePath.addr), 1,
rbl.importOption,
cast[cstringArray](csError.addr))
Expand Down
34 changes: 32 additions & 2 deletions tests/db/test_kvstore_rocksdb.nim
Expand Up @@ -18,8 +18,12 @@ import
../../nimbus/db/kvstore_rocksdb,
eth/../tests/db/test_kvstore

suite "RocksStoreRef":
test "KvStore interface":
suite "KvStore RocksDb Tests":
const
NS_DEFAULT = "default"
NS_OTHER = "other"

test "RocksStoreRef KvStore interface":
let tmp = getTempDir() / "nimbus-test-db"
removeDir(tmp)

Expand All @@ -28,3 +32,29 @@ suite "RocksStoreRef":
db.close()

testKvStore(kvStore db, false, false)

test "RocksNamespaceRef KvStore interface - default namespace":
let tmp = getTempDir() / "nimbus-test-db"
removeDir(tmp)

let db = RocksStoreRef.init(tmp, "test")[]
defer:
db.close()

let defaultNs = db.openNamespace(NS_DEFAULT)[]
testKvStore(kvStore defaultNs, false, false)

test "RocksNamespaceRef KvStore interface - multiple namespace":
let tmp = getTempDir() / "nimbus-test-db"
removeDir(tmp)

let db = RocksStoreRef.init(tmp, "test",
namespaces = @[NS_DEFAULT, NS_OTHER])[]
defer:
db.close()

let defaultNs = db.openNamespace(NS_DEFAULT)[]
testKvStore(kvStore defaultNs, false, false)

let otherNs = db.openNamespace(NS_OTHER)[]
testKvStore(kvStore otherNs, false, false)
2 changes: 1 addition & 1 deletion tests/replay/undump_kvp.nim
Expand Up @@ -57,7 +57,7 @@ proc walkAllDb(
## Walk over all key-value pairs of the database (`RocksDB` only.)
let
rop = rocksdb_readoptions_create()
rit = rocky.readWriteDb.cPtr.rocksdb_create_iterator(rop)
rit = rocky.rocksdb.cPtr.rocksdb_create_iterator(rop)

rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_rocksdb_timing/test_db_timing.nim
Expand Up @@ -137,7 +137,7 @@ proc test_dbTimingRockySetup*(
let
rdb = cdb.backend.toRocksStoreRef
rop = rocksdb_readoptions_create()
rit = rdb.readWriteDb().cPtr.rocksdb_create_iterator(rop)
rit = rdb.rocksDb.cPtr.rocksdb_create_iterator(rop)
check not rit.isNil

var
Expand Down