Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed May 13, 2024
1 parent c110091 commit 93c9102
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 23 deletions.
15 changes: 14 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,19 @@ Status DBImpl::CloseHelper() {
job_context.Clean();
mutex_.Lock();
}
if (!mutable_db_options_.avoid_sync_during_shutdown && !logs_.empty()) {
mutex_.Unlock();
Status s = SyncWAL();
mutex_.Lock();
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to sync WALs with error -- %s",
s.ToString().c_str());
if (ret.ok()) {
ret = s;
}
}
}
{
InstrumentedMutexLock lock(&log_write_mutex_);
for (auto l : logs_to_free_) {
Expand All @@ -637,7 +650,7 @@ Status DBImpl::CloseHelper() {
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
"Unable to clear writer for WAL %s with error -- %s",
LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
s.ToString().c_str());
// Retain the first error
Expand Down
29 changes: 29 additions & 0 deletions db/fault_injection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// file data (or entire files) not protected by a "sync".

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "db/log_format.h"
#include "db/version_set.h"
#include "env/mock_env.h"
Expand Down Expand Up @@ -629,6 +630,34 @@ INSTANTIATE_TEST_CASE_P(
std::make_tuple(false, kSyncWal, kEnd),
std::make_tuple(true, kSyncWal, kEnd)));

class FaultInjectionFSTest : public DBTestBase {
public:
FaultInjectionFSTest()
: DBTestBase("fault_injection_fs_test", /*env_do_fsync=*/false) {}
};

TEST_F(FaultInjectionFSTest, SyncWALDuringDBClose) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(env_->GetFileSystem()));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
Options options = CurrentOptions();
options.avoid_sync_during_shutdown = true;
options.env = env.get();
Reopen(options);
ASSERT_OK(Put("k1", "v1"));
Close();
Reopen(options);
ASSERT_EQ("NOT_FOUND", Get("k1"));
Destroy(options);

options.avoid_sync_during_shutdown = false;
Reopen(options);
ASSERT_OK(Put("k1", "v1"));
Close();
Reopen(options);
ASSERT_EQ("v1", Get("k1"));
Destroy(options);
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ DECLARE_uint64(WAL_ttl_seconds);
DECLARE_uint64(WAL_size_limit_MB);
DECLARE_bool(strict_bytes_per_sync);
DECLARE_bool(avoid_flush_during_shutdown);
DECLARE_bool(avoid_sync_during_shutdown);
DECLARE_bool(fill_cache);
DECLARE_bool(optimize_multiget_for_io);
DECLARE_bool(memtable_insert_hint_per_batch);
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,10 @@ DEFINE_bool(avoid_flush_during_recovery,
ROCKSDB_NAMESPACE::Options().avoid_flush_during_recovery,
"Avoid flush during recovery");

DEFINE_bool(avoid_sync_during_shutdown,
ROCKSDB_NAMESPACE::Options().avoid_sync_during_shutdown,
"Options.avoid_sync_during_shutdown");

DEFINE_uint64(max_write_batch_group_size_bytes,
ROCKSDB_NAMESPACE::Options().max_write_batch_group_size_bytes,
"Max write batch group size");
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3785,6 +3785,7 @@ void InitializeOptionsFromFlags(
options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
options.strict_bytes_per_sync = FLAGS_strict_bytes_per_sync;
options.avoid_flush_during_shutdown = FLAGS_avoid_flush_during_shutdown;
options.avoid_sync_during_shutdown = FLAGS_avoid_sync_during_shutdown;
options.dump_malloc_stats = FLAGS_dump_malloc_stats;
options.stats_history_buffer_size = FLAGS_stats_history_buffer_size;
options.skip_stats_update_on_db_open = FLAGS_skip_stats_update_on_db_open;
Expand Down
21 changes: 9 additions & 12 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
uint32_t crc32c_checksum) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

StopWatch sw(clock_, stats_, hist_type_,
Expand Down Expand Up @@ -199,7 +199,7 @@ IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
IOStatus WritableFileWriter::Pad(const IOOptions& opts,
const size_t pad_bytes) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}
const IOOptions io_options = FinalizeIOOptions(opts);
assert(pad_bytes < kDefaultPageSize);
Expand Down Expand Up @@ -348,7 +348,7 @@ IOStatus WritableFileWriter::Close(const IOOptions& opts) {
// enabled
IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

const IOOptions io_options = FinalizeIOOptions(opts);
Expand Down Expand Up @@ -458,7 +458,7 @@ IOStatus WritableFileWriter::PrepareIOOptions(const WriteOptions& wo,

IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOOptions io_options = FinalizeIOOptions(opts);
Expand All @@ -483,7 +483,7 @@ IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
bool use_fsync) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}
IOOptions io_options = FinalizeIOOptions(opts);
if (!writable_file_->IsSyncThreadSafe()) {
Expand All @@ -495,9 +495,6 @@ IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
IOStatus s = SyncInternal(io_options, use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
if (!s.ok()) {
#ifndef NDEBUG
sync_without_flush_called_ = true;
#endif // NDEBUG
set_seen_error();
}
return s;
Expand Down Expand Up @@ -543,7 +540,7 @@ IOStatus WritableFileWriter::SyncInternal(const IOOptions& opts,
IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
uint64_t nbytes) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOSTATS_TIMER_GUARD(range_sync_nanos);
Expand Down Expand Up @@ -572,7 +569,7 @@ IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
const char* data, size_t size) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOStatus s;
Expand Down Expand Up @@ -663,7 +660,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
const char* data,
size_t size) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

