diff --git a/src/mongo_rate_limiter_checker.cpp b/src/mongo_rate_limiter_checker.cpp
index 170e0f6..2f46898 100644
--- a/src/mongo_rate_limiter_checker.cpp
+++ b/src/mongo_rate_limiter_checker.cpp
@@ -32,7 +32,7 @@
 #include "mongo_rate_limiter_checker.h"
 #ifdef __linux__
-#include "rocks_parameters.h"
+#include "mongo/db/modules/rocks/src/rocks_parameters_gen.h"
 #include "rocks_util.h"
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
diff --git a/src/rocks_compaction_scheduler.cpp b/src/rocks_compaction_scheduler.cpp
index cd9779b..1218d7d 100644
--- a/src/rocks_compaction_scheduler.cpp
+++ b/src/rocks_compaction_scheduler.cpp
@@ -71,15 +71,22 @@ namespace mongo {
         class OplogCompactionFilter : public rocksdb::CompactionFilter {
         public:
-            explicit OplogCompactionFilter(const std::string& from, const std::string& until)
-                : _from(from), _until(until) {}
+            explicit OplogCompactionFilter(const std::string& from, const std::string& until,
+                                           RocksCompactionScheduler* compactionScheduler)
+                : _from(from), _until(until), _compactionScheduler(compactionScheduler) {}

             // filter is not called from multiple threads simultaneously
             virtual bool Filter(int level, const rocksdb::Slice& key,
                                 const rocksdb::Slice& existing_value, std::string* new_value,
                                 bool* value_changed) const {
-                return key.compare(rocksdb::Slice(_until)) <= 0 &&
-                       key.compare(rocksdb::Slice(_from)) >= 0;
+                bool filter = (key.compare(rocksdb::Slice(_until)) <= 0 &&
+                               key.compare(rocksdb::Slice(_from)) >= 0);
+                if (filter) {
+                    _compactionScheduler->addOplogCompactRemoved();
+                } else {
+                    _compactionScheduler->addOplogCompactPreserved();
+                }
+                return filter;
             }

             // IgnoreSnapshots is available since RocksDB 4.3
@@ -92,6 +99,7 @@ namespace mongo {
         private:
             const std::string _from;
             const std::string _until;
+            RocksCompactionScheduler* _compactionScheduler;
         };

         class PrefixDeletingCompactionFilter : public rocksdb::CompactionFilter {
@@ -135,8 +143,7 @@ namespace mongo {
         class PrefixDeletingCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
         public:
-            explicit PrefixDeletingCompactionFilterFactory(
-                const RocksCompactionScheduler* scheduler)
+            explicit PrefixDeletingCompactionFilterFactory(RocksCompactionScheduler* scheduler)
                 : _compactionScheduler(scheduler) {}

             virtual std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
@@ -146,8 +153,9 @@ namespace mongo {
                 if (!context.is_manual_compaction) {
                     return std::unique_ptr<rocksdb::CompactionFilter>(nullptr);
                 }
-                return std::unique_ptr<rocksdb::CompactionFilter>(
-                    new OplogCompactionFilter(oplogDelUntil->second.first, oplogDelUntil->second.second));
+                return std::unique_ptr<rocksdb::CompactionFilter>(new OplogCompactionFilter(
+                    oplogDelUntil->second.first, oplogDelUntil->second.second,
+                    _compactionScheduler));
             }
             auto droppedPrefixes = _compactionScheduler->getDroppedPrefixes();
             if (droppedPrefixes.size() == 0) {
@@ -168,18 +176,20 @@ namespace mongo {
         }

         private:
-            const RocksCompactionScheduler* _compactionScheduler;
+            RocksCompactionScheduler* _compactionScheduler;
         };
     }  // namespace

     class CompactionBackgroundJob : public BackgroundJob {
     public:
-        CompactionBackgroundJob(rocksdb::DB* db, RocksCompactionScheduler* compactionScheduler);
+        CompactionBackgroundJob(rocksdb::TOTransactionDB* db,
+                                RocksCompactionScheduler* compactionScheduler);
         virtual ~CompactionBackgroundJob();

         // schedule compact range operation for execution in _compactionThread
         void scheduleCompactOp(rocksdb::ColumnFamilyHandle* cf, const std::string& begin,
                                const std::string& end, bool rangeDropped, uint32_t order,
+                               const bool trimHistory,
                                boost::optional<std::shared_ptr<Notification<Status>>>);

     private:
@@ -190,6 +200,7 @@ namespace mongo {
             std::string _end_str;
             bool _rangeDropped;
             uint32_t _order;
+            bool _trimHistory;
             boost::optional<std::shared_ptr<Notification<Status>>> _notification;

             bool operator>(const CompactOp& other) const { return _order > other._order; }
         };
@@ -202,7 +213,7 @@ namespace mongo {
         void compact(const CompactOp& op);

-        rocksdb::DB* _db;  // not owned
+        rocksdb::TOTransactionDB* _db;  // not owned
         RocksCompactionScheduler* _compactionScheduler;  // not owned

         bool _compactionThreadRunning = true;
@@ -215,7 +226,7 @@ namespace mongo {
     const char* const CompactionBackgroundJob::_name = "RocksCompactionThread";

-    CompactionBackgroundJob::CompactionBackgroundJob(rocksdb::DB* db,
+    CompactionBackgroundJob::CompactionBackgroundJob(rocksdb::TOTransactionDB* db,
                                                      RocksCompactionScheduler* compactionScheduler)
         : _db(db), _compactionScheduler(compactionScheduler) {
         go();
@@ -288,13 +299,13 @@ namespace mongo {
         LOG(1) << "Compaction thread terminating" << std::endl;
     }

-    void CompactionBackgroundJob::scheduleCompactOp(rocksdb::ColumnFamilyHandle* cf,
-                                                    const std::string& begin, const std::string& end,
-                                                    bool rangeDropped, uint32_t order,
-                                                    boost::optional<std::shared_ptr<Notification<Status>>>) {
+    void CompactionBackgroundJob::scheduleCompactOp(
+        rocksdb::ColumnFamilyHandle* cf, const std::string& begin, const std::string& end,
+        bool rangeDropped, uint32_t order, bool trimHistory,
+        boost::optional<std::shared_ptr<Notification<Status>>> notification) {
         {
             stdx::lock_guard<stdx::mutex> lk(_compactionMutex);
-            _compactionQueue.push({cf, begin, end, rangeDropped, order, notification});
+            _compactionQueue.push({cf, begin, end, rangeDropped, order, trimHistory, notification});
         }
         _compactionWakeUp.notify_one();
     }
@@ -302,33 +313,116 @@ namespace mongo {
     void CompactionBackgroundJob::compact(const CompactOp& op) {
         rocksdb::Slice start_slice(op._start_str);
         rocksdb::Slice end_slice(op._end_str);
+        rocksdb::Status s = rocksdb::Status::OK();
         rocksdb::Slice* start = !op._start_str.empty() ? &start_slice : nullptr;
         rocksdb::Slice* end = !op._end_str.empty() ? &end_slice : nullptr;
         const bool isOplog = NamespaceString::oplog(op._cf->GetName());
         LOG(1) << "Starting compaction of cf: " << op._cf->GetName()
-               << " range: " << (start ? start->ToString(true) : "")
-               << " .. " << (end ? end->ToString(true) : "") << " (rangeDropped is "
-               << op._rangeDropped << ")" << " (isOplog is " << isOplog << ")";
+               << " range: " << (start ? start->ToString(true) : "") << " .. "
+               << (end ? end->ToString(true) : "") << " (rangeDropped is " << op._rangeDropped
+               << ")"
+               << " (isOplog is " << isOplog << ")"
+               << " (trimHistory is " << op._trimHistory << ")";
+
+        if (op._trimHistory) {
+            invariant(!start && !end && !op._rangeDropped && !isOplog);
+            s = _db->RollbackToStable(op._cf);
+        } else {
+            if (op._rangeDropped || isOplog) {
+                std::vector<rocksdb::LiveFileMetaData> beforeDelFiles;
+                std::vector<rocksdb::LiveFileMetaData> afterDelFiles;
+                std::vector<rocksdb::LiveFileMetaData> diffFiles;
+                auto queryDelFilesInRange = [&]() -> std::vector<rocksdb::LiveFileMetaData> {
+                    if (op._start_str.empty() && op._end_str.empty()) {
+                        return {};
+                    }
+                    std::vector<rocksdb::LiveFileMetaData> toDelFiles;
+                    std::vector<rocksdb::LiveFileMetaData> allFiles;
+                    _db->GetRootDB()->GetLiveFilesMetaData(&allFiles);
+                    for (const auto& f : allFiles) {
+                        if (!NamespaceString::oplog(f.column_family_name)) {
+                            continue;
+                        }
+                        if (!op._start_str.empty() && !op._end_str.empty()) {
+                            // [start, end]
+                            if ((op._start_str <= f.smallestkey) && (f.largestkey <= op._end_str)) {
+                                toDelFiles.push_back(f);
+                            }
+                        } else if (op._start_str.empty() && (f.largestkey <= op._end_str)) {
+                            // (min(), end]
+                            toDelFiles.push_back(f);
+                        } else if (op._end_str.empty() && (op._start_str <= f.smallestkey)) {
+                            // [start, max())
+                            toDelFiles.push_back(f);
+                        } else {
+                            // skip
+                        }
+                    }
+                    return toDelFiles;
+                };
+                if (isOplog) {
+                    LOG(1) << "Before DeleteFilesInRange Stats: " << op._cf->GetName();
+                    beforeDelFiles = queryDelFilesInRange();
+                }

-        if (op._rangeDropped) {
-            auto s = rocksdb::DeleteFilesInRange(_db, op._cf, start, end);
-            if (!s.ok()) {
-                // Do not fail the entire procedure, since there is still chance
-                // to purge the range below, in CompactRange
-                log() << "Failed to delete files in compacted range: " << s.ToString();
+                auto s1 = rocksdb::DeleteFilesInRange(_db, op._cf, start, end);
+                if (!s1.ok()) {
+                    // Do not fail the entire procedure, since there is still a chance
+                    // to purge the range below, in CompactRange
+                    log() << "Failed to delete files in compacted range: " << s1.ToString();
+                }
+
+                if (isOplog) {
+                    LOG(1) << "After DeleteFilesInRange Stats: " << op._cf->GetName();
+                    afterDelFiles = queryDelFilesInRange();
+                    [&]() {
+                        for (const auto& f : beforeDelFiles) {
+                            invariant(NamespaceString::oplog(f.column_family_name));

+                            auto vit = std::find_if(afterDelFiles.begin(), afterDelFiles.end(),
+                                                    [&](const rocksdb::LiveFileMetaData& a) {
+                                                        return a.name == f.name;
+                                                    });

+                            // a file present before but absent after was deleted above
+                            if (vit == afterDelFiles.end()) {
+                                diffFiles.push_back(f);
+                            }
+                        }
+                    }();
+                    auto oplogFilesStats = [&]() {
+                        uint64_t oplogEntries = 0;
+                        uint64_t oplogSizesum = 0;

+                        for (const auto& f : diffFiles) {
+                            invariant(NamespaceString::oplog(f.column_family_name));

+                            oplogEntries += f.num_entries;
+                            oplogSizesum += f.size;
+                        }
+                        return std::make_pair(oplogEntries, oplogSizesum);
+                    }();
+                    _compactionScheduler->addOplogEntriesDeleted(oplogFilesStats.first);
+                    _compactionScheduler->addOplogSizeDeleted(oplogFilesStats.second);
+                }
             }
-        }

-        rocksdb::CompactRangeOptions compact_options;
-        compact_options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
-        // if auto-compaction runs parallelly, oplog compact-range may leave a hole.
-        compact_options.exclusive_manual_compaction = isOplog;
-        compact_options.ignore_pin_timestamp = isOplog;
+            rocksdb::CompactRangeOptions compact_options;
+            compact_options.bottommost_level_compaction =
+                rocksdb::BottommostLevelCompaction::kForce;
+            // if auto-compaction runs in parallel, oplog compact-range may leave a hole.
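// Aside (illustrative sketch, not part of this patch): the selection rule used
// by queryDelFilesInRange above. An SST file is a delete candidate only when
// its whole key range lies inside the requested bounds; an empty bound means
// unbounded on that side. As a standalone predicate over the same
// rocksdb::LiveFileMetaData fields:
//
//     bool coveredByRange(const rocksdb::LiveFileMetaData& f,
//                         const std::string& start, const std::string& end) {
//         if (start.empty() && end.empty()) return false;  // no bounds given
//         const bool startOk = start.empty() || start <= f.smallestkey;
//         const bool endOk = end.empty() || f.largestkey <= end;
//         return startOk && endOk;
//     }
//
// Files that merely overlap the bounds are preserved, so the before/after set
// difference computed below counts exactly the files DeleteFilesInRange dropped.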
+            compact_options.exclusive_manual_compaction = isOplog;
+            compact_options.ignore_pin_timestamp = isOplog;

-        auto s = _db->CompactRange(compact_options, op._cf, start, end);
+            s = _db->CompactRange(compact_options, op._cf, start, end);
+        }
         if (!s.ok()) {
-            log() << "Failed to compact range: " << s.ToString();
+            if (op._trimHistory) {
+                log() << "Failed to RollbackToStable: " << s.ToString();
+            } else {
+                log() << "Failed to compact range: " << s.ToString();
+            }
             if (op._notification != boost::none) {
                 (*op._notification)->set(rocksToMongoStatus(s));
             }
@@ -352,7 +446,8 @@ namespace mongo {
     RocksCompactionScheduler::RocksCompactionScheduler()
         : _db(nullptr), _metaCf(nullptr), _droppedPrefixesCount(0) {}

-    void RocksCompactionScheduler::start(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
+    void RocksCompactionScheduler::start(rocksdb::TOTransactionDB* db,
+                                         rocksdb::ColumnFamilyHandle* cf) {
         _db = db;
         _metaCf = cf;
         _timer.reset();
@@ -387,7 +482,9 @@ namespace mongo {
     void RocksCompactionScheduler::compactAll() {
         // NOTE(wolfkdy): compactAll only compacts DefaultColumnFamily
         // oplog cf is handled in RocksRecordStore.
-        compact(_db->DefaultColumnFamily(), std::string(), std::string(), false, kOrderFull, boost::none);
+        const bool trimHistory = false;
+        compact(_db->DefaultColumnFamily(), std::string(), std::string(), false, kOrderFull,
+                trimHistory, boost::none);
     }

     Status RocksCompactionScheduler::compactOplog(rocksdb::ColumnFamilyHandle* cf,
@@ -402,7 +499,8 @@ namespace mongo {
             _oplogDeleteUntil = std::make_pair(cf->GetID(), std::make_pair(begin, end));
         }
         auto notification = std::make_shared<Notification<Status>>();
-        compact(cf, begin, end, false, kOrderOplog, notification);
+        const bool trimHistory = false;
+        compact(cf, begin, end, false, kOrderOplog, trimHistory, notification);
         auto s = notification->get();
         if (!s.isOK()) {
             LOG(0) << "compactOplog to " << rocksdb::Slice(end).ToString() << " failed " << s;
@@ -410,26 +508,41 @@ namespace mongo {
         return s;
     }

+    Status RocksCompactionScheduler::rollbackToStable(rocksdb::ColumnFamilyHandle* cf) {
+        auto notification = std::make_shared<Notification<Status>>();
+        const bool trimHistory = true;
+        compact(cf, std::string(), std::string(), false, kOrderOplog, trimHistory, notification);
+        auto s = notification->get();
+        if (!s.isOK()) {
+            LOG(0) << "rollbackToStable failed: " << s;
+        }
+        return s;
+    }
+
     void RocksCompactionScheduler::compactPrefix(rocksdb::ColumnFamilyHandle* cf,
                                                  const std::string& prefix) {
-        compact(cf, prefix, rocksGetNextPrefix(prefix), false, kOrderRange, boost::none);
+        bool trimHistory = false;
+        compact(cf, prefix, rocksGetNextPrefix(prefix), false, kOrderRange, trimHistory,
+                boost::none);
     }

     void RocksCompactionScheduler::compactDroppedPrefix(rocksdb::ColumnFamilyHandle* cf,
                                                         const std::string& prefix) {
         LOG(0) << "Compacting dropped prefix: " << rocksdb::Slice(prefix).ToString(true)
                << " from cf: " << cf->GetName();
-        compact(cf, prefix, rocksGetNextPrefix(prefix), true, kOrderDroppedRange, boost::none);
+        bool trimHistory = false;
+        compact(cf, prefix, rocksGetNextPrefix(prefix), true, kOrderDroppedRange, trimHistory,
+                boost::none);
     }

-    void RocksCompactionScheduler::compact(rocksdb::ColumnFamilyHandle* cf,
-                                           const std::string& begin, const std::string& end,
-                                           bool rangeDropped, uint32_t order,
-                                           boost::optional<std::shared_ptr<Notification<Status>>> notification) {
-        _compactionJob->scheduleCompactOp(cf, begin, end, rangeDropped, order, notification);
+    void RocksCompactionScheduler::compact(
+        rocksdb::ColumnFamilyHandle* cf, const std::string& begin, const std::string& end,
+        bool rangeDropped, uint32_t order, const bool trimHistory,
+        boost::optional<std::shared_ptr<Notification<Status>>> notification) {
+        _compactionJob->scheduleCompactOp(cf, begin, end, rangeDropped, order, trimHistory,
+                                          notification);
     }

-    rocksdb::CompactionFilterFactory* RocksCompactionScheduler::createCompactionFilterFactory()
-        const {
+    rocksdb::CompactionFilterFactory* RocksCompactionScheduler::createCompactionFilterFactory() {
         return new PrefixDeletingCompactionFilterFactory(this);
     }

@@ -451,8 +564,6 @@ namespace mongo {
             (uint32_t)get_internal_delete_skipped_count();
         int dropped_count = 0;
         uint32_t int_prefix = 0;
-
-        // NOTE(cuixin): only invoke in rocksengine contruct function, no need check conflict
         for (iter->Seek(kDroppedPrefix); iter->Valid() && iter->key().starts_with(kDroppedPrefix);
              iter->Next()) {
             invariantRocksOK(iter->status());
@@ -567,4 +678,30 @@ namespace mongo {
             }
         }
     }
+
+    void RocksCompactionScheduler::addOplogEntriesDeleted(const uint64_t entries) {
+        _oplogEntriesDeleted.fetch_add(entries, std::memory_order_relaxed);
+    }
+
+    void RocksCompactionScheduler::addOplogSizeDeleted(const uint64_t size) {
+        _oplogSizeDeleted.fetch_add(size, std::memory_order_relaxed);
+    }
+
+    void RocksCompactionScheduler::addOplogCompactRemoved() {
+        _oplogCompactSkip.fetch_add(1, std::memory_order_relaxed);
+    }
+
+    void RocksCompactionScheduler::addOplogCompactPreserved() {
+        _oplogCompactKeep.fetch_add(1, std::memory_order_relaxed);
+    }
+
+    const OplogDelCompactStats RocksCompactionScheduler::getOplogDelCompactStats() const {
+        OplogDelCompactStats stats;
+        stats.oplogEntriesDeleted = _oplogEntriesDeleted.load(std::memory_order_relaxed);
+        stats.oplogSizeDeleted = _oplogSizeDeleted.load(std::memory_order_relaxed);
+        stats.oplogCompactSkip = _oplogCompactSkip.load(std::memory_order_relaxed);
+        stats.oplogCompactKeep = _oplogCompactKeep.load(std::memory_order_relaxed);
+
+        return stats;
+    }
 }  // namespace mongo
diff --git a/src/rocks_compaction_scheduler.h b/src/rocks_compaction_scheduler.h
index 7d3f81b..26fffa0 100644
--- a/src/rocks_compaction_scheduler.h
+++ b/src/rocks_compaction_scheduler.h
@@ -57,12 +57,19 @@ namespace mongo {
     class CompactionBackgroundJob;

+    struct OplogDelCompactStats {
+        uint64_t oplogEntriesDeleted;
+        uint64_t oplogSizeDeleted;
+        uint64_t oplogCompactSkip;
+        uint64_t oplogCompactKeep;
+    };
+
     class RocksCompactionScheduler {
     public:
         RocksCompactionScheduler();
         ~RocksCompactionScheduler();

-        void start(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf);
+        void start(rocksdb::TOTransactionDB* db, rocksdb::ColumnFamilyHandle* cf);

         static int getSkippedDeletionsThreshold() { return kSkippedDeletionsThreshold; }

@@ -71,8 +78,9 @@ namespace mongo {
         // schedule compact range operation for execution in _compactionThread
         void compactAll();
         Status compactOplog(rocksdb::ColumnFamilyHandle* cf, const std::string& begin,
                             const std::string& end);
+        Status rollbackToStable(rocksdb::ColumnFamilyHandle* cf);

-        rocksdb::CompactionFilterFactory* createCompactionFilterFactory() const;
+        rocksdb::CompactionFilterFactory* createCompactionFilterFactory();

         std::unordered_map getDroppedPrefixes() const;
         boost::optional<std::pair<uint32_t, std::pair<std::string, std::string>>>
             getOplogDeleteUntil() const;
@@ -87,11 +95,24 @@ namespace mongo {
         void notifyCompacted(const std::string& begin, const std::string& end, bool rangeDropped,
                              bool opSucceeded);

+        // accumulate the number of oplog entries deleted by DeleteFilesInRange
+        void addOplogEntriesDeleted(const uint64_t entries);
+        // accumulate the total bytes of oplog files deleted by DeleteFilesInRange
+        void addOplogSizeDeleted(const uint64_t size);
+        // count an oplog entry removed by a manual oplog compaction
+        void addOplogCompactRemoved();
+        // count an oplog entry preserved by a manual oplog compaction
+        void addOplogCompactPreserved();
+        // snapshot all oplog delete/compact statistics
+        const OplogDelCompactStats getOplogDelCompactStats() const;
+
     private:
         void compactPrefix(rocksdb::ColumnFamilyHandle* cf, const std::string& prefix);
         void compactDroppedPrefix(rocksdb::ColumnFamilyHandle* cf, const std::string& prefix);
-        void compact(rocksdb::ColumnFamilyHandle* cf, const std::string& begin, const std::string& end,
-                     bool rangeDropped, uint32_t order, boost::optional<std::shared_ptr<Notification<Status>>>);
+        void compact(rocksdb::ColumnFamilyHandle* cf, const std::string& begin,
+                     const std::string& end, bool rangeDropped, uint32_t order,
+                     const bool trimHistory,
+                     boost::optional<std::shared_ptr<Notification<Status>>>);
         void droppedPrefixCompacted(const std::string& prefix, bool opSucceeded);

     private:
@@ -99,7 +120,7 @@ namespace mongo {
         // protected by _lock
         Timer _timer;

-        rocksdb::DB* _db;  // not owned
+        rocksdb::TOTransactionDB* _db;  // not owned

         // not owned, cf where compaction_scheduler's metadata exists.
         rocksdb::ColumnFamilyHandle* _metaCf;
@@ -123,5 +144,10 @@ namespace mongo {
         std::atomic<uint32_t> _droppedPrefixesCount;
         boost::optional<std::pair<uint32_t, std::pair<std::string, std::string>>> _oplogDeleteUntil;
         static const std::string kDroppedPrefix;
+
+        std::atomic<uint64_t> _oplogEntriesDeleted;
+        std::atomic<uint64_t> _oplogSizeDeleted;
+        std::atomic<uint64_t> _oplogCompactSkip;
+        std::atomic<uint64_t> _oplogCompactKeep;
     };
 }  // namespace mongo
diff --git a/src/rocks_durability_manager.cpp b/src/rocks_durability_manager.cpp
index ffb1730..0f3a20e 100644
--- a/src/rocks_durability_manager.cpp
+++ b/src/rocks_durability_manager.cpp
@@ -34,8 +34,14 @@
 #include "rocks_util.h"

 namespace mongo {
-    RocksDurabilityManager::RocksDurabilityManager(rocksdb::DB* db, bool durable)
-        : _db(db), _durable(durable), _journalListener(&NoOpJournalListener::instance) {}
+    RocksDurabilityManager::RocksDurabilityManager(rocksdb::DB* db, bool durable,
+                                                   rocksdb::ColumnFamilyHandle* defaultCf,
+                                                   rocksdb::ColumnFamilyHandle* oplogCf)
+        : _db(db),
+          _durable(durable),
+          _defaultCf(defaultCf),
+          _oplogCf(oplogCf),
+          _journalListener(&NoOpJournalListener::instance) {}

     void RocksDurabilityManager::setJournalListener(JournalListener* jl) {
         stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
@@ -58,7 +64,7 @@ namespace mongo {
         stdx::unique_lock<stdx::mutex> jlk(_journalListenerMutex);
         JournalListener::Token token = _journalListener->getToken();
         if (!_durable || forceFlush) {
-            invariantRocksOK(_db->Flush(rocksdb::FlushOptions()));
+            invariantRocksOK(_db->Flush(rocksdb::FlushOptions(), {_defaultCf, _oplogCf}));
         } else {
             invariantRocksOK(_db->SyncWAL());
         }
diff --git a/src/rocks_durability_manager.h b/src/rocks_durability_manager.h
old mode 100755
new mode 100644
index d2d4538..b43cb6e
--- a/src/rocks_durability_manager.h
+++ b/src/rocks_durability_manager.h
@@ -45,23 +45,26 @@ namespace mongo {
         RocksDurabilityManager& operator=(const RocksDurabilityManager&) = delete;

     public:
-        RocksDurabilityManager(rocksdb::DB* db, bool durable);
+        RocksDurabilityManager(rocksdb::DB* db, bool durable,
+                               rocksdb::ColumnFamilyHandle* defaultCf,
+                               rocksdb::ColumnFamilyHandle* oplogCf);

         void setJournalListener(JournalListener* jl);

         void waitUntilDurable(bool forceFlush);

         /**
-         * Waits until a prepared unit of work has ended (either been commited or aborted).
-         * This should be used when encountering WT_PREPARE_CONFLICT errors. The caller is
-         * required to retry the conflicting WiredTiger API operation. A return from this
-         * function does not guarantee that the conflicting transaction has ended, only
-         * that one prepared unit of work in the process has signaled that it has ended.
-         * Accepts an OperationContext that will throw an AssertionException when interrupted.
+         * Waits until a prepared unit of work has ended (either been committed or aborted). This
+         * should be used when encountering ROCKS_PREPARE_CONFLICT errors. The caller is required to
+         * retry the conflicting RocksDB API operation. A return from this function does not
+         * guarantee that the conflicting transaction has ended, only that one prepared unit of work
+         * in the process has signaled that it has ended. Accepts an OperationContext that will
+         * throw an AssertionException when interrupted.
+         *
          * This method is provided in RocksDurabilityManager and not RecoveryUnit because all
-         * recovery units share the same durable manager, and we want a recovery unit on one
-         * thread to signal all recovery units waiting for prepare conflicts across all
-         * other threads.
+         * recovery units share the same RocksDurabilityManager, and we want a recovery unit on one
+         * thread to signal all recovery units waiting for prepare conflicts across all other
+         * threads.
          */
         void waitUntilPreparedUnitOfWorkCommitsOrAborts(OperationContext* opCtx,
                                                         uint64_t lastCount);

@@ -80,7 +83,8 @@ namespace mongo {
         rocksdb::DB* _db;  // not owned

         bool _durable;
-
+        rocksdb::ColumnFamilyHandle* _defaultCf;  // not owned
+        rocksdb::ColumnFamilyHandle* _oplogCf;    // not owned
         // Notified when we commit to the journal.
         JournalListener* _journalListener;

diff --git a/src/rocks_engine.cpp b/src/rocks_engine.cpp
old mode 100755
new mode 100644
index 5f4c45e..cf046c6
--- a/src/rocks_engine.cpp
+++ b/src/rocks_engine.cpp
@@ -174,13 +174,6 @@ namespace mongo {
         return _data->resize(num);
     }

-    // TODO(cuixin): consider interfaces below, mongoRocks has not implemented them yet
-    //   WiredTigerKVEngine::setInitRsOplogBackgroundThreadCallback skip
-    //   WiredTigerKVEngine::initRsOplogBackgroundThread skip
-    //   getBackupInformationFromBackupCursor is used in
-    //   WiredTigerKVEngine::beginNonBlockingBackup
-    //   rocks db skip it
-
     // first four bytes are the default prefix 0
     const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 13);

@@ -203,7 +196,7 @@ namespace mongo {
         unsigned long long memSizeMB = pi.getMemSizeMB();
         if (memSizeMB > 0) {
-            // reserve 1GB for system and binaries, and use 30% of the rest
-            double cacheMB = (memSizeMB - 1024) * 0.3;
+            // reserve 1GB for system and binaries, and use 50% of the rest
+            double cacheMB = (memSizeMB - 1024) * 0.5;
             cacheSizeGB = static_cast<int>(cacheMB / 1024);
         }
         if (cacheSizeGB < 1) {
@@ -290,7 +283,8 @@ namespace mongo {
                                       {_defaultCf.get(), _oplogCf.get()});
         _maxPrefix = std::max(_maxPrefix, maxDroppedPrefix);

-        _durabilityManager.reset(new RocksDurabilityManager(_db.get(), _durable));
+        _durabilityManager.reset(
+            new RocksDurabilityManager(_db.get(), _durable, _defaultCf.get(), _oplogCf.get()));
         _oplogManager.reset(new RocksOplogManager(_db.get(), this, _durabilityManager.get()));

         rocksdb::RocksTimeStamp ts(0);
@@ -327,10 +321,10 @@ namespace mongo {
         }();

         if (newDB) {
             // init manifest so list column families will not fail when db is empty.
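// Aside (illustrative sketch, not part of this patch): each TOTransactionDB::Open
// call below now threads the storage.rocksdb.maxConflictCheckSizeMB setting into
// the transaction layer instead of using the TOTransactionDBOptions() default.
// The call shape, with names taken from the patch itself (`options` and `path`
// stand in for whatever is in scope at the call site):
//
//     #include "rocks_global_options.h"
//     rocksdb::TOTransactionDB* db = nullptr;
//     rocksdb::TOTransactionDBOptions txnOpts(rocksGlobalOptions.maxConflictCheckSizeMB);
//     auto s = rocksdb::TOTransactionDB::Open(options, txnOpts, path, &db);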
-            invariantRocksOK(rocksdb::TOTransactionDB::Open(_options(false /* isOplog */),
-                                                            rocksdb::TOTransactionDBOptions(),
-                                                            _path,
-                                                            &db));
+            invariantRocksOK(rocksdb::TOTransactionDB::Open(
+                _options(false /* isOplog */),
+                rocksdb::TOTransactionDBOptions(rocksGlobalOptions.maxConflictCheckSizeMB), _path,
+                &db));
             invariantRocksOK(db->Close());
         }

@@ -346,10 +340,10 @@ namespace mongo {
         // init oplog columnfamily if not exists.
         if (!hasOplog) {
             rocksdb::ColumnFamilyHandle* cf = nullptr;
-            invariantRocksOK(rocksdb::TOTransactionDB::Open(_options(false /* isOplog */),
-                                                            rocksdb::TOTransactionDBOptions(),
-                                                            _path,
-                                                            &db));
+            invariantRocksOK(rocksdb::TOTransactionDB::Open(
+                _options(false /* isOplog */),
+                rocksdb::TOTransactionDBOptions(rocksGlobalOptions.maxConflictCheckSizeMB), _path,
+                &db));
             invariantRocksOK(db->CreateColumnFamily(_options(true /* isOplog */),
                                                     NamespaceString::kRsOplogNamespace.toString(),
                                                     &cf));
@@ -359,14 +353,12 @@ namespace mongo {
         }

         std::vector<rocksdb::ColumnFamilyHandle*> cfs;
-        s = rocksdb::TOTransactionDB::Open(_options(false /* isOplog */),
-                                           rocksdb::TOTransactionDBOptions(), _path,
-                                           {{rocksdb::kDefaultColumnFamilyName,
-                                             _options(false /* isOplog */)},
-                                            {NamespaceString::kRsOplogNamespace.toString(),
-                                             _options(true /* isOplog */)}},
-                                           &cfs,
-                                           &db);
+        s = rocksdb::TOTransactionDB::Open(
+            _options(false /* isOplog */),
+            rocksdb::TOTransactionDBOptions(rocksGlobalOptions.maxConflictCheckSizeMB), _path,
+            {{rocksdb::kDefaultColumnFamilyName, _options(false /* isOplog */)},
+             {NamespaceString::kRsOplogNamespace.toString(), _options(true /* isOplog */)}},
+            &cfs, &db);
         invariantRocksOK(s);
         invariant(cfs.size() == 2);
         invariant(cfs[0]->GetName() == rocksdb::kDefaultColumnFamilyName);
@@ -401,6 +393,27 @@ namespace mongo {
         bb.done();
     }

+    std::map<int, std::vector<uint64_t>> RocksEngine::getDefaultCFNumEntries() const {
+        std::map<int, std::vector<uint64_t>> numEntriesMap;
+
+        std::vector<rocksdb::LiveFileMetaData> allFiles;
+        _db->GetRootDB()->GetLiveFilesMetaData(&allFiles);
+        for (const auto& f : allFiles) {
+            if (NamespaceString::oplog(f.column_family_name)) {
+                continue;
+            }
+
+            if (numEntriesMap.find(f.level) == numEntriesMap.end()) {
+                numEntriesMap[f.level] = std::vector<uint64_t>(2, 0);
+            }
+
+            numEntriesMap[f.level][0] += f.num_entries;
+            numEntriesMap[f.level][1] += f.num_deletions;
+        }
+
+        return numEntriesMap;
+    }
+
     Status RocksEngine::okToRename(OperationContext* opCtx, StringData fromNS, StringData toNS,
                                    StringData ident,
                                    const RecordStore* originalRecordStore) const {
         _counterManager->sync();
@@ -767,17 +780,27 @@ namespace mongo {
         table_options.format_version = 2;
         options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));

-        // options.info_log = std::shared_ptr<rocksdb::Logger>(new MongoRocksLogger);
-        options.write_buffer_size = 64 * 1024 * 1024;  // 64MB
-        options.level0_slowdown_writes_trigger = 8;
-        options.max_write_buffer_number = 4;
-        options.max_background_compactions = 8;
-        options.max_background_flushes = 2;
+        options.write_buffer_size = rocksGlobalOptions.writeBufferSize;
+        options.max_write_buffer_number = rocksGlobalOptions.maxWriteBufferNumber;
+        options.max_background_jobs = rocksGlobalOptions.maxBackgroundJobs;
+        options.max_total_wal_size = rocksGlobalOptions.maxTotalWalSize;
+        options.db_write_buffer_size = rocksGlobalOptions.dbWriteBufferSize;
+        options.num_levels = rocksGlobalOptions.numLevels;
+
+        options.delayed_write_rate = rocksGlobalOptions.delayedWriteRate;
+        options.level0_file_num_compaction_trigger =
+            rocksGlobalOptions.level0FileNumCompactionTrigger;
+        options.level0_slowdown_writes_trigger = rocksGlobalOptions.level0SlowdownWritesTrigger;
+        options.level0_stop_writes_trigger = rocksGlobalOptions.level0StopWritesTrigger;
+        options.soft_pending_compaction_bytes_limit =
+            static_cast<uint64_t>(rocksGlobalOptions.softPendingCompactionMBLimit) * 1024 * 1024;
+        options.hard_pending_compaction_bytes_limit =
+            static_cast<uint64_t>(rocksGlobalOptions.hardPendingCompactionMBLimit) * 1024 * 1024;
         options.target_file_size_base = 64 * 1024 * 1024;  // 64MB
         options.soft_rate_limit = 2.5;
         options.hard_rate_limit = 3;
         options.level_compaction_dynamic_level_bytes = true;
-        options.max_bytes_for_level_base = 512 * 1024 * 1024;  // 512 MB
+        options.max_bytes_for_level_base = rocksGlobalOptions.maxBytesForLevelBase;
         // This means there is no limit on open files. Make sure to always set ulimit so that it can
         // keep all RocksDB files opened.
         options.max_open_files = -1;
@@ -838,6 +861,7 @@ namespace mongo {
     namespace {
         MONGO_FAIL_POINT_DEFINE(RocksPreserveSnapshotHistoryIndefinitely);
+        MONGO_FAIL_POINT_DEFINE(RocksSetOldestTSToStableTS);
     }  // namespace

@@ -898,6 +922,11 @@ namespace mongo {
     // TODO(wolfkdy): in 4.0.3, setOldestTimestamp considers oplogReadTimestamp
     // it disappears in mongo4.2, find why it happens
     void RocksEngine::setOldestTimestamp(Timestamp oldestTimestamp, bool force) {
+        // When this fail point is enabled, never force-set the oldest timestamp; keep it
+        // tied to the stable timestamp so there is no lag window between the two.
+        if (MONGO_FAIL_POINT(RocksSetOldestTSToStableTS)) {
+            force = false;
+        }
         if (MONGO_FAIL_POINT(RocksPreserveSnapshotHistoryIndefinitely)) {
             return;
         }
@@ -986,10 +1015,10 @@ namespace mongo {
         LOG_FOR_ROLLBACK(0) << "Rolling back to the stable timestamp. StableTimestamp: "
                             << stableTimestamp
                             << " Initial Data Timestamp: " << initialDataTimestamp;
-        auto s = _db->RollbackToStable(_defaultCf.get());
-        if (!s.ok()) {
-            return {ErrorCodes::UnrecoverableRollbackError,
-                    str::stream() << "Error rolling back to stable. Err: " << s.ToString()};
+        auto s = _compactionScheduler->rollbackToStable(_defaultCf.get());
+        if (!s.isOK()) {
+            return {ErrorCodes::UnrecoverableRollbackError,
+                    str::stream() << "Error rolling back to stable. 
Err: " << s}; } setInitialDataTimestamp(initialDataTimestamp); @@ -1049,7 +1078,7 @@ namespace mongo { RocksRecordStore* oplogRecordStore) { stdx::lock_guard lock(_oplogManagerMutex); if (_oplogManagerCount == 0) - _oplogManager->start(opCtx, oplogRecordStore, !_keepDataHistory); + _oplogManager->start(opCtx, oplogRecordStore); _oplogManagerCount++; } diff --git a/src/rocks_engine.h b/src/rocks_engine.h index d74be1e..454ebf6 100644 --- a/src/rocks_engine.h +++ b/src/rocks_engine.h @@ -268,6 +268,10 @@ namespace mongo { rocksdb::Statistics* getStatistics() const { return _statistics.get(); } + std::map> getDefaultCFNumEntries() const; + + rocksdb::ColumnFamilyHandle* getOplogCFHandle() const { return _oplogCf.get(); } + rocksdb::ColumnFamilyHandle* getDefaultCfHandle() const { return _defaultCf.get(); } bool canRecoverToStableTimestamp() const; rocksdb::ColumnFamilyHandle* getDefaultCf_ForTest() const { return _defaultCf.get(); } rocksdb::ColumnFamilyHandle* getOplogCf_ForTest() const { return _oplogCf.get(); } diff --git a/src/rocks_global_options.h b/src/rocks_global_options.h index 2cbf6cc..4bbc9fe 100644 --- a/src/rocks_global_options.h +++ b/src/rocks_global_options.h @@ -42,7 +42,8 @@ namespace mongo { crashSafeCounters(false), counters(true), singleDeleteIndex(false), - logLevel("info") {} + logLevel("info"), + maxConflictCheckSizeMB(200) {} Status store(const optionenvironment::Environment& params); static Status validateRocksdbLogLevel(const std::string& value); @@ -58,6 +59,20 @@ namespace mongo { bool singleDeleteIndex; std::string logLevel; + int maxConflictCheckSizeMB; + int maxBackgroundJobs; + long maxTotalWalSize; + long dbWriteBufferSize; + long writeBufferSize; + long delayedWriteRate; + int numLevels; + int maxWriteBufferNumber; + int level0FileNumCompactionTrigger; + int level0SlowdownWritesTrigger; + int level0StopWritesTrigger; + long maxBytesForLevelBase; + int softPendingCompactionMBLimit; + int hardPendingCompactionMBLimit; }; extern RocksGlobalOptions rocksGlobalOptions; diff --git a/src/rocks_global_options.idl b/src/rocks_global_options.idl index fdce889..971744b 100644 --- a/src/rocks_global_options.idl +++ b/src/rocks_global_options.idl @@ -106,3 +106,97 @@ configs: default: 'info' validator: callback: 'RocksGlobalOptions::validateRocksdbLogLevel' + "storage.rocksdb.maxConflictCheckSizeMB": + description: 'This is still experimental. 
Use this only if you know what you are doing'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.maxConflictCheckSizeMB'
+    short_name: rocksdbMaxConflictCheckSizeMB
+    default: 200
+    validator:
+      gte: 1
+      lte: 10000
+  "storage.rocksdb.maxBackgroundJobs":
+    description: 'rocksdb engine max background jobs'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.maxBackgroundJobs'
+    short_name: rocksdbMaxBackgroundJobs
+    default: 2
+  "storage.rocksdb.maxTotalWalSize":
+    description: 'rocksdb engine max total wal size'
+    arg_vartype: Long
+    cpp_varname: 'rocksGlobalOptions.maxTotalWalSize'
+    short_name: rocksdbMaxTotalWalSize
+    # 100 MB
+    default: 104857600
+  "storage.rocksdb.dbWriteBufferSize":
+    description: 'rocksdb engine db write buffer size'
+    arg_vartype: Long
+    cpp_varname: 'rocksGlobalOptions.dbWriteBufferSize'
+    short_name: rocksdbDbWriteBufferSize
+    # 128 MB
+    default: 134217728
+  "storage.rocksdb.writeBufferSize":
+    description: 'rocksdb engine write buffer size'
+    arg_vartype: Long
+    cpp_varname: 'rocksGlobalOptions.writeBufferSize'
+    short_name: rocksdbWriteBufferSize
+    # 16 MB
+    default: 16777216
+  "storage.rocksdb.delayedWriteRate":
+    description: 'rocksdb engine delayed write rate'
+    arg_vartype: Long
+    cpp_varname: 'rocksGlobalOptions.delayedWriteRate'
+    short_name: rocksdbDelayedWriteRate
+    # 512 MB
+    default: 536870912
+  "storage.rocksdb.numLevels":
+    description: 'rocksdb engine num levels'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.numLevels'
+    short_name: rocksdbNumLevels
+    default: 5
+  "storage.rocksdb.maxWriteBufferNumber":
+    description: 'rocksdb engine max write buffer number'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.maxWriteBufferNumber'
+    short_name: rocksdbMaxWriteBufferNumber
+    default: 4
+  "storage.rocksdb.level0FileNumCompactionTrigger":
+    description: 'rocksdb engine level0 file num compaction trigger'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.level0FileNumCompactionTrigger'
+    short_name: rocksdbLevel0FileNumCompactionTrigger
+    default: 4
+  "storage.rocksdb.level0SlowdownWritesTrigger":
+    description: 'rocksdb engine level0 slowdown writes trigger'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.level0SlowdownWritesTrigger'
+    short_name: rocksdbLevel0SlowdownWritesTrigger
+    default: 128
+  "storage.rocksdb.level0StopWritesTrigger":
+    description: 'rocksdb engine level0 stop writes trigger'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.level0StopWritesTrigger'
+    short_name: rocksdbLevel0StopWritesTrigger
+    default: 512
+  "storage.rocksdb.maxBytesForLevelBase":
+    description: 'rocksdb engine max bytes for level base'
+    arg_vartype: Long
+    cpp_varname: 'rocksGlobalOptions.maxBytesForLevelBase'
+    short_name: rocksdbMaxBytesForLevelBase
+    # 512 MB
+    default: 536870912
+  "storage.rocksdb.softPendingCompactionMBLimit":
+    description: 'rocksdb engine soft pending compaction MB limit'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.softPendingCompactionMBLimit'
+    short_name: rocksdbSoftPendingCompactionMBLimit
+    # 300 GB
+    default: 307200
+  "storage.rocksdb.hardPendingCompactionMBLimit":
+    description: 'rocksdb engine hard pending compaction MB limit'
+    arg_vartype: Int
+    cpp_varname: 'rocksGlobalOptions.hardPendingCompactionMBLimit'
+    short_name: rocksdbHardPendingCompactionMBLimit
+    # 500 GB
+    default: 512000
diff --git a/src/rocks_init.cpp b/src/rocks_init.cpp
index 45faa8e..1f9deef 100644
--- a/src/rocks_init.cpp
+++ b/src/rocks_init.cpp
@@ -69,7 +69,7 @@ namespace mongo {
         // Intentionally leaked.
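// Aside (illustrative sketch, not part of this patch): how the storage.rocksdb.*
// defaults declared above land on rocksdb::Options in RocksEngine::_options().
// All field names are standard rocksdb::Options members; the values are the idl
// defaults.
//
//     rocksdb::Options opts;
//     opts.write_buffer_size = 16 << 20;                           // writeBufferSize, 16 MB
//     opts.max_write_buffer_number = 4;                            // maxWriteBufferNumber
//     opts.max_background_jobs = 2;                                // maxBackgroundJobs
//     opts.max_total_wal_size = 100 << 20;                         // maxTotalWalSize, 100 MB
//     opts.db_write_buffer_size = 128 << 20;                       // dbWriteBufferSize, 128 MB
//     opts.num_levels = 5;                                         // numLevels
//     opts.delayed_write_rate = 512 << 20;                         // delayedWriteRate, 512 MB/s
//     opts.level0_file_num_compaction_trigger = 4;
//     opts.level0_slowdown_writes_trigger = 128;
//     opts.level0_stop_writes_trigger = 512;
//     opts.soft_pending_compaction_bytes_limit = 307200ull << 20;  // 300 GB
//     opts.hard_pending_compaction_bytes_limit = 512000ull << 20;  // 500 GB
//     opts.max_bytes_for_level_base = 512 << 20;                   // maxBytesForLevelBase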
         auto leaked __attribute__((unused)) = new RocksServerStatusSection(engine);
         auto leaked2 __attribute__((unused)) = new RocksRateLimiterServerParameter(
-            "rocksdbRuntimeConfigMaxWriteMBPerSec", ServerParameterType::kRuntimeOnly);
+            "rocksdbRateLimiter", ServerParameterType::kRuntimeOnly);
         auto leaked3 __attribute__((unused)) = new RocksBackupServerParameter(
             "rocksdbBackup", ServerParameterType::kRuntimeOnly);
         auto leaked4 __attribute__((unused)) = new RocksCompactServerParameter(
@@ -78,11 +78,14 @@ namespace mongo {
             "rocksdbRuntimeConfigCacheSizeGB", ServerParameterType::kRuntimeOnly);
         auto leaked6 __attribute__((unused)) =
             new RocksOptionsParameter("rocksdbOptions", ServerParameterType::kRuntimeOnly);
+        auto leaked7 __attribute__((unused)) = new RocksdbMaxConflictCheckSizeParameter(
+            "rocksdbRuntimeConfigMaxConflictCheckSize", ServerParameterType::kRuntimeOnly);
         leaked2->_data = engine;
         leaked3->_data = engine;
         leaked4->_data = engine;
         leaked5->_data = engine;
         leaked6->_data = engine;
+        leaked7->_data = engine;

         return new StorageEngineImpl(engine, options);
     }
diff --git a/src/rocks_oplog_manager.cpp b/src/rocks_oplog_manager.cpp
index 5824277..8b0f4bf 100755
--- a/src/rocks_oplog_manager.cpp
+++ b/src/rocks_oplog_manager.cpp
@@ -56,8 +56,7 @@ namespace mongo {
                                        RocksDurabilityManager* durabilityManager)
         : _db(db), _kvEngine(kvEngine), _durabilityManager(durabilityManager) {}

-    void RocksOplogManager::start(OperationContext* opCtx, RocksRecordStore* oplogRecordStore,
-                                  const bool updateOldestTimestamp) {
+    void RocksOplogManager::start(OperationContext* opCtx, RocksRecordStore* oplogRecordStore) {
         invariant(!_isRunning);
         auto reverseOplogCursor =
             oplogRecordStore->getCursor(opCtx, false /* false = reverse cursor */);
@@ -80,7 +79,7 @@ namespace mongo {
         // see _shuttingDown as true and quit prematurely.
         stdx::lock_guard<stdx::mutex> lk(_oplogVisibilityStateMutex);
         _oplogJournalThread =
-            stdx::thread(&RocksOplogManager::_oplogJournalThreadLoop, this, oplogRecordStore,
-                         updateOldestTimestamp);
+            stdx::thread(&RocksOplogManager::_oplogJournalThreadLoop, this, oplogRecordStore);
         _isRunning = true;
         _shuttingDown = false;
     }
@@ -165,8 +164,7 @@ namespace mongo {
         }
     }

-    void RocksOplogManager::_oplogJournalThreadLoop(RocksRecordStore* oplogRecordStore,
-                                                    const bool updateOldestTimestamp) noexcept {
+    void RocksOplogManager::_oplogJournalThreadLoop(RocksRecordStore* oplogRecordStore) noexcept {
         Client::initThread("RocksOplogJournalThread");

         // This thread updates the oplog read timestamp, the timestamp used to read from the oplog
@@ -248,11 +246,6 @@ namespace mongo {
             }
             lk.unlock();

-            if (updateOldestTimestamp) {
-                const bool force = false;
-                _kvEngine->setOldestTimestamp(Timestamp(newTimestamp), force);
-            }
-
             // Wake up any await_data cursors and tell them more data might be visible now.
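// Aside (condensed sketch, not part of this patch): with the setOldestTimestamp
// call removed, each pass of _oplogJournalThreadLoop only publishes durable
// oplog visibility; advancing the oldest timestamp is left to the engine's
// stable-timestamp machinery. The loop's remaining shape, in pseudocode —
// waitForJournal() and shuttingDown are stand-ins, not real identifiers:
//
//     while (!shuttingDown) {
//         waitForJournal();                                  // reach a durable no-holes point
//         _setOplogReadTimestamp(lock, newTimestamp);        // publish visibility
//         oplogRecordStore->notifyCappedWaitersIfNeeded();   // wake await_data cursors
//     }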
             oplogRecordStore->notifyCappedWaitersIfNeeded();
         }

diff --git a/src/rocks_oplog_manager.h b/src/rocks_oplog_manager.h
index f73a31c..17eaf98 100644
--- a/src/rocks_oplog_manager.h
+++ b/src/rocks_oplog_manager.h
@@ -55,7 +55,7 @@ namespace mongo {
                          RocksDurabilityManager* durabilityManager);
         virtual ~RocksOplogManager(){};

-        void start(OperationContext* opCtx, RocksRecordStore* oplogRecordStore, const bool updateOldestTimestamp);
+        void start(OperationContext* opCtx, RocksRecordStore* oplogRecordStore);

         void halt();

@@ -84,7 +84,7 @@ namespace mongo {
         Timestamp fetchAllDurableValue();

     private:
-        void _oplogJournalThreadLoop(RocksRecordStore* oplogRecordStore, const bool updateOldestTimestamp) noexcept;
+        void _oplogJournalThreadLoop(RocksRecordStore* oplogRecordStore) noexcept;

         void _setOplogReadTimestamp(WithLock, uint64_t newTimestamp);

diff --git a/src/rocks_parameters.cpp b/src/rocks_parameters.cpp
index 40df1e4..6983919 100644
--- a/src/rocks_parameters.cpp
+++ b/src/rocks_parameters.cpp
@@ -36,6 +36,7 @@
 #include "mongo/logger/parse_log_component_settings.h"
 #include "mongo/util/log.h"
 #include "mongo/util/str.h"
+#include "rocks_global_options.h"

 #include
 #include

@@ -181,18 +182,58 @@ namespace mongo {
     Status RocksOptionsParameter::setFromString(const std::string& str) {
         log() << "RocksDB: Attempting to apply settings: " << str;

+        std::set<std::string> supported_db_options = {"db_write_buffer_size", "delayed_write_rate",
+                                                      "max_background_jobs", "max_total_wal_size"};
+
+        std::set<std::string> supported_cf_options = {"max_write_buffer_number",
+                                                      "disable_auto_compactions",
+                                                      "level0_slowdown_writes_trigger",
+                                                      "level0_stop_writes_trigger",
+                                                      "soft_pending_compaction_bytes_limit",
+                                                      "hard_pending_compaction_bytes_limit"};
         std::unordered_map<std::string, std::string> optionsMap;
         rocksdb::Status s = rocksdb::StringToMap(str, &optionsMap);
         if (!s.ok()) {
             return Status(ErrorCodes::BadValue, s.ToString());
         }
-
-        s = _data->getDB()->SetOptions(optionsMap);
-        if (!s.ok()) {
-            return Status(ErrorCodes::BadValue, s.ToString());
+        for (const auto& v : optionsMap) {
+            if (supported_db_options.find(v.first) != supported_db_options.end()) {
+                s = _data->getDB()->SetDBOptions({v});
+            } else if (supported_cf_options.find(v.first) != supported_cf_options.end()) {
+                s = _data->getDB()->SetOptions({v});
+            } else {
+                return Status(ErrorCodes::BadValue, str::stream() << "unknown param: " << v.first);
+            }
+            // check each application, not just the last one
+            if (!s.ok()) {
+                return Status(ErrorCodes::BadValue, s.ToString());
+            }
         }
         return Status::OK();
     }
+
+    void RocksdbMaxConflictCheckSizeParameter::append(OperationContext* opCtx, BSONObjBuilder& b,
+                                                      const std::string& name) {
+        b << name << rocksGlobalOptions.maxConflictCheckSizeMB;
+    }
+
+    Status RocksdbMaxConflictCheckSizeParameter::set(const BSONElement& newValueElement) {
+        return setFromString(newValueElement.toString(false));
+    }
+
+    Status RocksdbMaxConflictCheckSizeParameter::setFromString(const std::string& str) {
+        // Values may arrive as "200" or "200.0"; strip any fractional part before parsing.
+        std::string trimStr = str;
+        size_t pos = str.find('.');
+        if (pos != std::string::npos) {
+            trimStr = str.substr(0, pos);
+        }
+        int newValue;
+        Status status = parseNumberFromString(trimStr, &newValue);
+        if (!status.isOK()) {
+            return status;
+        }
+        rocksGlobalOptions.maxConflictCheckSizeMB = newValue;
+        _data->getDB()->SetMaxConflictBytes(static_cast<uint64_t>(newValue) * 1024 * 1024);
+        return Status::OK();
+    }
 }  // namespace mongo
diff --git a/src/rocks_parameters.h b/src/rocks_parameters.h
deleted file mode 100644
index d37a9bd..0000000
--- a/src/rocks_parameters.h
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
-* Copyright (C) 2014 MongoDB Inc.
-* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see . -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include "mongo/base/disallow_copying.h" -#include "mongo/db/server_parameters.h" -#include "mongo/util/assert_util.h" - -#include "rocks_engine.h" - -namespace mongo { - - // To dynamically configure RocksDB's rate limit, run - // db.adminCommand({setParameter:1, rocksdbRuntimeConfigMaxWriteMBPerSec:30}) - class RocksRateLimiterServerParameter : public ServerParameter { - MONGO_DISALLOW_COPYING(RocksRateLimiterServerParameter); - - public: - RocksRateLimiterServerParameter(RocksEngine* engine); - virtual void append(OperationContext* opCtx, BSONObjBuilder& b, const std::string& name); - virtual Status set(const BSONElement& newValueElement); - virtual Status setFromString(const std::string& str); - - private: - Status _set(int newNum); - RocksEngine* _engine; - }; - - // We use mongo's setParameter() API to issue a backup request to rocksdb. - // To backup entire RocksDB instance, call: - // db.adminCommand({setParameter:1, rocksdbBackup: "/var/lib/mongodb/backup/1"}) - // The directory needs to be an absolute path. It should not exist -- it will be created - // automatically. - class RocksBackupServerParameter : public ServerParameter { - MONGO_DISALLOW_COPYING(RocksBackupServerParameter); - - public: - RocksBackupServerParameter(RocksEngine* engine); - virtual void append(OperationContext* opCtx, BSONObjBuilder& b, const std::string& name); - virtual Status set(const BSONElement& newValueElement); - virtual Status setFromString(const std::string& str); - - private: - RocksEngine* _engine; - }; - - // We use mongo's setParameter() API to issue a compact request to rocksdb. 
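// Aside (not part of this patch): the setParameter entry points documented in
// this deleted header now live in rocks_parameters.idl / rocks_parameters_gen.h;
// the shell usage they describe is unchanged, e.g.:
//
//     db.adminCommand({setParameter: 1, rocksdbCompact: 1})
//     db.adminCommand({setParameter: 1, rocksdbRuntimeConfigCacheSizeGB: 10})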
- // To compact entire RocksDB instance, call: - // db.adminCommand({setParameter:1, rocksdbCompact: 1}) - class RocksCompactServerParameter : public ServerParameter { - MONGO_DISALLOW_COPYING(RocksCompactServerParameter); - - public: - RocksCompactServerParameter(RocksEngine* engine); - virtual void append(OperationContext* opCtx, BSONObjBuilder& b, const std::string& name); - virtual Status set(const BSONElement& newValueElement); - virtual Status setFromString(const std::string& str); - - private: - RocksEngine* _engine; - }; - - // We use mongo's setParameter() API to dynamically change the size of the block cache - // To compact entire RocksDB instance, call: - // db.adminCommand({setParameter:1, rocksdbRuntimeConfigCacheSizeGB: 10}) - class RocksCacheSizeParameter : public ServerParameter { - MONGO_DISALLOW_COPYING(RocksCacheSizeParameter); - - public: - RocksCacheSizeParameter(RocksEngine* engine); - virtual void append(OperationContext* opCtx, BSONObjBuilder& b, const std::string& name); - virtual Status set(const BSONElement& newValueElement); - virtual Status setFromString(const std::string& str); - - private: - Status _set(int newNum); - RocksEngine* _engine; - }; - - // We use mongo's setParameter() API to dynamically change the RocksDB options using the - // SetOptions API - // To dynamically change an option, call: - // db.adminCommand({setParameter:1, "rocksdbOptions": "someoption=1; someoption2=3"}) - class RocksOptionsParameter : public ServerParameter { - MONGO_DISALLOW_COPYING(RocksOptionsParameter); - - public: - RocksOptionsParameter(RocksEngine* engine); - virtual void append(OperationContext* opCtx, BSONObjBuilder& b, const std::string& name); - virtual Status set(const BSONElement& newValueElement); - virtual Status setFromString(const std::string& str); - - private: - RocksEngine* _engine; - }; - -#ifdef __linux__ - - const int64_t kMinRateLimitIops = 100; - const int64_t kDefaultRateLimitIops = 16000; - const int64_t kMaxRateLimitIops = 100000000; - - const int64_t kMinRateLimitMbps = 50; - const int64_t kDefaultRateLimitMbps = 350; - const int64_t kMaxRateLimitMbps = 1000000; - - class MongoRateLimitParameter : public ServerParameter { - MONGO_DISALLOW_COPYING(MongoRateLimitParameter); - - public: - MongoRateLimitParameter(); - virtual void append(OperationContext* opCtx, BSONObjBuilder& b, const std::string& name); - virtual Status set(const BSONElement& newValueElement); - virtual Status setFromString(const std::string& str); - virtual Status setInternal(const BSONObj& newValue); - virtual std::string getDisk() { return _disk; } - virtual uint64_t getIops() { return _iops; } - virtual uint64_t getMbps() { return _mbps; } - - private: - std::string _disk; - uint64_t _iops; - uint64_t _mbps; - }; - - MongoRateLimitParameter& getMongoRateLimitParameter(); -#endif -} - diff --git a/src/rocks_parameters.idl b/src/rocks_parameters.idl index ce1dbd2..aefa1b7 100755 --- a/src/rocks_parameters.idl +++ b/src/rocks_parameters.idl @@ -1,105 +1,112 @@ -# Copyright (C) 2018-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. 
-# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# . -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -global: - cpp_namespace: "mongo" - cpp_includes: - - "mongo/db/modules/rocks/src/rocks_engine.h" - - "mongo/util/concurrency/ticketholder.h" - - "mongo/util/debug_util.h" - -server_parameters: - rocksConcurrentWriteTransactions: - description: "Rocks Concurrent Write Transactions" - set_at: [ startup, runtime ] - cpp_class: - name: ROpenWriteTransactionParam - data: 'TicketHolder*' - override_ctor: true - rocksConcurrentReadTransactions: - description: "Rocks Concurrent Read Transactions" - set_at: [ startup, runtime ] - cpp_class: - name: ROpenReadTransactionParam - data: 'TicketHolder*' - override_ctor: true - - rocksdbRuntimeConfigMaxWriteMBPerSec: - description: 'rate limiter to MB/s' - set_at: [ startup, runtime ] - cpp_class: - name: RocksRateLimiterServerParameter - data: 'RocksEngine*' - override_set: true - condition: { expr: false } - - rocksdbBackup: - description: 'rocksdb backup' - set_at: runtime - cpp_class: - name: RocksBackupServerParameter - data: 'RocksEngine*' - override_set: true - condition: { expr: false } - - rocksdbCompact: - description: 'rocksdb compact' - set_at: runtime - cpp_class: - name: RocksCompactServerParameter - data: 'RocksEngine*' - override_set: true - condition: { expr: false } - - rocksdbRuntimeConfigCacheSizeGB: - description: 'rocks cache sizeGB' - set_at: startup - cpp_class: - name: RocksCacheSizeParameter - data: 'RocksEngine*' - override_set: true - condition: { expr: false } - - rocksdbOptions: - description: 'set rocksdb options' - set_at: [ startup, runtime ] - cpp_class: - name: RocksOptionsParameter - data: 'RocksEngine*' - override_set: true - condition: { expr: false } - - minSSTFileCountReserved: - description: 'delete oplogs until minSSTFileCountReserved files exceeds the total max size' - set_at: [ startup, runtime ] - cpp_class: - name: ExportedMinSSTFileCountReservedParameter - data: 'AtomicWord*' - override_ctor: true - - +# Copyright (C) 2018-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# . 
+# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/db/modules/rocks/src/rocks_engine.h" + - "mongo/util/concurrency/ticketholder.h" + - "mongo/util/debug_util.h" + +server_parameters: + rocksConcurrentWriteTransactions: + description: "Rocks Concurrent Write Transactions" + set_at: [ startup, runtime ] + cpp_class: + name: ROpenWriteTransactionParam + data: 'TicketHolder*' + override_ctor: true + rocksConcurrentReadTransactions: + description: "Rocks Concurrent Read Transactions" + set_at: [ startup, runtime ] + cpp_class: + name: ROpenReadTransactionParam + data: 'TicketHolder*' + override_ctor: true + + rocksdbRuntimeConfigMaxWriteMBPerSec: + description: 'rate limiter to MB/s' + set_at: [ startup, runtime ] + cpp_class: + name: RocksRateLimiterServerParameter + data: 'RocksEngine*' + override_set: true + condition: { expr: false } + + rocksdbBackup: + description: 'rocksdb backup' + set_at: runtime + cpp_class: + name: RocksBackupServerParameter + data: 'RocksEngine*' + override_set: true + condition: { expr: false } + + rocksdbCompact: + description: 'rocksdb compact' + set_at: runtime + cpp_class: + name: RocksCompactServerParameter + data: 'RocksEngine*' + override_set: true + condition: { expr: false } + + rocksdbRuntimeConfigCacheSizeGB: + description: 'rocks cache sizeGB' + set_at: startup + cpp_class: + name: RocksCacheSizeParameter + data: 'RocksEngine*' + override_set: true + condition: { expr: false } + + rocksdbOptions: + description: 'set rocksdb options' + set_at: [ startup, runtime ] + cpp_class: + name: RocksOptionsParameter + data: 'RocksEngine*' + override_set: true + condition: { expr: false } + + minSSTFileCountReserved: + description: 'delete oplogs until minSSTFileCountReserved files exceeds the total max size' + set_at: [ startup, runtime ] + cpp_class: + name: ExportedMinSSTFileCountReservedParameter + data: 'AtomicWord*' + override_ctor: true + + rocksdbRuntimeConfigMaxConflictCheckSize: + description: 'rocksdb max conflict check size' + set_at: startup + cpp_class: + name: RocksdbMaxConflictCheckSizeParameter + data: 'RocksEngine*' + override_set: true + condition: { expr: false } diff --git a/src/rocks_prepare_conflict.cpp b/src/rocks_prepare_conflict.cpp index bdac704..0ff7528 100644 --- a/src/rocks_prepare_conflict.cpp +++ b/src/rocks_prepare_conflict.cpp @@ -44,7 +44,7 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(RocksPrintPrepareConflictLog); void rocksPrepareConflictLog(int attempts) { - LOG(1) << "Caught conflict, attempt " << attempts + LOG(1) << "Caught ROCKS_PREPARE_CONFLICT, attempt " << attempts << ". 
Waiting for unit of work to commit or abort."; } diff --git a/src/rocks_record_store.cpp b/src/rocks_record_store.cpp old mode 100755 new mode 100644 index 78f5d9d..eb9c6b3 --- a/src/rocks_record_store.cpp +++ b/src/rocks_record_store.cpp @@ -744,7 +744,9 @@ namespace mongo { } else { iterator->SeekToFirst(); if(iterator->Valid()){ - cappedTruncateAfter(opCtx, _makeRecordId(iterator->key()), true); + const bool inclusive = true; + const bool isTruncate = true; + cappedTruncateAfter(opCtx, _makeRecordId(iterator->key()), inclusive, isTruncate); } } @@ -882,6 +884,14 @@ namespace mongo { void RocksRecordStore::cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) { + cappedTruncateAfter(opCtx, end, inclusive, false); + } + void RocksRecordStore::cappedTruncateAfter(OperationContext* opCtx, RecordId end, + bool inclusive, bool isTruncate) { + if (isTruncate) { + invariant(inclusive && _isOplog); + } + // Only log messages at a lower level here for testing. int logLevel = getTestCommandsEnabled() ? 0 : 2; @@ -901,7 +911,12 @@ namespace mongo { std::unique_ptr reverseCursor = getCursor(opCtx, false); invariant(reverseCursor->seekExact(end)); auto prev = reverseCursor->next(); - lastKeptId = prev ? prev->id : RecordId(); + if (prev) { + lastKeptId = prev->id; + } else { + invariant(_isOplog && isTruncate); + lastKeptId = RecordId(); + } firstRemovedId = end; LOG(0) << "lastKeptId: " << Timestamp(lastKeptId.repr()); } else { @@ -986,27 +1001,29 @@ namespace mongo { } } - // Immediately rewind visibility to our truncation point, to prevent new - // transactions from appearing. - LOG(logLevel) << "Rewinding oplog visibility point to " << truncTs - << " after truncation."; - - if (!serverGlobalParams.enableMajorityReadConcern && - _engine->getOldestTimestamp() > truncTs) { - // If majority read concern is disabled, we must set the oldest timestamp along with - // the commit timestamp. Otherwise, the commit timestamp might be set behind the - // oldest timestamp. - const bool force = true; - _engine->setOldestTimestamp(truncTs, force); - } else { - const bool force = false; - invariantRocksOK(_engine->getDB()->SetTimeStamp( - rocksdb::TimeStampType::kCommitted, rocksdb::RocksTimeStamp(truncTs.asULL()), - force)); - } + if (!isTruncate) { + // Immediately rewind visibility to our truncation point, to prevent new + // transactions from appearing. + LOG(logLevel) << "Rewinding oplog visibility point to " << truncTs + << " after truncation."; + + if (!serverGlobalParams.enableMajorityReadConcern && + _engine->getOldestTimestamp() > truncTs) { + // If majority read concern is disabled, we must set the oldest timestamp along + // with the commit timestamp. Otherwise, the commit timestamp might be set + // behind the oldest timestamp. 
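// Aside (illustrative sketch, not part of this patch): the force=true path
// exists because SetTimeStamp(kCommitted, ...) alone could leave the commit
// timestamp behind the oldest timestamp after truncating back. Numerically,
// with oldest = (1618, 5) and truncTs = (1618, 3):
//
//     Timestamp oldest(1618, 5), truncTs(1618, 3);
//     bool mustForce = !serverGlobalParams.enableMajorityReadConcern &&
//                      oldest > truncTs;  // true: push the oldest ts down with the commit ts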
diff --git a/src/rocks_record_store.h b/src/rocks_record_store.h
index 94388ef..446e9cf 100644
--- a/src/rocks_record_store.h
+++ b/src/rocks_record_store.h
@@ -170,6 +170,8 @@ namespace mongo {
                                     double scale) const;

         virtual void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive);
+        void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive,
+                                 bool isTruncate);

         virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
                                                          const RecordId& startingPosition) const;
diff --git a/src/rocks_record_store_mongod.cpp b/src/rocks_record_store_mongod.cpp
index 10d611c..42833bd 100644
--- a/src/rocks_record_store_mongod.cpp
+++ b/src/rocks_record_store_mongod.cpp
@@ -41,9 +41,11 @@
 #include "mongo/db/client.h"
 #include "mongo/db/concurrency/d_concurrency.h"
 #include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/service_context.h"
+#include "mongo/db/session_txn_record_gen.h"
 #include "mongo/util/background.h"
 #include "mongo/util/exit.h"
 #include "mongo/util/log.h"
@@ -55,6 +57,25 @@ namespace mongo {

     namespace {

+        Timestamp getoldestPrepareTs(OperationContext* opCtx) {
+            auto alterClient = opCtx->getServiceContext()->makeClient("get-oldest-prepared-txn");
+            AlternativeClientRegion acr(alterClient);
+            const auto tmpOpCtx = cc().makeOperationContext();
+            tmpOpCtx->recoveryUnit()->setTimestampReadSource(
+                RecoveryUnit::ReadSource::kNoTimestamp);
+            DBDirectClient client(tmpOpCtx.get());
+            Query query = QUERY("txnState"
+                                << "kPrepared")
+                              .sort("lastWriteOpTime", 1);
+            auto c = client.query(NamespaceString::kSessionTransactionsTableNamespace, query, 1);
+            if (c->more()) {
+                auto raw = c->next();
+                SessionTxnRecord record =
+                    SessionTxnRecord::parse(IDLParserErrorContext("init prepared txns"), raw);
+                return record.getLastWriteOpTime().getTimestamp();
+            }
+            return Timestamp::max();
+        }

         std::set<NamespaceString> _backgroundThreadNamespaces;
         Mutex _backgroundThreadMutex;
@@ -76,10 +97,11 @@ namespace mongo {
                 LOG(1) << "no global storage engine yet";
                 return false;
             }
-
+            auto engine = getGlobalServiceContext()->getStorageEngine();
             const auto opCtx = cc().makeOperationContext();

             try {
+                const Timestamp oldestPreparedTxnTs = getoldestPrepareTs(opCtx.get());
                 // A Global IX lock should be good enough to protect the oplog truncation from
                 // interruptions such as restartCatalog. PBWM, database lock or collection lock is not
                 // needed. This improves concurrency if oplog truncation takes long time.
@@ -110,8 +132,18 @@ namespace mongo {
                     }
                     rs = checked_cast<RocksRecordStore*>(collection->getRecordStore());
                 }
-
-                return rs->reclaimOplog(opCtx.get());
+                if (!engine->supportsRecoverToStableTimestamp()) {
+                    // For non-RTT storage engines, the oplog can always be truncated.
+                    return rs->reclaimOplog(opCtx.get(), oldestPreparedTxnTs);
+                }
+                const auto lastStableCheckpointTsPtr = engine->getLastStableRecoveryTimestamp();
+                Timestamp lastStableCheckpointTimestamp =
+                    lastStableCheckpointTsPtr ? *lastStableCheckpointTsPtr : Timestamp::min();
+                Timestamp persistedTimestamp =
+                    std::min(oldestPreparedTxnTs, lastStableCheckpointTimestamp);
+                return rs->reclaimOplog(opCtx.get(), persistedTimestamp);
+            } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+                return false;
             } catch (const std::exception& e) {
                 severe() << "error in RocksRecordStoreThread: " << redact(e.what());
                 fassertFailedNoTrace(!"error in RocksRecordStoreThread");
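Note: the new truncation bound above is the key invariant: the oplog may only be reclaimed up to a point that is both covered by the last stable checkpoint and not still needed by the oldest prepared transaction. A standalone illustration of the arithmetic, with timestamps reduced to integers:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        const std::uint64_t oldestPreparedTxnTs = 1200;     // from the config.transactions scan
        const std::uint64_t lastStableCheckpointTs = 1500;  // from the storage engine
        // reclaimOplog() may delete oplog entries only at or before this bound.
        const std::uint64_t persistedTimestamp =
            std::min(oldestPreparedTxnTs, lastStableCheckpointTs);
        std::cout << "oplog reclaimable up to " << persistedTimestamp << '\n';  // prints 1200
    }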
diff --git a/src/rocks_recovery_unit.cpp b/src/rocks_recovery_unit.cpp
index 1461a92..14ab297 100644
--- a/src/rocks_recovery_unit.cpp
+++ b/src/rocks_recovery_unit.cpp
@@ -655,7 +655,7 @@ namespace mongo {
             }
         }

-        LOG(0) << "Rocks begin_transaction for snapshot id " << _mySnapshotId << ",src "
+        LOG(1) << "Rocks begin_transaction for snapshot id " << _mySnapshotId << ",src "
                << (int)_timestampReadSource;
     }
diff --git a/src/rocks_server_status.cpp b/src/rocks_server_status.cpp
index 6a35f42..2310dab 100644
--- a/src/rocks_server_status.cpp
+++ b/src/rocks_server_status.cpp
@@ -27,6 +27,8 @@
  * it in the license file.
  */

+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
 #include <sstream>

 #include "mongo/platform/basic.h"
@@ -42,6 +44,7 @@
 #include "mongo/db/db_raii.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
 #include "mongo/util/scopeguard.h"

 #include "rocks_engine.h"
@@ -75,6 +78,33 @@ namespace mongo {

         BSONObjBuilder bob;

+        generatePropertiesSection(&bob);
+        generateThreadStatusSection(&bob);
+        generateCountersSection(&bob);
+        generateTxnStatsSection(&bob);
+        generateOplogDelStatsSection(&bob);
+        generateCompactSchedulerSection(&bob);
+        generateDefaultCFEntriesNumSection(&bob);
+
+        RocksEngine::appendGlobalStats(bob);
+
+        return bob.obj();
+    }
+
+    void RocksServerStatusSection::generateDefaultCFEntriesNumSection(BSONObjBuilder* bob) const {
+        auto defaultCFNumEntries = _engine->getDefaultCFNumEntries();
+
+        BSONObjBuilder objBuilder;
+        for (auto& numVec : defaultCFNumEntries) {
+            BSONObjBuilder ob;
+            ob.append("num-entries", static_cast<long long>(numVec.second[0]));
+            ob.append("num-deletions", static_cast<long long>(numVec.second[1]));
+            objBuilder.append("L" + std::to_string(numVec.first), ob.obj());
+        }
+        bob->append("file-num-entries", objBuilder.obj());
+    }
+
+    void RocksServerStatusSection::generatePropertiesSection(BSONObjBuilder* bob) const {
         // if the second is true, that means that we pass the value through PrettyPrintBytes
         std::vector<std::pair<std::string, bool>> properties = {
             {"stats", false},
@@ -90,12 +120,22 @@ namespace mongo {
             {"num-snapshots", false},
             {"oldest-snapshot-time", false},
             {"num-live-versions", false}};
-        for (auto const& property : properties) {
+        auto getProperties = [&](const std::pair<std::string, bool>& property,
+                                 const std::string& prefix) {
             std::string statsString;
-            if (!_engine->getDB()->GetProperty("rocksdb." + property.first, &statsString)) {
-                statsString = " unable to retrieve statistics";
-                bob.append(property.first, statsString);
-                continue;
+            if (prefix == "oplogcf-") {
+                if (!_engine->getDB()->GetProperty(_engine->getOplogCFHandle(),
+                                                   "rocksdb." + property.first, &statsString)) {
+                    statsString = " unable to retrieve statistics by oplogCF";
+                    bob->append(property.first, statsString);
+                    return;
+                }
+            } else {
+                if (!_engine->getDB()->GetProperty("rocksdb." + property.first, &statsString)) {
+                    statsString = " unable to retrieve statistics";
+                    bob->append(property.first, statsString);
+                    return;
+                }
             }
             if (property.first == "stats") {
                 // special casing because we want to turn it into array
@@ -105,16 +145,24 @@ namespace mongo {
                 while (std::getline(ss, line)) {
                     a.append(line);
                 }
-                bob.appendArray(property.first, a.arr());
+                bob->appendArray(prefix + property.first, a.arr());
             } else if (property.second) {
-                bob.append(property.first, PrettyPrintBytes(std::stoll(statsString)));
+                bob->append(prefix + property.first, PrettyPrintBytes(std::stoll(statsString)));
             } else {
-                bob.append(property.first, statsString);
+                bob->append(prefix + property.first, statsString);
             }
+        };
+
+        for (auto const& property : properties) {
+            getProperties(property, "");
+            getProperties(property, "oplogcf-");
         }

-        bob.append("total-live-recovery-units", RocksRecoveryUnit::getTotalLiveRecoveryUnits());
-        bob.append("block-cache-usage", PrettyPrintBytes(_engine->getBlockCacheUsage()));
+        bob->append("total-live-recovery-units", RocksRecoveryUnit::getTotalLiveRecoveryUnits());
+        bob->append("block-cache-usage", PrettyPrintBytes(_engine->getBlockCacheUsage()));
+    }
+
+    void RocksServerStatusSection::generateThreadStatusSection(BSONObjBuilder* bob) const {
         std::vector<rocksdb::ThreadStatus> threadList;
         auto s = rocksdb::Env::Default()->GetThreadList(&threadList);
         if (s.ok()) {
@@ -171,9 +219,11 @@ namespace mongo {
                 threadStatus.append(threadObjBuilder.obj());
             }

-            bob.appendArray("thread-status", threadStatus.arr());
+            bob->appendArray("thread-status", threadStatus.arr());
         }
+    }

+    void RocksServerStatusSection::generateCountersSection(BSONObjBuilder* bob) const {
         // add counters
         auto stats = _engine->getStatistics();
         if (stats) {
@@ -200,74 +250,92 @@ namespace mongo {
                     static_cast<long long>(stats->getTickerCount(counter_name.first)));
             }

-            bob.append("counters", countersObjBuilder.obj());
+            bob->append("counters", countersObjBuilder.obj());
         }
+    }

+    void RocksServerStatusSection::generateTxnStatsSection(BSONObjBuilder* bob) const {
         // transaction stats
-        {
-            rocksdb::TOTransactionStat txnStat;
-            memset(&txnStat, 0, sizeof txnStat);
-            rocksdb::TOTransactionDB* db = _engine->getDB();
-            invariant(db->Stat(&txnStat).ok());
-            BSONObjBuilder txnObjBuilder;
-            txnObjBuilder.append("max-conflict-bytes",
-                                 static_cast<long long>(txnStat.max_conflict_bytes));
-            txnObjBuilder.append("cur-conflict-bytes",
-                                 static_cast<long long>(txnStat.cur_conflict_bytes));
-            txnObjBuilder.append("uncommitted-keys", static_cast<long long>(txnStat.uk_num));
-            txnObjBuilder.append("committed-keys", static_cast<long long>(txnStat.ck_num));
-            txnObjBuilder.append("alive-txn-num", static_cast<long long>(txnStat.alive_txns_num));
-            txnObjBuilder.append("read-queue-num", static_cast<long long>(txnStat.read_q_num));
-            txnObjBuilder.append("commit-queue-num", static_cast<long long>(txnStat.commit_q_num));
-            txnObjBuilder.append("oldest-timestamp", static_cast<long long>(txnStat.oldest_ts));
-            txnObjBuilder.append("min-read-timestamp", static_cast<long long>(txnStat.min_read_ts));
-            txnObjBuilder.append("max-commit-timestamp",
-                                 static_cast<long long>(txnStat.max_commit_ts));
-            txnObjBuilder.append("committed-max-txnid",
-                                 static_cast<long long>(txnStat.committed_max_txnid));
-            txnObjBuilder.append("min-uncommit-ts",
-                                 static_cast<long long>(txnStat.min_uncommit_ts));
-            txnObjBuilder.append("update-max-commit-ts-times",
-                                 static_cast<long long>(txnStat.update_max_commit_ts_times));
-            txnObjBuilder.append("update-max-commit-ts-retries",
-                                 static_cast<long long>(txnStat.update_max_commit_ts_retries));
-            txnObjBuilder.append("txn-commits", static_cast<long long>(txnStat.txn_commits));
-            txnObjBuilder.append("txn-aborts", static_cast<long long>(txnStat.txn_aborts));
-            txnObjBuilder.append("commit-without-ts-times",
-                                 static_cast<long long>(txnStat.commit_without_ts_times));
-            txnObjBuilder.append("read-without-ts-times",
-                                 static_cast<long long>(txnStat.read_without_ts_times));
-            txnObjBuilder.append("read-with-ts-times",
-                                 static_cast<long long>(txnStat.read_with_ts_times));
-            txnObjBuilder.append("read-queue-walk-len-sum",
-                                 static_cast<long long>(txnStat.read_q_walk_len_sum));
-            txnObjBuilder.append("read-queue-walk-times",
-                                 static_cast<long long>(txnStat.read_q_walk_times));
-            txnObjBuilder.append("commit-queue-walk-len-sum",
-                                 static_cast<long long>(txnStat.commit_q_walk_len_sum));
-            txnObjBuilder.append("commit-queue-walk-times",
-                                 static_cast<long long>(txnStat.commit_q_walk_times));
-            bob.append("transaction-stats", txnObjBuilder.obj());
-        }
+        rocksdb::TOTransactionStat txnStat;
+        memset(&txnStat, 0, sizeof txnStat);
+        rocksdb::TOTransactionDB* db = _engine->getDB();
+        invariant(db->Stat(&txnStat).ok());
+        BSONObjBuilder txnObjBuilder;
+        txnObjBuilder.append("max-conflict-bytes",
+                             static_cast<long long>(txnStat.max_conflict_bytes));
+        txnObjBuilder.append("cur-conflict-bytes",
+                             static_cast<long long>(txnStat.cur_conflict_bytes));
+        txnObjBuilder.append("uncommitted-keys", static_cast<long long>(txnStat.uk_num));
+        txnObjBuilder.append("committed-keys", static_cast<long long>(txnStat.ck_num));
+        txnObjBuilder.append("alive-txn-num", static_cast<long long>(txnStat.alive_txns_num));
+        txnObjBuilder.append("read-queue-num", static_cast<long long>(txnStat.read_q_num));
+        txnObjBuilder.append("commit-queue-num", static_cast<long long>(txnStat.commit_q_num));
+        txnObjBuilder.append("oldest-timestamp", static_cast<long long>(txnStat.oldest_ts));
+        txnObjBuilder.append("min-read-timestamp", static_cast<long long>(txnStat.min_read_ts));
+        txnObjBuilder.append("max-commit-timestamp", static_cast<long long>(txnStat.max_commit_ts));
+        txnObjBuilder.append("committed-max-txnid",
+                             static_cast<long long>(txnStat.committed_max_txnid));
+        txnObjBuilder.append("min-uncommit-ts", static_cast<long long>(txnStat.min_uncommit_ts));
+        txnObjBuilder.append("update-max-commit-ts-times",
+                             static_cast<long long>(txnStat.update_max_commit_ts_times));
+        txnObjBuilder.append("update-max-commit-ts-retries",
+                             static_cast<long long>(txnStat.update_max_commit_ts_retries));
+        txnObjBuilder.append("txn-commits", static_cast<long long>(txnStat.txn_commits));
+        txnObjBuilder.append("txn-aborts", static_cast<long long>(txnStat.txn_aborts));
+        txnObjBuilder.append("commit-without-ts-times",
+                             static_cast<long long>(txnStat.commit_without_ts_times));
+        txnObjBuilder.append("read-without-ts-times",
+                             static_cast<long long>(txnStat.read_without_ts_times));
+        txnObjBuilder.append("read-with-ts-times",
+                             static_cast<long long>(txnStat.read_with_ts_times));
+        txnObjBuilder.append("read-queue-walk-len-sum",
+                             static_cast<long long>(txnStat.read_q_walk_len_sum));
+        txnObjBuilder.append("read-queue-walk-times",
+                             static_cast<long long>(txnStat.read_q_walk_times));
+        txnObjBuilder.append("commit-queue-walk-len-sum",
+                             static_cast<long long>(txnStat.commit_q_walk_len_sum));
+        txnObjBuilder.append("commit-queue-walk-times",
+                             static_cast<long long>(txnStat.commit_q_walk_times));
+        bob->append("transaction-stats", txnObjBuilder.obj());
+    }

+    void RocksServerStatusSection::generateOplogDelStatsSection(BSONObjBuilder* bob) const {
+        // oplog compact delete stats
+        BSONObjBuilder oplogDelBuilder;
+        auto oplogStats = _engine->getCompactionScheduler()->getOplogDelCompactStats();
+        oplogDelBuilder.append("oplog-deleted-entries",
+                               static_cast<long long>(oplogStats.oplogEntriesDeleted));
+        oplogDelBuilder.append("oplog-deleted-size",
+                               static_cast<long long>(oplogStats.oplogSizeDeleted));
+        oplogDelBuilder.append("oplog-compact-skip-entries",
+                               static_cast<long long>(oplogStats.oplogCompactSkip));
+        oplogDelBuilder.append("oplog-compact-keep-entries",
+                               static_cast<long long>(oplogStats.oplogCompactKeep));
+        bob->append("oplog-compact-stats", oplogDelBuilder.obj());
+    }

+    void RocksServerStatusSection::generateCompactSchedulerSection(BSONObjBuilder* bob) const {
         // compaction scheduler stats
         // TODO(wolfkdy): use jstests and failPoints to test primary/secondary status after dropIndex
         // and dropCollection, test prefixes draining before and after mongod reboot
+        BSONObjBuilder bb;
+        bool large = false;
+        auto droppedPrefixes = _engine->getCompactionScheduler()->getDroppedPrefixes();
         {
-            BSONObjBuilder bb;
-            auto droppedPrefixes = _engine->getCompactionScheduler()->getDroppedPrefixes();
-            {
-                BSONArrayBuilder a;
-                for (auto p : droppedPrefixes) {
-                    a.append(BSON("prefix" << static_cast<int>(p.first) << "debug-info" << p.second));
+            BSONArrayBuilder a;
+            for (auto p : droppedPrefixes) {
+                a.append(
+                    BSON("prefix" << static_cast<int>(p.first) << "debug-info" << p.second));
+                if (a.len() > 1024 * 1024 * 15) {
+                    large = true;
+                    break;
                 }
-                bb.appendArray("dropped-prefixes", a.arr());
             }
-            bob.append("compaction-scheduler", bb.obj());
+            bb.appendArray("dropped-prefixes", a.arr());
         }
-        RocksEngine::appendGlobalStats(bob);
-
-        return bob.obj();
+        auto obj = bb.obj();
+        if (large) {
+            log() << "status is over 15MB";
+        }
+        bob->append("compaction-scheduler", obj);
     }
-
 }  // namespace mongo
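Note: `generateCompactSchedulerSection()` above guards against the 16 MB BSON document limit by capping the dropped-prefixes array near 15 MB (`a.len()` is the array builder's current byte length). A standalone analog of that budget check, with hypothetical data:

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        constexpr std::size_t kBudget = 15 * 1024 * 1024;  // same 15 MB cap as the patch
        const std::vector<std::string> entries(1 << 20, std::string(32, 'x'));  // ~32 MB of input
        std::size_t appendedBytes = 0;
        std::size_t kept = 0;
        bool large = false;
        for (const auto& e : entries) {
            appendedBytes += e.size();
            ++kept;
            if (appendedBytes > kBudget) {  // mirrors `if (a.len() > 1024 * 1024 * 15)`
                large = true;
                break;
            }
        }
        if (large) {
            std::cout << "status is over 15MB, kept only " << kept << " entries\n";
        }
    }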
txnObjBuilder.append("txn-aborts", static_cast(txnStat.txn_aborts)); - txnObjBuilder.append("commit-without-ts-times", - static_cast(txnStat.commit_without_ts_times)); - txnObjBuilder.append("read-without-ts-times", - static_cast(txnStat.read_without_ts_times)); - txnObjBuilder.append("read-with-ts-times", - static_cast(txnStat.read_with_ts_times)); - txnObjBuilder.append("read-queue-walk-len-sum", - static_cast(txnStat.read_q_walk_len_sum)); - txnObjBuilder.append("read-queue-walk-times", - static_cast(txnStat.read_q_walk_times)); - txnObjBuilder.append("commit-queue-walk-len-sum", - static_cast(txnStat.commit_q_walk_len_sum)); - txnObjBuilder.append("commit-queue-walk-times", - static_cast(txnStat.commit_q_walk_times)); - bob.append("transaction-stats", txnObjBuilder.obj()); - } + rocksdb::TOTransactionStat txnStat; + memset(&txnStat, 0, sizeof txnStat); + rocksdb::TOTransactionDB* db = _engine->getDB(); + invariant(db->Stat(&txnStat).ok()); + BSONObjBuilder txnObjBuilder; + txnObjBuilder.append("max-conflict-bytes", + static_cast(txnStat.max_conflict_bytes)); + txnObjBuilder.append("cur-conflict-bytes", + static_cast(txnStat.cur_conflict_bytes)); + txnObjBuilder.append("uncommitted-keys", static_cast(txnStat.uk_num)); + txnObjBuilder.append("committed-keys", static_cast(txnStat.ck_num)); + txnObjBuilder.append("alive-txn-num", static_cast(txnStat.alive_txns_num)); + txnObjBuilder.append("read-queue-num", static_cast(txnStat.read_q_num)); + txnObjBuilder.append("commit-queue-num", static_cast(txnStat.commit_q_num)); + txnObjBuilder.append("oldest-timestamp", static_cast(txnStat.oldest_ts)); + txnObjBuilder.append("min-read-timestamp", static_cast(txnStat.min_read_ts)); + txnObjBuilder.append("max-commit-timestamp", static_cast(txnStat.max_commit_ts)); + txnObjBuilder.append("committed-max-txnid", + static_cast(txnStat.committed_max_txnid)); + txnObjBuilder.append("min-uncommit-ts", static_cast(txnStat.min_uncommit_ts)); + txnObjBuilder.append("update-max-commit-ts-times", + static_cast(txnStat.update_max_commit_ts_times)); + txnObjBuilder.append("update-max-commit-ts-retries", + static_cast(txnStat.update_max_commit_ts_retries)); + txnObjBuilder.append("txn-commits", static_cast(txnStat.txn_commits)); + txnObjBuilder.append("txn-aborts", static_cast(txnStat.txn_aborts)); + txnObjBuilder.append("commit-without-ts-times", + static_cast(txnStat.commit_without_ts_times)); + txnObjBuilder.append("read-without-ts-times", + static_cast(txnStat.read_without_ts_times)); + txnObjBuilder.append("read-with-ts-times", + static_cast(txnStat.read_with_ts_times)); + txnObjBuilder.append("read-queue-walk-len-sum", + static_cast(txnStat.read_q_walk_len_sum)); + txnObjBuilder.append("read-queue-walk-times", + static_cast(txnStat.read_q_walk_times)); + txnObjBuilder.append("commit-queue-walk-len-sum", + static_cast(txnStat.commit_q_walk_len_sum)); + txnObjBuilder.append("commit-queue-walk-times", + static_cast(txnStat.commit_q_walk_times)); + bob->append("transaction-stats", txnObjBuilder.obj()); + } + void RocksServerStatusSection::generateOplogDelStatsSection(BSONObjBuilder* bob) const { + // oplog compact delete stats + BSONObjBuilder oplogDelBuilder; + auto oplogStats = _engine->getCompactionScheduler()->getOplogDelCompactStats(); + oplogDelBuilder.append("oplog-deleted-entries", + static_cast(oplogStats.oplogEntriesDeleted)); + oplogDelBuilder.append("oplog-deleted-size", + static_cast(oplogStats.oplogSizeDeleted)); + oplogDelBuilder.append("oplog-compact-skip-entries", + 
diff --git a/src/rocks_snapshot_manager.cpp b/src/rocks_snapshot_manager.cpp
index b1882c5..67ec623 100644
--- a/src/rocks_snapshot_manager.cpp
+++ b/src/rocks_snapshot_manager.cpp
@@ -50,7 +50,11 @@ namespace mongo {

     void RocksSnapshotManager::setLocalSnapshot(const Timestamp& timestamp) {
         stdx::lock_guard lock(_localSnapshotMutex);
-        _localSnapshot = timestamp;
+        if (timestamp.isNull()) {
+            _localSnapshot = boost::none;
+        } else {
+            _localSnapshot = timestamp;
+        }
     }

     boost::optional<Timestamp> RocksSnapshotManager::getLocalSnapshot() {