Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Apr 25, 2024
1 parent 6807da0 commit 059fec5
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 33 deletions.
4 changes: 4 additions & 0 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
track_and_verify_wals_in_manifest_;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
options.avoid_flush_during_recovery = false;
// To avoid WAL sync during shutdown for reproducing the bug
options.avoid_flush_during_shutdown = true;
options.env = env_.get();
ASSERT_OK(DestroyDB(dbname_, options));
options.create_if_missing = true;
Expand Down Expand Up @@ -1343,6 +1345,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
options.track_and_verify_wals_in_manifest =
track_and_verify_wals_in_manifest_;
options.avoid_flush_during_recovery = false;
// To avoid WAL sync during shutdown for reproducing the bug
options.avoid_flush_during_shutdown = true;
options.env = env_.get();
ASSERT_OK(DestroyDB(dbname_, options));
options.create_if_missing = true;
Expand Down
15 changes: 14 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,19 @@ Status DBImpl::CloseHelper() {
job_context.Clean();
mutex_.Lock();
}
if (!mutable_db_options_.avoid_flush_during_shutdown && !logs_.empty()) {
mutex_.Unlock();
Status s = SyncWAL();
mutex_.Lock();
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to sync WALs with error -- %s",
s.ToString().c_str());
if (ret.ok()) {
ret = s;
}
}
}
{
InstrumentedMutexLock lock(&log_write_mutex_);
for (auto l : logs_to_free_) {
Expand All @@ -637,7 +650,7 @@ Status DBImpl::CloseHelper() {
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
"Unable to clear writer for WAL %s with error -- %s",
LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
s.ToString().c_str());
// Retain the first error
Expand Down
2 changes: 2 additions & 0 deletions db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
options.env = fault_env.get();
options.manual_wal_flush = true;
options.track_and_verify_wals_in_manifest = true;
// To avoid WAL sync during shutdown for reproducing the bug
options.avoid_flush_during_shutdown = true;
Reopen(options);

ASSERT_OK(Put("key1", "val1"));
Expand Down
29 changes: 29 additions & 0 deletions db/fault_injection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// file data (or entire files) not protected by a "sync".

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "db/log_format.h"
#include "db/version_set.h"
#include "env/mock_env.h"
Expand Down Expand Up @@ -629,6 +630,34 @@ INSTANTIATE_TEST_CASE_P(
std::make_tuple(false, kSyncWal, kEnd),
std::make_tuple(true, kSyncWal, kEnd)));

class FaultInjectionFSTest : public DBTestBase {
public:
FaultInjectionFSTest()
: DBTestBase("fault_injection_fs_test", /*env_do_fsync=*/false) {}
};

TEST_F(FaultInjectionFSTest, SyncWALDuringDBClose) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(env_->GetFileSystem()));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
Options options = CurrentOptions();
options.avoid_flush_during_shutdown = true;
options.env = env.get();
Reopen(options);
ASSERT_OK(Put("k1", "v1"));
Close();
Reopen(options);
ASSERT_EQ("NOT_FOUND", Get("k1"));
Destroy(options);

options.avoid_flush_during_shutdown = false;
Reopen(options);
ASSERT_OK(Put("k1", "v1"));
Close();
Reopen(options);
ASSERT_EQ("v1", Get("k1"));
Destroy(options);
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
21 changes: 9 additions & 12 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
uint32_t crc32c_checksum) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

