Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for ingesting sst files using the SstFileWriter. #37

Merged
merged 6 commits into from Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions rocksdb.nim
Expand Up @@ -7,6 +7,15 @@
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import ./rocksdb/[backup, rocksdb, rocksiterator, transactiondb, writebatch]
import
./rocksdb/[backup, columnfamily, rocksdb, rocksiterator],
./rocksdb/[sstfilewriter, transactiondb, writebatch]

export backup, rocksdb, rocksiterator, transactiondb, writebatch
export
backup,
columnfamily,
rocksdb,
rocksiterator,
sstfilewriter,
transactiondb,
writebatch
12 changes: 11 additions & 1 deletion rocksdb/backup.nim
Expand Up @@ -7,6 +7,8 @@
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## A `BackupEngineRef` is used to create and manage backups against a RocksDB database.

{.push raises: [].}

import
Expand All @@ -32,6 +34,9 @@ type
proc openBackupEngine*(
path: string,
backupOpts = defaultBackupEngineOptions()): RocksDBResult[BackupEngineRef] =
## Create a new backup engine. The `path` parameter is the path of the backup
## directory. Note that the same directory should not be used for both backups
## and the database itself.

var errors: cstring
let backupEnginePtr = rocksdb_backup_engine_open(
Expand All @@ -46,13 +51,16 @@ proc openBackupEngine*(
backupOpts: backupOpts)
ok(engine)

template isClosed*(backupEngine: BackupEngineRef): bool =
proc isClosed*(backupEngine: BackupEngineRef): bool {.inline.} =
## Returns `true` if the `BackupEngineRef` has been closed.
backupEngine.cPtr.isNil()

proc createNewBackup*(
backupEngine: BackupEngineRef,
db: RocksDbRef): RocksDBResult[void] =
## Create a new backup of the database.
doAssert not backupEngine.isClosed()
doAssert not db.isClosed()

var errors: cstring
rocksdb_backup_engine_create_new_backup(
Expand All @@ -68,6 +76,7 @@ proc restoreDbFromLatestBackup*(
dbDir: string,
walDir = dbDir,
keepLogFiles = false): RocksDBResult[void] =
## Restore the database from the latest backup.
doAssert not backupEngine.isClosed()

let restoreOptions = rocksdb_restore_options_create()
Expand All @@ -87,6 +96,7 @@ proc restoreDbFromLatestBackup*(
ok()

proc close*(backupEngine: BackupEngineRef) =
## Close the `BackupEngineRef`.
if not backupEngine.isClosed():
rocksdb_backup_engine_close(backupEngine.cPtr)
backupEngine.cPtr = nil
113 changes: 113 additions & 0 deletions rocksdb/columnfamily.nim
@@ -0,0 +1,113 @@
# Nim-RocksDB
# Copyright 2024 Status Research & Development GmbH
# Licensed under either of
#
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * GPL license, version 2.0, ([LICENSE-GPLv2](LICENSE-GPLv2) or https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html)
#
# at your option. This file may not be copied, modified, or distributed except according to those terms.

## `ColFamilyReadOnly` and `ColFamilyReadWrite` types both hold a reference to a
## `RocksDbReadOnlyRef` or `RocksDbReadWriteRef` respectively. They are convenience
## types which enable writing to a specific column family without having to specify the
## column family in each call.
##
## These column family types do not own the underlying `RocksDbRef` and therefore
## to close the database, simply call `columnFamily.db.close()` which will close
## the underlying `RocksDbRef`. Note that doing so will also impact any other column
## families that hold a reference to the same `RocksDbRef`.

{.push raises: [].}

import
./rocksdb

export rocksdb

type
ColFamilyReadOnly* = object
db: RocksDbReadOnlyRef
name: string

ColFamilyReadWrite* = object
db: RocksDbReadWriteRef
name: string

proc withColFamily*(
db: RocksDbReadOnlyRef,
name: string): RocksDBResult[ColFamilyReadOnly] =
## Creates a new `ColFamilyReadOnly` from the given `RocksDbReadOnlyRef` and
## column family name.

# validate that the column family exists
discard db.keyExists(@[0.byte], name).valueOr:
return err(error)

ok(ColFamilyReadOnly(db: db, name: name))

proc withColFamily*(
db: RocksDbReadWriteRef,
name: string): RocksDBResult[ColFamilyReadWrite] =
## Create a new `ColFamilyReadWrite` from the given `RocksDbReadWriteRef` and
## column family name.

# validate that the column family exists
discard db.keyExists(@[0.byte], name).valueOr:
return err(error)

ok(ColFamilyReadWrite(db: db, name: name))

proc db*(cf: ColFamilyReadOnly | ColFamilyReadWrite): auto {.inline.} =
## Returns the underlying `RocksDbReadOnlyRef` or `RocksDbReadWriteRef`.
cf.db

proc name*(cf: ColFamilyReadOnly | ColFamilyReadWrite): string {.inline.} =
## Returns the name of the column family.
cf.name

proc get*(
cf: ColFamilyReadOnly | ColFamilyReadWrite,
key: openArray[byte],
onData: DataProc): RocksDBResult[bool] {.inline.} =
## Gets the value of the given key from the column family using the `onData`
## callback.
cf.db.get(key, onData, cf.name)

proc get*(
cf: ColFamilyReadOnly | ColFamilyReadWrite,
key: openArray[byte]): RocksDBResult[seq[byte]] {.inline.} =
## Gets the value of the given key from the column family.
cf.db.get(key, cf.name)

proc put*(
cf: ColFamilyReadWrite,
key, val: openArray[byte]): RocksDBResult[void] {.inline.} =
## Puts a value for the given key into the column family.
cf.db.put(key, val, cf.name)

proc keyExists*(
cf: ColFamilyReadOnly | ColFamilyReadWrite,
key: openArray[byte]): RocksDBResult[bool] {.inline.} =
## Checks if the given key exists in the column family.
cf.db.keyExists(key, cf.name)

proc delete*(
cf: ColFamilyReadWrite,
key: openArray[byte]): RocksDBResult[void] {.inline.} =
## Deletes the given key from the column family.
cf.db.delete(key, cf.name)

proc openIterator*(
cf: ColFamilyReadOnly | ColFamilyReadWrite): RocksDBResult[RocksIteratorRef] {.inline.} =
## Opens an `RocksIteratorRef` for the given column family.
cf.db.openIterator(cf.name)

proc openWriteBatch*(cf: ColFamilyReadWrite): WriteBatchRef {.inline.} =
## Opens a `WriteBatchRef` for the given column family.
cf.db.openWriteBatch(cf.name)

proc write*(
cf: ColFamilyReadWrite,
updates: WriteBatchRef): RocksDBResult[void] {.inline.} =
## Writes the updates in the `WriteBatchRef` to the column family.
cf.db.write(updates)
12 changes: 6 additions & 6 deletions rocksdb/columnfamily/cfdescriptor.nim
Expand Up @@ -25,20 +25,20 @@ proc initColFamilyDescriptor*(
options = defaultColFamilyOptions()): ColFamilyDescriptor =
ColFamilyDescriptor(name: name, options: options)

template name*(descriptor: ColFamilyDescriptor): string =
proc name*(descriptor: ColFamilyDescriptor): string {.inline.} =
descriptor.name

template options*(descriptor: ColFamilyDescriptor): ColFamilyOptionsRef =
proc options*(descriptor: ColFamilyDescriptor): ColFamilyOptionsRef {.inline.} =
descriptor.options

template isDefault*(descriptor: ColFamilyDescriptor): bool =
proc isDefault*(descriptor: ColFamilyDescriptor): bool {.inline.} =
descriptor.name == DEFAULT_COLUMN_FAMILY_NAME

template defaultColFamilyDescriptor*(): ColFamilyDescriptor =
proc defaultColFamilyDescriptor*(): ColFamilyDescriptor {.inline.} =
initColFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME)

template isClosed*(descriptor: ColFamilyDescriptor): bool =
proc isClosed*(descriptor: ColFamilyDescriptor): bool {.inline.} =
descriptor.options.isClosed()

template close*(descriptor: ColFamilyDescriptor) =
proc close*(descriptor: ColFamilyDescriptor) {.inline.} =
descriptor.options.close()
4 changes: 2 additions & 2 deletions rocksdb/columnfamily/cfhandle.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newColFamilyHandle*(cPtr: ColFamilyHandlePtr): ColFamilyHandleRef =
ColFamilyHandleRef(cPtr: cPtr)

template isClosed*(handle: ColFamilyHandleRef): bool =
proc isClosed*(handle: ColFamilyHandleRef): bool {.inline.} =
handle.cPtr.isNil()

proc cPtr*(handle: ColFamilyHandleRef): ColFamilyHandlePtr =
Expand All @@ -41,7 +41,7 @@ proc cPtr*(handle: ColFamilyHandleRef): ColFamilyHandlePtr =
# var nameLen: csize_t
# $rocksdb_column_family_handle_get_name(handle.cPtr, nameLen.addr)

# template isDefault*(handle: ColFamilyHandleRef): bool =
# proc isDefault*(handle: ColFamilyHandleRef): bool {.inline.} =
# handle.getName() == DEFAULT_COLUMN_FAMILY_NAME

proc close*(handle: ColFamilyHandleRef) =
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/columnfamily/cfopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newColFamilyOptions*(): ColFamilyOptionsRef =
ColFamilyOptionsRef(cPtr: rocksdb_options_create())

template isClosed*(cfOpts: ColFamilyOptionsRef): bool =
proc isClosed*(cfOpts: ColFamilyOptionsRef): bool {.inline.} =
cfOpts.cPtr.isNil()

proc cPtr*(cfOpts: ColFamilyOptionsRef): ColFamilyOptionsPtr =
Expand Down
2 changes: 1 addition & 1 deletion rocksdb/internal/cftable.nim
Expand Up @@ -31,7 +31,7 @@ proc newColFamilyTable*(

ColFamilyTableRef(columnFamilies: cfTable)

template isClosed*(table: ColFamilyTableRef): bool =
proc isClosed*(table: ColFamilyTableRef): bool {.inline.} =
table.columnFamilies.isNil()

proc get*(table: ColFamilyTableRef, name: string): ColFamilyHandleRef =
Expand Down
6 changes: 6 additions & 0 deletions rocksdb/internal/utils.nim
Expand Up @@ -10,10 +10,16 @@
{.push raises: [].}

import
std/locks,
../lib/librocksdb

const DEFAULT_COLUMN_FAMILY_NAME* = "default"

proc createLock*(): Lock =
var lock = Lock()
initLock(lock)
lock

template bailOnErrors*(errors: cstring): auto =
if not errors.isNil:
let res = err($(errors))
Expand Down
4 changes: 2 additions & 2 deletions rocksdb/options/backupopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newBackupEngineOptions*(): BackupEngineOptionsRef =
BackupEngineOptionsRef(cPtr: rocksdb_options_create())

template isClosed*(engineOpts: BackupEngineOptionsRef): bool =
proc isClosed*(engineOpts: BackupEngineOptionsRef): bool {.inline.} =
engineOpts.cPtr.isNil()

proc cPtr*(engineOpts: BackupEngineOptionsRef): BackupEngineOptionsPtr =
Expand All @@ -30,7 +30,7 @@ proc cPtr*(engineOpts: BackupEngineOptionsRef): BackupEngineOptionsPtr =

# TODO: Add setters and getters for backup options properties.

template defaultBackupEngineOptions*(): BackupEngineOptionsRef =
proc defaultBackupEngineOptions*(): BackupEngineOptionsRef {.inline.} =
newBackupEngineOptions()
# TODO: set prefered defaults

Expand Down
2 changes: 1 addition & 1 deletion rocksdb/options/dbopts.nim
Expand Up @@ -22,7 +22,7 @@ type
proc newDbOptions*(): DbOptionsRef =
DbOptionsRef(cPtr: rocksdb_options_create())

template isClosed*(dbOpts: DbOptionsRef): bool =
proc isClosed*(dbOpts: DbOptionsRef): bool {.inline.} =
dbOpts.cPtr.isNil()

proc cPtr*(dbOpts: DbOptionsRef): DbOptionsPtr =
Expand Down
4 changes: 2 additions & 2 deletions rocksdb/options/readopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newReadOptions*(): ReadOptionsRef =
ReadOptionsRef(cPtr: rocksdb_readoptions_create())

template isClosed*(readOpts: ReadOptionsRef): bool =
proc isClosed*(readOpts: ReadOptionsRef): bool {.inline.} =
readOpts.cPtr.isNil()

proc cPtr*(readOpts: ReadOptionsRef): ReadOptionsPtr =
Expand All @@ -30,7 +30,7 @@ proc cPtr*(readOpts: ReadOptionsRef): ReadOptionsPtr =

# TODO: Add setters and getters for read options properties.

template defaultReadOptions*(): ReadOptionsRef =
proc defaultReadOptions*(): ReadOptionsRef {.inline.} =
newReadOptions()
# TODO: set prefered defaults

Expand Down
4 changes: 2 additions & 2 deletions rocksdb/options/writeopts.nim
Expand Up @@ -21,7 +21,7 @@ type
proc newWriteOptions*(): WriteOptionsRef =
WriteOptionsRef(cPtr: rocksdb_writeoptions_create())

template isClosed*(writeOpts: WriteOptionsRef): bool =
proc isClosed*(writeOpts: WriteOptionsRef): bool {.inline.} =
writeOpts.cPtr.isNil()

proc cPtr*(writeOpts: WriteOptionsRef): WriteOptionsPtr =
Expand All @@ -30,7 +30,7 @@ proc cPtr*(writeOpts: WriteOptionsRef): WriteOptionsPtr =

# TODO: Add setters and getters for write options properties.

template defaultWriteOptions*(): WriteOptionsRef =
proc defaultWriteOptions*(): WriteOptionsRef {.inline.} =
newWriteOptions()
# TODO: set prefered defaults

Expand Down