IOStatus s;
Expand Down Expand Up @@ -877,7 +874,7 @@ IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {

IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
return GetWriterHasPreviousErrorStatus();
}

assert(use_direct_io());
Expand Down
11 changes: 1 addition & 10 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,6 @@ class WritableFileWriter {
uint64_t next_write_offset_;
bool pending_sync_;
std::atomic<bool> seen_error_;
#ifndef NDEBUG
// SyncWithoutFlush() is the function that is allowed to be called
// concurrently with other function. One of the concurrent call
// could set seen_error_, and the other one would hit assertion
// in debug mode.
std::atomic<bool> sync_without_flush_called_ = false;
#endif // NDEBUG
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
Expand Down Expand Up @@ -304,9 +297,7 @@ class WritableFileWriter {
}
void set_seen_error() { seen_error_.store(true, std::memory_order_relaxed); }

IOStatus AssertFalseAndGetStatusForPrevError() {
// This should only happen if SyncWithoutFlush() was called.
assert(sync_without_flush_called_);
IOStatus GetWriterHasPreviousErrorStatus() {
return IOStatus::IOError("Writer has previous error.");
}

Expand Down
9 changes: 9 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,15 @@ struct DBOptions {
// Dynamically changeable through SetDBOptions() API.
bool avoid_flush_during_shutdown = false;

// By default RocksDB will not sync WAL on DB close even if there are
// unpersisted data (i.e. unsynced WAL data). This can speedup
// DB close. Unpersisted data WILL BE LOST.
//
// DEFAULT: true
//
// Dynamically changeable through SetDBOptions() API.
bool avoid_sync_during_shutdown = true;

// Set this option to true during creation of database if you want
// to be able to ingest behind (call IngestExternalFile() skipping keys
// that already exist, rather than overwriting matching keys).
Expand Down
8 changes: 8 additions & 0 deletions options/db_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct MutableDBOptions, avoid_flush_during_shutdown),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"avoid_sync_during_shutdown",
{offsetof(struct MutableDBOptions, avoid_sync_during_shutdown),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"writable_file_max_buffer_size",
{offsetof(struct MutableDBOptions, writable_file_max_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
Expand Down Expand Up @@ -990,6 +994,7 @@ MutableDBOptions::MutableDBOptions()
max_background_compactions(-1),
max_subcompactions(0),
avoid_flush_during_shutdown(false),
avoid_sync_during_shutdown(true),
writable_file_max_buffer_size(1024 * 1024),
delayed_write_rate(2 * 1024U * 1024U),
max_total_wal_size(0),
Expand All @@ -1009,6 +1014,7 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
max_background_compactions(options.max_background_compactions),
max_subcompactions(options.max_subcompactions),
avoid_flush_during_shutdown(options.avoid_flush_during_shutdown),
avoid_sync_during_shutdown(options.avoid_sync_during_shutdown),
writable_file_max_buffer_size(options.writable_file_max_buffer_size),
delayed_write_rate(options.delayed_write_rate),
max_total_wal_size(options.max_total_wal_size),
Expand All @@ -1034,6 +1040,8 @@ void MutableDBOptions::Dump(Logger* log) const {
max_subcompactions);
ROCKS_LOG_HEADER(log, " Options.avoid_flush_during_shutdown: %d",
avoid_flush_during_shutdown);
ROCKS_LOG_HEADER(log, " Options.avoid_sync_during_shutdown: %d",
avoid_sync_during_shutdown);
ROCKS_LOG_HEADER(
log, " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt,
writable_file_max_buffer_size);
Expand Down
1 change: 1 addition & 0 deletions options/db_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct MutableDBOptions {
int max_background_compactions;
uint32_t max_subcompactions;
bool avoid_flush_during_shutdown;
bool avoid_sync_during_shutdown;
size_t writable_file_max_buffer_size;
uint64_t delayed_write_rate;
uint64_t max_total_wal_size;
Expand Down
1 change: 1 addition & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"allow_2pc=false;"
"avoid_flush_during_recovery=false;"
"avoid_flush_during_shutdown=false;"
"avoid_sync_during_shutdown=false;"
"allow_ingest_behind=false;"
"concurrent_prepare=false;"
"two_write_queues=false;"
Expand Down
1 change: 1 addition & 0 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@
"WAL_size_limit_MB": lambda: random.choice([0, 1]),
"strict_bytes_per_sync": lambda: random.choice([0, 1]),
"avoid_flush_during_shutdown": lambda: random.choice([0, 1]),
"avoid_sync_during_shutdown": lambda: random.choice([0, 1]),
"fill_cache": lambda: random.choice([0, 1]),
"optimize_multiget_for_io": lambda: random.choice([0, 1]),
"memtable_insert_hint_per_batch": lambda: random.choice([0, 1]),
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/behavior_changes/wal_sync_on_close.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce a new DB option `avoid_sync_during_shutdown`. If set true, it will sync WALs during DB close

0 comments on commit 93c9102

Please sign in to comment.