StopWatch sw(clock_, stats_, hist_type_,
Expand Down Expand Up @@ -199,7 +199,7 @@ IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
IOStatus WritableFileWriter::Pad(const IOOptions& opts,
const size_t pad_bytes) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}
const IOOptions io_options = FinalizeIOOptions(opts);
assert(pad_bytes < kDefaultPageSize);
Expand Down Expand Up @@ -348,7 +348,7 @@ IOStatus WritableFileWriter::Close(const IOOptions& opts) {
// enabled
IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

const IOOptions io_options = FinalizeIOOptions(opts);
Expand Down Expand Up @@ -458,7 +458,7 @@ IOStatus WritableFileWriter::PrepareIOOptions(const WriteOptions& wo,

IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOOptions io_options = FinalizeIOOptions(opts);
Expand All @@ -483,7 +483,7 @@ IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
bool use_fsync) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}
IOOptions io_options = FinalizeIOOptions(opts);
if (!writable_file_->IsSyncThreadSafe()) {
Expand All @@ -495,9 +495,6 @@ IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
IOStatus s = SyncInternal(io_options, use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
if (!s.ok()) {
#ifndef NDEBUG
sync_without_flush_called_ = true;
#endif // NDEBUG
set_seen_error();
}
return s;
Expand Down Expand Up @@ -543,7 +540,7 @@ IOStatus WritableFileWriter::SyncInternal(const IOOptions& opts,
IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
uint64_t nbytes) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOSTATS_TIMER_GUARD(range_sync_nanos);
Expand Down Expand Up @@ -572,7 +569,7 @@ IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
const char* data, size_t size) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOStatus s;
Expand Down Expand Up @@ -663,7 +660,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
const char* data,
size_t size) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOStatus s;
Expand Down Expand Up @@ -877,7 +874,7 @@ IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {

IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

assert(use_direct_io());
Expand Down
11 changes: 1 addition & 10 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,6 @@ class WritableFileWriter {
uint64_t next_write_offset_;
bool pending_sync_;
std::atomic<bool> seen_error_;
#ifndef NDEBUG
// SyncWithoutFlush() is the function that is allowed to be called
// concurrently with other function. One of the concurrent call
// could set seen_error_, and the other one would hit assertion
// in debug mode.
std::atomic<bool> sync_without_flush_called_ = false;
#endif // NDEBUG
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Expand Down Expand Up @@ -304,9 +297,7 @@ class WritableFileWriter {
}
void set_seen_error() { seen_error_.store(true, std::memory_order_relaxed); }

IOStatus AssertFalseAndGetStatusForPrevError() {
// This should only happen if SyncWithoutFlush() was called.
assert(sync_without_flush_called_);
IOStatus GetWriterHasPreviousErrorStatus() {
return IOStatus::IOError("Writer has previous error.");
}

Expand Down
9 changes: 6 additions & 3 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,19 @@ class DB {

// Close the DB by releasing resources, closing files etc. This should be
// called before calling the destructor so that the caller can get back a
// status in case there are any errors. This will not fsync the WAL files.
// If syncing is required, the caller must first call SyncWAL(), or Write()
// status in case there are any errors.
// If `Options.avoid_flush_during_shutdown = true`, then this will not
// fsync the WAL files. In this case, if syncing is required,
// the caller must first call SyncWAL(), or Write()
// using an empty write batch with WriteOptions.sync=true.
// Regardless of the return status, the DB must be freed.
//
// If the return status is Aborted(), closing fails because there is
// unreleased snapshot in the system. In this case, users can release
// the unreleased snapshots and try again and expect it to succeed. For
// other status, re-calling Close() will be no-op and return the original
// close status. If the return status is NotSupported(), then the DB
// close status. If the return status is NotSupported() due to Close()
// is not implemented, then the DB
// implementation does cleanup in the destructor
//
// WaitForCompact() with WaitForCompactOptions.close_db=true will be a good
Expand Down
6 changes: 4 additions & 2 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,10 @@ struct DBOptions {
bool avoid_flush_during_recovery = false;

// By default RocksDB will flush all memtables on DB close if there are
// unpersisted data (i.e. with WAL disabled) The flush can be skip to speedup
// DB close. Unpersisted data WILL BE LOST.
// unpersisted data (i.e. with WAL disabled) or when WAL is enabled, sync all
// WALs. If true, These flush and sync will be skipped to speed up DB close.
// But unpersisted data will be lost and unsynced WALs can be lost upon a
// system crash after DB close.
//
// DEFAULT: false
//
Expand Down
5 changes: 0 additions & 5 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,6 @@ def finalize_and_sanitize(src_params):
# files, which would be problematic when unsynced data can be lost in
# crash recoveries.
dest_params["enable_compaction_filter"] = 0
# TODO(hx235): re-enable "reopen" after supporting unsynced data loss
# verification upon reopen. Currently reopen does not restore expected state
# with potential data loss in mind like start of each `./db_stress` run.
# Therefore it always expects no data loss.
dest_params["reopen"] = 0
# Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb
if dest_params.get("unordered_write", 0) == 1:
dest_params["txn_write_policy"] = 1
Expand Down
16 changes: 16 additions & 0 deletions tools/ldb_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ TEST_F(LdbCmdTest, DumpFileChecksumNoChecksum) {
Options opts;
opts.env = env.get();
opts.create_if_missing = true;
// To avoid WAL sync during shutdown as writable file in mock env is
// not sync thread safe so we can't sync WAL and DB Close()
// will return non-OK
opts.avoid_flush_during_shutdown = true;

DB* db = nullptr;
std::string dbname = test::PerThreadDBPath(env.get(), "ldb_cmd_test");
Expand Down Expand Up @@ -373,6 +377,10 @@ TEST_F(LdbCmdTest, BlobDBDumpFileChecksumNoChecksum) {
opts.env = env.get();
opts.create_if_missing = true;
opts.enable_blob_files = true;
// To avoid WAL sync during shutdown as writable file in mock env is
// not sync thread safe so we can't sync WAL and DB Close()
// will return non-OK
opts.avoid_flush_during_shutdown = true;

DB* db = nullptr;
std::string dbname = test::PerThreadDBPath(env.get(), "ldb_cmd_test");
Expand Down Expand Up @@ -458,6 +466,10 @@ TEST_F(LdbCmdTest, DumpFileChecksumCRC32) {
opts.env = env.get();
opts.create_if_missing = true;
opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
// To avoid WAL sync during shutdown as writable file in mock env is
// not sync thread safe so we can't sync WAL and DB Close()
// will return non-OK
opts.avoid_flush_during_shutdown = true;

DB* db = nullptr;
std::string dbname = test::PerThreadDBPath(env.get(), "ldb_cmd_test");
Expand Down Expand Up @@ -552,6 +564,10 @@ TEST_F(LdbCmdTest, BlobDBDumpFileChecksumCRC32) {
opts.create_if_missing = true;
opts.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
opts.enable_blob_files = true;
// To avoid WAL sync during shutdown as writable file in mock env is
// not sync thread safe so we can't sync WAL and DB Close()
// will return non-OK
opts.avoid_flush_during_shutdown = true;

DB* db = nullptr;
std::string dbname = test::PerThreadDBPath(env.get(), "ldb_cmd_test");
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/behavior_changes/wal_sync_on_close.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`avoid_flush_during_shutdown=false` (the default behavior) will sync WALs during DB close
2 changes: 2 additions & 0 deletions utilities/checkpoint/checkpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,8 @@ TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
new FaultInjectionTestEnv(env_));
options.env = fault_env.get();
options.track_and_verify_wals_in_manifest = true;
// To avoid WAL sync during shutdown for reproducing the bug
options.avoid_flush_during_shutdown = true;
Reopen(options);

ASSERT_OK(Put("key1", "val1"));
Expand Down

0 comments on commit 059fec5

Please sign in to comment.