Skip to content

Commit

Permalink
Merge pull request #183 from mongodb-partners/v4.2.5
Browse files Browse the repository at this point in the history
V4.2.5 ship to master branch
  • Loading branch information
wolfkdy committed Oct 24, 2021
2 parents db89b80 + 4259d1d commit 8dbe9f3
Show file tree
Hide file tree
Showing 23 changed files with 814 additions and 477 deletions.
2 changes: 1 addition & 1 deletion src/mongo_rate_limiter_checker.cpp
Expand Up @@ -32,7 +32,7 @@
#include "mongo_rate_limiter_checker.h"

#ifdef __linux__
#include "rocks_parameters.h"
#include "mongo/db/modules/rocks/src/rocks_parameters_gen.h"
#include "rocks_util.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
Expand Down
233 changes: 185 additions & 48 deletions src/rocks_compaction_scheduler.cpp

Large diffs are not rendered by default.

36 changes: 31 additions & 5 deletions src/rocks_compaction_scheduler.h
Expand Up @@ -57,12 +57,19 @@ namespace mongo {

class CompactionBackgroundJob;

// Plain value snapshot of the oplog delete/compact counters that
// RocksCompactionScheduler tracks and hands out via getOplogDelCompactStats().
//
// Every counter has a default member initializer so that a default-constructed
// snapshot starts at zero instead of holding indeterminate values. The struct is
// still an aggregate (C++14 and later), so brace initialization with four values
// keeps working.
struct OplogDelCompactStats {
    // Accumulated through addOplogEntriesDeleted().
    uint64_t oplogEntriesDeleted = 0;
    // Accumulated through addOplogSizeDeleted(); unit is whatever the caller passes in.
    uint64_t oplogSizeDeleted = 0;
    // NOTE(review): presumably fed by addOplogCompactRemoved() -- confirm in the .cpp.
    uint64_t oplogCompactSkip = 0;
    // NOTE(review): presumably fed by addOplogCompactPreserved() -- confirm in the .cpp.
    uint64_t oplogCompactKeep = 0;
};

class RocksCompactionScheduler {
public:
RocksCompactionScheduler();
~RocksCompactionScheduler();

void start(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf);
void start(rocksdb::TOTransactionDB* db, rocksdb::ColumnFamilyHandle* cf);

static int getSkippedDeletionsThreshold() { return kSkippedDeletionsThreshold; }

Expand All @@ -71,8 +78,9 @@ namespace mongo {
// schedule compact range operation for execution in _compactionThread
void compactAll();
Status compactOplog(rocksdb::ColumnFamilyHandle* cf, const std::string& begin, const std::string& end);
Status rollbackToStable(rocksdb::ColumnFamilyHandle* cf);

rocksdb::CompactionFilterFactory* createCompactionFilterFactory() const;
rocksdb::CompactionFilterFactory* createCompactionFilterFactory();
std::unordered_map<uint32_t, BSONObj> getDroppedPrefixes() const;
boost::optional<std::pair<uint32_t, std::pair<std::string, std::string>>> getOplogDeleteUntil() const;

Expand All @@ -87,19 +95,32 @@ namespace mongo {
void notifyCompacted(const std::string& begin, const std::string& end, bool rangeDropped,
bool opSucceeded);

// calculate Oplog Delete Entries
void addOplogEntriesDeleted(const uint64_t entries);
// calculate Oplog Delete Size
void addOplogSizeDeleted(const uint64_t size);
// add up to Oplog Compact Removed Entries
void addOplogCompactRemoved();
// add up to Oplog Compact Preserved Entries
void addOplogCompactPreserved();
// query Oplog Delete and Compact all Stats
const OplogDelCompactStats getOplogDelCompactStats() const;

private:
void compactPrefix(rocksdb::ColumnFamilyHandle* cf, const std::string& prefix);
void compactDroppedPrefix(rocksdb::ColumnFamilyHandle* cf, const std::string& prefix);
void compact(rocksdb::ColumnFamilyHandle* cf, const std::string& begin, const std::string& end,
bool rangeDropped, uint32_t order, boost::optional<std::shared_ptr<Notification<Status>>>);
void compact(rocksdb::ColumnFamilyHandle* cf, const std::string& begin,
const std::string& end, bool rangeDropped, uint32_t order,
const bool trimHistory,
boost::optional<std::shared_ptr<Notification<Status>>>);
void droppedPrefixCompacted(const std::string& prefix, bool opSucceeded);

private:
Mutex _lock = MONGO_MAKE_LATCH("RocksCompactionScheduler::_lock");
// protected by _lock
Timer _timer;

rocksdb::DB* _db; // not owned
rocksdb::TOTransactionDB* _db; // not owned

// not owned, cf where compaction_scheduler's metadata exists.
rocksdb::ColumnFamilyHandle* _metaCf;
Expand All @@ -123,5 +144,10 @@ namespace mongo {
std::atomic<uint32_t> _droppedPrefixesCount;
boost::optional<std::pair<uint32_t, std::pair<std::string, std::string>>> _oplogDeleteUntil;
static const std::string kDroppedPrefix;

std::atomic<uint64_t> _oplogEntriesDeleted;
std::atomic<uint64_t> _oplogSizeDeleted;
std::atomic<uint64_t> _oplogCompactSkip;
std::atomic<uint64_t> _oplogCompactKeep;
};
} // namespace mongo
12 changes: 9 additions & 3 deletions src/rocks_durability_manager.cpp
Expand Up @@ -34,8 +34,14 @@
#include "rocks_util.h"

namespace mongo {
RocksDurabilityManager::RocksDurabilityManager(rocksdb::DB* db, bool durable)
: _db(db), _durable(durable), _journalListener(&NoOpJournalListener::instance) {}
RocksDurabilityManager::RocksDurabilityManager(rocksdb::DB* db, bool durable,
rocksdb::ColumnFamilyHandle* defaultCf,
rocksdb::ColumnFamilyHandle* oplogCf)
: _db(db),
_durable(durable),
_defaultCf(defaultCf),
_oplogCf(oplogCf),
_journalListener(&NoOpJournalListener::instance) {}

void RocksDurabilityManager::setJournalListener(JournalListener* jl) {
stdx::unique_lock<Latch> lk(_journalListenerMutex);
Expand All @@ -58,7 +64,7 @@ namespace mongo {
stdx::unique_lock<Latch> jlk(_journalListenerMutex);
JournalListener::Token token = _journalListener->getToken();
if (!_durable || forceFlush) {
invariantRocksOK(_db->Flush(rocksdb::FlushOptions()));
invariantRocksOK(_db->Flush(rocksdb::FlushOptions(), {_defaultCf, _oplogCf}));
} else {
invariantRocksOK(_db->SyncWAL());
}
Expand Down
26 changes: 15 additions & 11 deletions src/rocks_durability_manager.h 100755 → 100644
Expand Up @@ -45,23 +45,26 @@ namespace mongo {
RocksDurabilityManager& operator=(const RocksDurabilityManager&) = delete;

public:
RocksDurabilityManager(rocksdb::DB* db, bool durable);
RocksDurabilityManager(rocksdb::DB* db, bool durable,
rocksdb::ColumnFamilyHandle* defaultCf,
rocksdb::ColumnFamilyHandle* oplogCf);

void setJournalListener(JournalListener* jl);

void waitUntilDurable(bool forceFlush);

/**
* Waits until a prepared unit of work has ended (either been commited or aborted).
* This should be used when encountering WT_PREPARE_CONFLICT errors. The caller is
* required to retry the conflicting WiredTiger API operation. A return from this
* function does not guarantee that the conflicting transaction has ended, only
* that one prepared unit of work in the process has signaled that it has ended.
* Accepts an OperationContext that will throw an AssertionException when interrupted.
* Waits until a prepared unit of work has ended (either been committed or aborted). This
* should be used when encountering ROCKS_PREPARE_CONFLICT errors. The caller is required to
* retry the conflicting RocksDB operation. A return from this function does not
* guarantee that the conflicting transaction has ended, only that one prepared unit of work
* in the process has signaled that it has ended. Accepts an OperationContext that will
* throw an AssertionException when interrupted.
*
* This method is provided in RocksDurabilityManager and not RecoveryUnit because all
* recovery units share the same durable manager, and we want a recovery unit on one
* thread to signal all recovery units waiting for prepare conflicts across all
* other threads.
* recovery units share the same RocksDurabilityManager, and we want a recovery unit on one
* thread to signal all recovery units waiting for prepare conflicts across all other
* threads.
*/
void waitUntilPreparedUnitOfWorkCommitsOrAborts(OperationContext* opCtx,
uint64_t lastCount);
Expand All @@ -80,7 +83,8 @@ namespace mongo {
rocksdb::DB* _db; // not owned

bool _durable;

rocksdb::ColumnFamilyHandle* _defaultCf; // not owned
rocksdb::ColumnFamilyHandle* _oplogCf; // not owned
// Notified when we commit to the journal.
JournalListener* _journalListener;

Expand Down
103 changes: 66 additions & 37 deletions src/rocks_engine.cpp 100755 → 100644
Expand Up @@ -174,13 +174,6 @@ namespace mongo {
return _data->resize(num);
}

// TODO(cuixin): consider interfaces below, mongoRocks has not implemented them yet
// WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback skip
// WiredTigerKVEngine::initRsOplogBackgroundThread skip
// getBackupInformationFromBackupCursor is used in
// WiredTigerKVEngine::beginNonBlockingBackup
// rocks db skip it

// first four bytes are the default prefix 0
const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 13);

Expand All @@ -203,7 +196,7 @@ namespace mongo {
unsigned long long memSizeMB = pi.getMemSizeMB();
if (memSizeMB > 0) {
// reserve 1GB for system and binaries, and use 50% of the rest
double cacheMB = (memSizeMB - 1024) * 0.3;
double cacheMB = (memSizeMB - 1024) * 0.5;
cacheSizeGB = static_cast<uint64_t>(cacheMB / 1024);
}
if (cacheSizeGB < 1) {
Expand Down Expand Up @@ -290,7 +283,8 @@ namespace mongo {
{_defaultCf.get(), _oplogCf.get()});
_maxPrefix = std::max(_maxPrefix, maxDroppedPrefix);

_durabilityManager.reset(new RocksDurabilityManager(_db.get(), _durable));
_durabilityManager.reset(
new RocksDurabilityManager(_db.get(), _durable, _defaultCf.get(), _oplogCf.get()));
_oplogManager.reset(new RocksOplogManager(_db.get(), this, _durabilityManager.get()));

rocksdb::RocksTimeStamp ts(0);
Expand Down Expand Up @@ -327,10 +321,10 @@ namespace mongo {
}();
if (newDB) {
// init manifest so list column families will not fail when db is empty.
invariantRocksOK(rocksdb::TOTransactionDB::Open(_options(false /* isOplog */),
rocksdb::TOTransactionDBOptions(),
_path,
&db));
invariantRocksOK(rocksdb::TOTransactionDB::Open(
_options(false /* isOplog */),
rocksdb::TOTransactionDBOptions(rocksGlobalOptions.maxConflictCheckSizeMB), _path,
&db));
invariantRocksOK(db->Close());
}

Expand All @@ -346,10 +340,10 @@ namespace mongo {
// init oplog columnfamily if not exists.
if (!hasOplog) {
rocksdb::ColumnFamilyHandle* cf = nullptr;
invariantRocksOK(rocksdb::TOTransactionDB::Open(_options(false /* isOplog */),
rocksdb::TOTransactionDBOptions(),
_path,
&db));
invariantRocksOK(rocksdb::TOTransactionDB::Open(
_options(false /* isOplog */),
rocksdb::TOTransactionDBOptions(rocksGlobalOptions.maxConflictCheckSizeMB), _path,
&db));
invariantRocksOK(db->CreateColumnFamily(_options(true /* isOplog */),
NamespaceString::kRsOplogNamespace.toString(),
&cf));
Expand All @@ -359,14 +353,12 @@ namespace mongo {
}

std::vector<rocksdb::ColumnFamilyHandle*> cfs;
s = rocksdb::TOTransactionDB::Open(_options(false /* isOplog */),
rocksdb::TOTransactionDBOptions(), _path,
{{rocksdb::kDefaultColumnFamilyName,
_options(false /* isOplog */)},
{NamespaceString::kRsOplogNamespace.toString(),
_options(true /* isOplog */)}},
&cfs,
&db);
s = rocksdb::TOTransactionDB::Open(
_options(false /* isOplog */),
rocksdb::TOTransactionDBOptions(rocksGlobalOptions.maxConflictCheckSizeMB), _path,
{{rocksdb::kDefaultColumnFamilyName, _options(false /* isOplog */)},
{NamespaceString::kRsOplogNamespace.toString(), _options(true /* isOplog */)}},
&cfs, &db);
invariantRocksOK(s);
invariant(cfs.size() == 2);
invariant(cfs[0]->GetName() == rocksdb::kDefaultColumnFamilyName);
Expand Down Expand Up @@ -401,6 +393,27 @@ namespace mongo {
bb.done();
}

// Sums per-level statistics over the live SST files reported by the root DB.
// Files whose column family name is recognised by NamespaceString::oplog() are
// ignored. The result maps an LSM level to a two-element vector:
//   [0] total num_entries, [1] total num_deletions of the files on that level.
std::map<int, std::vector<uint64_t>> RocksEngine::getDefaultCFNumEntries() const {
    std::vector<rocksdb::LiveFileMetaData> liveFiles;
    _db->GetRootDB()->GetLiveFilesMetaData(&liveFiles);

    std::map<int, std::vector<uint64_t>> perLevel;
    for (const auto& meta : liveFiles) {
        if (!NamespaceString::oplog(meta.column_family_name)) {
            // Create the {entries, deletions} pair the first time a level is seen.
            auto slot = perLevel.find(meta.level);
            if (slot == perLevel.end()) {
                slot = perLevel.emplace(meta.level, std::vector<uint64_t>(2, 0)).first;
            }

            std::vector<uint64_t>& counters = slot->second;
            counters[0] += meta.num_entries;
            counters[1] += meta.num_deletions;
        }
    }

    return perLevel;
}

Status RocksEngine::okToRename(OperationContext* opCtx, StringData fromNS, StringData toNS,
StringData ident, const RecordStore* originalRecordStore) const {
_counterManager->sync();
Expand Down Expand Up @@ -767,17 +780,27 @@ namespace mongo {
table_options.format_version = 2;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));

// options.info_log = std::shared_ptr<rocksdb::Logger>(new MongoRocksLogger);
options.write_buffer_size = 64 * 1024 * 1024; // 64MB
options.level0_slowdown_writes_trigger = 8;
options.max_write_buffer_number = 4;
options.max_background_compactions = 8;
options.max_background_flushes = 2;
options.write_buffer_size = rocksGlobalOptions.writeBufferSize;
options.max_write_buffer_number = rocksGlobalOptions.maxWriteBufferNumber;
options.max_background_jobs = rocksGlobalOptions.maxBackgroundJobs;
options.max_total_wal_size = rocksGlobalOptions.maxTotalWalSize;
options.db_write_buffer_size = rocksGlobalOptions.dbWriteBufferSize;
options.num_levels = rocksGlobalOptions.numLevels;

options.delayed_write_rate = rocksGlobalOptions.delayedWriteRate;
options.level0_file_num_compaction_trigger =
rocksGlobalOptions.level0FileNumCompactionTrigger;
options.level0_slowdown_writes_trigger = rocksGlobalOptions.level0SlowdownWritesTrigger;
options.level0_stop_writes_trigger = rocksGlobalOptions.level0StopWritesTrigger;
options.soft_pending_compaction_bytes_limit =
static_cast<unsigned long long>(rocksGlobalOptions.softPendingCompactionMBLimit) * 1024 * 1024;
options.hard_pending_compaction_bytes_limit =
static_cast<unsigned long long>(rocksGlobalOptions.hardPendingCompactionMBLimit) * 1024 * 1024;
options.target_file_size_base = 64 * 1024 * 1024; // 64MB
options.soft_rate_limit = 2.5;
options.hard_rate_limit = 3;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 512 * 1024 * 1024; // 512 MB
options.max_bytes_for_level_base = rocksGlobalOptions.maxBytesForLevelBase;
// This means there is no limit on open files. Make sure to always set ulimit so that it can
// keep all RocksDB files opened.
options.max_open_files = -1;
Expand Down Expand Up @@ -838,6 +861,7 @@ namespace mongo {
namespace {

MONGO_FAIL_POINT_DEFINE(RocksPreserveSnapshotHistoryIndefinitely);
MONGO_FAIL_POINT_DEFINE(RocksSetOldestTSToStableTS);

} // namespace

Expand Down Expand Up @@ -898,6 +922,11 @@ namespace mongo {
// TODO(wolfkdy): in 4.0.3, setOldestTimestamp considers oplogReadTimestamp
// it disappears in mongo4.2, find why it happens
void RocksEngine::setOldestTimestamp(Timestamp oldestTimestamp, bool force) {
// Set the oldest timestamp to the stable timestamp to ensure that there is no lag window
// between the two.
if (MONGO_FAIL_POINT(RocksSetOldestTSToStableTS)) {
force = false;
}
if (MONGO_FAIL_POINT(RocksPreserveSnapshotHistoryIndefinitely)) {
return;
}
Expand Down Expand Up @@ -986,10 +1015,10 @@ namespace mongo {
LOG_FOR_ROLLBACK(0) << "Rolling back to the stable timestamp. StableTimestamp: "
<< stableTimestamp
<< " Initial Data Timestamp: " << initialDataTimestamp;
auto s = _db->RollbackToStable(_defaultCf.get());
if (!s.ok()) {
return {ErrorCodes::UnrecoverableRollbackError,
str::stream() << "Error rolling back to stable. Err: " << s.ToString()};
auto s = _compactionScheduler->rollbackToStable(_defaultCf.get());
if (!s.isOK()) {
return {ErrorCodes::UnrecoverableRollbackError,
str::stream() << "Error rolling back to stable. Err: " << s};
}

setInitialDataTimestamp(initialDataTimestamp);
Expand Down Expand Up @@ -1049,7 +1078,7 @@ namespace mongo {
RocksRecordStore* oplogRecordStore) {
stdx::lock_guard<Latch> lock(_oplogManagerMutex);
if (_oplogManagerCount == 0)
_oplogManager->start(opCtx, oplogRecordStore, !_keepDataHistory);
_oplogManager->start(opCtx, oplogRecordStore);
_oplogManagerCount++;
}

Expand Down
4 changes: 4 additions & 0 deletions src/rocks_engine.h
Expand Up @@ -268,6 +268,10 @@ namespace mongo {

rocksdb::Statistics* getStatistics() const { return _statistics.get(); }

std::map<int, std::vector<uint64_t>> getDefaultCFNumEntries() const;

// Returns _oplogCf.get(): the oplog column family handle pointer, which stays held by _oplogCf.
rocksdb::ColumnFamilyHandle* getOplogCFHandle() const { return _oplogCf.get(); }
// Returns _defaultCf.get(): the default column family handle pointer, which stays held by _defaultCf.
rocksdb::ColumnFamilyHandle* getDefaultCfHandle() const { return _defaultCf.get(); }
bool canRecoverToStableTimestamp() const;
rocksdb::ColumnFamilyHandle* getDefaultCf_ForTest() const { return _defaultCf.get(); }
rocksdb::ColumnFamilyHandle* getOplogCf_ForTest() const { return _oplogCf.get(); }
Expand Down
17 changes: 16 additions & 1 deletion src/rocks_global_options.h
Expand Up @@ -42,7 +42,8 @@ namespace mongo {
crashSafeCounters(false),
counters(true),
singleDeleteIndex(false),
logLevel("info") {}
logLevel("info"),
maxConflictCheckSizeMB(200) {}

Status store(const optionenvironment::Environment& params);
static Status validateRocksdbLogLevel(const std::string& value);
Expand All @@ -58,6 +59,20 @@ namespace mongo {
bool singleDeleteIndex;

std::string logLevel;
int maxConflictCheckSizeMB;
int maxBackgroundJobs;
long maxTotalWalSize;
long dbWriteBufferSize;
long writeBufferSize;
long delayedWriteRate;
int numLevels;
int maxWriteBufferNumber;
int level0FileNumCompactionTrigger;
int level0SlowdownWritesTrigger;
int level0StopWritesTrigger;
long maxBytesForLevelBase;
int softPendingCompactionMBLimit;
int hardPendingCompactionMBLimit;
};

extern RocksGlobalOptions rocksGlobalOptions;
Expand Down

0 comments on commit 8dbe9f3

Please sign in to comment.