Skip to content

Commit

Permalink
Added support for ingesting sst files using the SstFileWriter. (#37)
Browse files Browse the repository at this point in the history
* Create ColFamilyReadOnly and ColFamilyReadWrite types for using a specific column family.

* Use inline pragma for small procs and add lock to RocksDbRef type close to prevent double free.

* Added documentation for the public API.

* Initial implementation of sst filewriter.

* Added tests for sstfilewriter.

* Documentation minor improvements.
  • Loading branch information
web3-developer committed Mar 5, 2024
1 parent 2eaa725 commit a0a4b47
Show file tree
Hide file tree
Showing 24 changed files with 629 additions and 76 deletions.
13 changes: 11 additions & 2 deletions rocksdb.nim
Expand Up @@ -7,6 +7,15 @@
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import ./rocksdb/[backup, rocksdb, rocksiterator, transactiondb, writebatch]
import
./rocksdb/[backup, columnfamily, rocksdb, rocksiterator],
./rocksdb/[sstfilewriter, transactiondb, writebatch]

export backup, rocksdb, rocksiterator, transactiondb, writebatch
export
backup,
columnfamily,
rocksdb,
rocksiterator,
sstfilewriter,
transactiondb,
writebatch
12 changes: 11 additions & 1 deletion rocksdb/backup.nim
Expand Up @@ -7,6 +7,8 @@
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## A `BackupEngineRef` is used to create and manage backups against a RocksDB database.

{.push raises: [].}

import
Expand All @@ -32,6 +34,9 @@ type
proc openBackupEngine*(
path: string,
backupOpts = defaultBackupEngineOptions()): RocksDBResult[BackupEngineRef] =
## Create a new backup engine. The `path` parameter is the path of the backup
## directory. Note that the same directory should not be used for both backups
## and the database itself.

var errors: cstring
let backupEnginePtr = rocksdb_backup_engine_open(
Expand All @@ -46,13 +51,16 @@ proc openBackupEngine*(
backupOpts: backupOpts)
ok(engine)

template isClosed*(backupEngine: BackupEngineRef): bool =
proc isClosed*(backupEngine: BackupEngineRef): bool {.inline.} =
## Returns `true` if the `BackupEngineRef` has been closed.
backupEngine.cPtr.isNil()

proc createNewBackup*(
backupEngine: BackupEngineRef,
db: RocksDbRef): RocksDBResult[void] =
## Create a new backup of the database.
doAssert not backupEngine.isClosed()
doAssert not db.isClosed()

var errors: cstring
rocksdb_backup_engine_create_new_backup(
Expand All @@ -68,6 +76,7 @@ proc restoreDbFromLatestBackup*(
dbDir: string,
walDir = dbDir,
keepLogFiles = false): RocksDBResult[void] =
## Restore the database from the latest backup.
doAssert not backupEngine.isClosed()

let restoreOptions = rocksdb_restore_options_create()
Expand All @@ -87,6 +96,7 @@ proc restoreDbFromLatestBackup*(
ok()

proc close*(backupEngine: BackupEngineRef) =
## Close the `BackupEngineRef`.
if not backupEngine.isClosed():
rocksdb_backup_engine_close(backupEngine.cPtr)
backupEngine.cPtr = nil
113 changes: 113 additions & 0 deletions rocksdb/columnfamily.nim
@@ -0,0 +1,113 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## `ColFamilyReadOnly` and `ColFamilyReadWrite` types both hold a reference to a
## `RocksDbReadOnlyRef` or `RocksDbReadWriteRef` respectively. They are convenience
## types which enable writing to a specific column family without having to specify the
## column family in each call.
##
## These column family types do not own the underlying `RocksDbRef` and therefore
## to close the database, simply call `columnFamily.db.close()` which will close
## the underlying `RocksDbRef`. Note that doing so will also impact any other column
## families that hold a reference to the same `RocksDbRef`.

{.push raises: [].}

import
./rocksdb

export rocksdb

type
ColFamilyReadOnly* = object
db: RocksDbReadOnlyRef
name: string

ColFamilyReadWrite* = object
db: RocksDbReadWriteRef
name: string

proc withColFamily*(
db: RocksDbReadOnlyRef,
name: string): RocksDBResult[ColFamilyReadOnly] =
## Creates a new `ColFamilyReadOnly` from the given `RocksDbReadOnlyRef` and
## column family name.

# validate that the column family exists
discard db.keyExists(@[0.byte], name).valueOr:
return err(error)

ok(ColFamilyReadOnly(db: db, name: name))

proc withColFamily*(
db: RocksDbReadWriteRef,
name: string): RocksDBResult[ColFamilyReadWrite] =
## Create a new `ColFamilyReadWrite` from the given `RocksDbReadWriteRef` and
## column family name.

# validate that the column family exists
discard db.keyExists(@[0.byte], name).valueOr:
return err(error)

ok(ColFamilyReadWrite(db: db, name: name))

proc db*(cf: ColFamilyReadOnly | ColFamilyReadWrite): auto {.inline.} =
## Returns the underlying `RocksDbReadOnlyRef` or `RocksDbReadWriteRef`.
cf.db

proc name*(cf: ColFamilyReadOnly | ColFamilyReadWrite): string {.inline.} =
## Returns the name of the column family.
cf.name

proc get*(
cf: ColFamilyReadOnly | ColFamilyReadWrite,
key: openArray[byte],
onData: DataProc): RocksDBResult[bool] {.inline.} =
## Gets the value of the given key from the column family using the `onData`
## callback.
cf.db.get(key, onData, cf.name)

proc get*(
cf: ColFamilyReadOnly | ColFamilyReadWrite,
key: openArray[byte]): RocksDBResult[seq[byte]] {.inline.} =
## Gets the value of the given key from the column family.
cf.db.get(key, cf.name)

proc put*(
cf: ColFamilyReadWrite,
key, val: openArray[byte]): RocksDBResult[void] {.inline.} =
## Puts a value for the given key into the column family.
cf.db.put(key, val, cf.name)

proc keyExists*(
cf: ColFamilyReadOnly | ColFamilyReadWrite,
key: openArray[byte]): RocksDBResult[bool] {.inline.} =
## Checks if the given key exists in the column family.
cf.db.keyExists(key, cf.name)

proc delete*(
cf: ColFamilyReadWrite,
key: openArray[byte]): RocksDBResult[void] {.inline.} =
## Deletes the given key from the column family.
cf.db.delete(key, cf.name)

proc openIterator*(
cf: ColFamilyReadOnly | ColFamilyReadWrite): RocksDBResult[RocksIteratorRef] {.inline.} =
## Opens an `RocksIteratorRef` for the given column family.
cf.db.openIterator(cf.name)

proc openWriteBatch*(cf: ColFamilyReadWrite): WriteBatchRef {.inline.} =
## Opens a `WriteBatchRef` for the given column family.
cf.db.openWriteBatch(cf.name)

proc write*(
cf: ColFamilyReadWrite,
updates: WriteBatchRef): RocksDBResult[void] {.inline.} =
## Writes the updates in the `WriteBatchRef` to the column family.
cf.db.write(updates)
12 changes: 6 additions & 6 deletions rocksdb/columnfamily/cfdescriptor.nim
Expand Up @@ -25,20 +25,20 @@ proc initColFamilyDescriptor*(
options = defaultColFamilyOptions()): ColFamilyDescriptor =
ColFamilyDescriptor(name: name, options: options)

template name*(descriptor: ColFamilyDescriptor): string =
proc name*(descriptor: ColFamilyDescriptor): string {.inline.} =
descriptor.name

template options*(descriptor: ColFamilyDescriptor): ColFamilyOptionsRef =
proc options*(descriptor: ColFamilyDescriptor): ColFamilyOptionsRef {.inline.} =
descriptor.options

template isDefault*(descriptor: ColFamilyDescriptor): bool =
proc isDefault*(descriptor: ColFamilyDescriptor): bool {.inline.} =
descriptor.name == DEFAULT_COLUMN_FAMILY_NAME

template defaultColFamilyDescriptor*(): ColFamilyDescriptor =
proc defaultColFamilyDescriptor*(): ColFamilyDescriptor {.inline.} =
initColFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME)

template isClosed*(descriptor: ColFamilyDescriptor): bool =
proc isClosed*(descriptor: ColFamilyDescriptor): bool {.inline.} =
descriptor.options.isClosed()

template close*(descriptor: ColFamilyDescriptor) =
proc close*(descriptor: ColFamilyDescriptor) {.inline.} =
descriptor.options.close()
4 changes: 2 additions & 2 deletions rocksdb/columnfamily/cfhandle.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newColFamilyHandle*(cPtr: ColFamilyHandlePtr): ColFamilyHandleRef =
ColFamilyHandleRef(cPtr: cPtr)

template isClosed*(handle: ColFamilyHandleRef): bool =
proc isClosed*(handle: ColFamilyHandleRef): bool {.inline.} =
handle.cPtr.isNil()

proc cPtr*(handle: ColFamilyHandleRef): ColFamilyHandlePtr =
Expand All @@ -41,7 +41,7 @@ proc cPtr*(handle: ColFamilyHandleRef): ColFamilyHandlePtr =
# var nameLen: csize_t
# $rocksdb_column_family_handle_get_name(handle.cPtr, nameLen.addr)

# template isDefault*(handle: ColFamilyHandleRef): bool =
# proc isDefault*(handle: ColFamilyHandleRef): bool {.inline.} =
# handle.getName() == DEFAULT_COLUMN_FAMILY_NAME

proc close*(handle: ColFamilyHandleRef) =
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/columnfamily/cfopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newColFamilyOptions*(): ColFamilyOptionsRef =
ColFamilyOptionsRef(cPtr: rocksdb_options_create())

template isClosed*(cfOpts: ColFamilyOptionsRef): bool =
proc isClosed*(cfOpts: ColFamilyOptionsRef): bool {.inline.} =
cfOpts.cPtr.isNil()

proc cPtr*(cfOpts: ColFamilyOptionsRef): ColFamilyOptionsPtr =
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/internal/cftable.nim
Expand Up @@ -31,7 +31,7 @@ proc newColFamilyTable*(

ColFamilyTableRef(columnFamilies: cfTable)

template isClosed*(table: ColFamilyTableRef): bool =
proc isClosed*(table: ColFamilyTableRef): bool {.inline.} =
table.columnFamilies.isNil()

proc get*(table: ColFamilyTableRef, name: string): ColFamilyHandleRef =
Expand Down
6 changes: 6 additions & 0 deletions rocksdb/internal/utils.nim
Expand Up @@ -10,10 +10,16 @@
{.push raises: [].}

import
std/locks,
../lib/librocksdb

const DEFAULT_COLUMN_FAMILY_NAME* = "default"

proc createLock*(): Lock =
var lock = Lock()
initLock(lock)
lock

template bailOnErrors*(errors: cstring): auto =
if not errors.isNil:
let res = err($(errors))
Expand Down
4 changes: 2 additions & 2 deletions rocksdb/options/backupopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newBackupEngineOptions*(): BackupEngineOptionsRef =
BackupEngineOptionsRef(cPtr: rocksdb_options_create())

template isClosed*(engineOpts: BackupEngineOptionsRef): bool =
proc isClosed*(engineOpts: BackupEngineOptionsRef): bool {.inline.} =
engineOpts.cPtr.isNil()

proc cPtr*(engineOpts: BackupEngineOptionsRef): BackupEngineOptionsPtr =
Expand All @@ -30,7 +30,7 @@ proc cPtr*(engineOpts: BackupEngineOptionsRef): BackupEngineOptionsPtr =

# TODO: Add setters and getters for backup options properties.

template defaultBackupEngineOptions*(): BackupEngineOptionsRef =
proc defaultBackupEngineOptions*(): BackupEngineOptionsRef {.inline.} =
newBackupEngineOptions()
# TODO: set prefered defaults

Expand Down
2 changes: 1 addition & 1 deletion rocksdb/options/dbopts.nim
Expand Up @@ -22,7 +22,7 @@ type
proc newDbOptions*(): DbOptionsRef =
DbOptionsRef(cPtr: rocksdb_options_create())

template isClosed*(dbOpts: DbOptionsRef): bool =
proc isClosed*(dbOpts: DbOptionsRef): bool {.inline.} =
dbOpts.cPtr.isNil()

proc cPtr*(dbOpts: DbOptionsRef): DbOptionsPtr =
Expand Down
4 changes: 2 additions & 2 deletions rocksdb/options/readopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newReadOptions*(): ReadOptionsRef =
ReadOptionsRef(cPtr: rocksdb_readoptions_create())

template isClosed*(readOpts: ReadOptionsRef): bool =
proc isClosed*(readOpts: ReadOptionsRef): bool {.inline.} =
readOpts.cPtr.isNil()

proc cPtr*(readOpts: ReadOptionsRef): ReadOptionsPtr =
Expand All @@ -30,7 +30,7 @@ proc cPtr*(readOpts: ReadOptionsRef): ReadOptionsPtr =

# TODO: Add setters and getters for read options properties.

template defaultReadOptions*(): ReadOptionsRef =
proc defaultReadOptions*(): ReadOptionsRef {.inline.} =
newReadOptions()
# TODO: set prefered defaults

Expand Down
4 changes: 2 additions & 2 deletions rocksdb/options/writeopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newWriteOptions*(): WriteOptionsRef =
WriteOptionsRef(cPtr: rocksdb_writeoptions_create())

template isClosed*(writeOpts: WriteOptionsRef): bool =
proc isClosed*(writeOpts: WriteOptionsRef): bool {.inline.} =
writeOpts.cPtr.isNil()

proc cPtr*(writeOpts: WriteOptionsRef): WriteOptionsPtr =
Expand All @@ -30,7 +30,7 @@ proc cPtr*(writeOpts: WriteOptionsRef): WriteOptionsPtr =

# TODO: Add setters and getters for write options properties.

template defaultWriteOptions*(): WriteOptionsRef =
proc defaultWriteOptions*(): WriteOptionsRef {.inline.} =
newWriteOptions()
# TODO: set prefered defaults

Expand Down

0 comments on commit a0a4b47

Please sign in to comment.