Skip to content

Commit

Permalink
Merge pull request #179 from mongodb-partners/v4.2.5
Browse files Browse the repository at this point in the history
V4.2.5
  • Loading branch information
wolfkdy committed Jun 1, 2021
2 parents a50c999 + 51be282 commit a129cd5
Show file tree
Hide file tree
Showing 41 changed files with 2,255 additions and 1,600 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.pyc
*.sw*
23 changes: 14 additions & 9 deletions SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,34 @@ env.Library(
'src/rocks_oplog_manager.cpp',
'src/rocks_begin_transaction_block.cpp',
'src/rocks_prepare_conflict.cpp',
'src/mongo_rate_limiter_checker.cpp',
env.Idlc('src/rocks_parameters.idl')[0],
env.Idlc('src/rocks_global_options.idl')[0],
'src/rocks_parameters.cpp',
],
LIBDEPS= [
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/prepare_conflict_tracker',
'$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/index/index_descriptor',
'$BUILD_DIR/mongo/db/storage/bson_collection_catalog_entry',
'$BUILD_DIR/mongo/db/storage/index_entry_comparison',
'$BUILD_DIR/mongo/db/storage/journal_listener',
'$BUILD_DIR/mongo/db/storage/key_string',
'$BUILD_DIR/mongo/db/storage/oplog_hack',
'$BUILD_DIR/mongo/db/storage/kv/kv_prefix',
'$BUILD_DIR/mongo/util/background_job',
'$BUILD_DIR/mongo/util/concurrency/ticketholder',
'$BUILD_DIR/mongo/util/processinfo',
'$BUILD_DIR/third_party/shim_snappy',
],
LIBDEPS_PRIVATE= [
'$BUILD_DIR/mongo/db/snapshot_window_options',
],
SYSLIBDEPS=["rocksdb",
"z",
"bz2"] #z and bz2 are dependencies for rocks
Expand All @@ -66,28 +73,26 @@ env.Library(
],
LIBDEPS= [
'storage_rocks_base',
'$BUILD_DIR/mongo/db/storage/kv/kv_engine'
],
LIBDEPS_DEPENDENTS=['$BUILD_DIR/mongo/db/serveronly']
PROGDEPS_DEPENDENTS=['$BUILD_DIR/mongo/mongod']
)

env.Library(
target= 'storage_rocks_mock',
source= [
'src/rocks_record_store_mock.cpp',
],
],
LIBDEPS= [
'storage_rocks_base',
# Temporary crutch since the ssl cleanup is hard coded in background.cpp
'$BUILD_DIR/mongo/util/net/network',
]
)


env.CppUnitTest(
target='storage_rocks_index_test',
source=['src/rocks_index_test.cpp'
],
source=[
'src/rocks_index_test.cpp'
],
LIBDEPS=[
'storage_rocks_mock',
'$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness'
Expand All @@ -98,7 +103,7 @@ env.CppUnitTest(
env.CppUnitTest(
target='storage_rocks_record_store_test',
source=[
'src/rocks_record_store_test.cpp'
'src/rocks_record_store_test.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
Expand Down
5 changes: 5 additions & 0 deletions src/KNOWN_ISSUES.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
MongoRocks r4.2.5
1) RocksDB layer bottommost compaction may be triggered frequently with no progress when enableMajorityReadConcern=true, TODO: add a issue somewhere
2) jstests/core/txns/commit_prepared_transaction_errors.js wont pass now because mongo-wt introduced the timestamped-safe unique index, which does dupkey check in wt-layer. mongoRocks does this in mongoRocks layer, which hangs into PrepareConflict error, while mongo-wt throws WriteConflict
3) src/mongo/db/storage/sorted_data_interface_test_dupkeycheck.cpp:TEST(SortedDataInterface, DupKeyCheckWithDuplicates) wont pass, because mongoRocks currently do not have timestamped-safe unique index

MongoRocks r4.0.3
1) RocksDB layer bottommost compaction may be triggered frequently with no progress when enableMajorityReadConcern=true, TODO: add a issue somewhere
41 changes: 29 additions & 12 deletions src/rocks_begin_transaction_block.cpp
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,46 @@

namespace mongo {
RocksBeginTxnBlock::RocksBeginTxnBlock(rocksdb::TOTransactionDB* db,
std::unique_ptr<rocksdb::TOTransaction>* txn)
std::unique_ptr<rocksdb::TOTransaction>* txn,
PrepareConflictBehavior prepareConflictBehavior,
RoundUpPreparedTimestamps roundUpPreparedTimestamps,
RoundUpReadTimestamp roundUpReadTimestamp)
: _db(db) {
invariant(!_rollback);
rocksdb::WriteOptions wOpts;
rocksdb::TOTransactionOptions txnOpts;

if (prepareConflictBehavior == PrepareConflictBehavior::kIgnoreConflicts) {
txnOpts.ignore_prepare = true;
txnOpts.read_only = true;
} else if (prepareConflictBehavior ==
PrepareConflictBehavior::kIgnoreConflictsAllowWrites) {
txnOpts.ignore_prepare = true;
}

if (roundUpPreparedTimestamps == RoundUpPreparedTimestamps::kRound) {
txnOpts.timestamp_round_prepared = true;
}
if (roundUpReadTimestamp == RoundUpReadTimestamp::kRound) {
txnOpts.timestamp_round_read = true;
}

_transaction = _db->BeginTransaction(wOpts, txnOpts);
invariant(_transaction);
txn->reset(_transaction);
_rollback = true;
}

Status RocksBeginTxnBlock::setTimestamp(Timestamp readTs, RoundToOldest roundToOldest) {
RocksBeginTxnBlock::~RocksBeginTxnBlock() {
if (_rollback) {
invariant(_transaction->Rollback().ok());
}
}

Status RocksBeginTxnBlock::setReadSnapshot(Timestamp readTs) {
invariant(_rollback);
rocksdb::RocksTimeStamp ts(readTs.asULL());
auto status =
_transaction->SetReadTimeStamp(ts, roundToOldest == RoundToOldest::kRound ? 1 : 0);
auto status = _transaction->SetReadTimeStamp(ts);
if (!status.ok()) {
if (status.IsInvalidArgument()) {
return Status(ErrorCodes::SnapshotTooOld,
Expand All @@ -61,11 +85,9 @@ namespace mongo {
}
return rocksToMongoStatus(status);
}

status = _transaction->GetReadTimeStamp(&ts);
invariant(status.ok(), status.ToString());
if (roundToOldest != RoundToOldest::kRound) {
invariant(readTs == Timestamp(ts));
}
_readTimestamp = Timestamp(ts);
return Status::OK();
}
Expand All @@ -80,9 +102,4 @@ namespace mongo {
return _readTimestamp;
}

RocksBeginTxnBlock::~RocksBeginTxnBlock() {
if (_rollback) {
invariant(_transaction->Rollback().ok());
}
}
} // namespace mongo
46 changes: 26 additions & 20 deletions src/rocks_begin_transaction_block.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <rocksdb/utilities/totransaction_db.h>
#include "mongo/base/status.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/storage/recovery_unit.h"

namespace mongo {

Expand All @@ -42,42 +43,47 @@ namespace mongo {
*/
class RocksBeginTxnBlock {
public:
// Whether or not to ignore prepared transactions.
enum class IgnorePrepared {
kNoIgnore, // Do not ignore prepared transactions and return prepare conflicts.
kIgnore // Ignore prepared transactions and show prepared, but uncommitted data.
};

// Whether or not to round up to the oldest timestamp when the read timestamp is behind it.
enum class RoundToOldest {
enum class RoundUpReadTimestamp {
kNoRound, // Do not round to the oldest timestamp. BadValue error may be returned.
kRound // Round the read timestamp up to the oldest timestamp when it is behind.
};

RocksBeginTxnBlock(rocksdb::TOTransactionDB* db,
std::unique_ptr<rocksdb::TOTransaction>* txn);
// Dictates whether to round up prepare and commit timestamp of a prepared transaction.
// 'kNoRound' - Does not round up prepare and commit timestamp of a prepared transaction.
// 'kRound' - The prepare timestamp will be rounded up to the oldest timestamp if found to
// be earlier; and the commit timestamp will be rounded up to the prepare timestamp if
// found to be earlier.
enum class RoundUpPreparedTimestamps { kNoRound, kRound };

RocksBeginTxnBlock(
rocksdb::TOTransactionDB* db, std::unique_ptr<rocksdb::TOTransaction>* txn,
PrepareConflictBehavior prepareConflictBehavior,
RoundUpPreparedTimestamps roundUpPreparedTimestamps,
RoundUpReadTimestamp roundUpReadTimestamp = RoundUpReadTimestamp::kNoRound);

~RocksBeginTxnBlock();

/**
* End the begin transaction block. Must be called to ensure the opened transaction
* is not be rolled back.
*/
void done();

/**
* Sets the read timestamp on the opened transaction. Cannot be called after a call to
* done().
*/
Status setTimestamp(Timestamp, RoundToOldest roundToOldest = RoundToOldest::kNoRound);
Status setReadSnapshot(Timestamp);

/* Get the read timestamp on the opened transaction */
Timestamp getTimestamp() const;

/**
* End the begin transaction block. Must be called to ensure the opened transaction
* is not be rolled back.
*/
void done();

private:
rocksdb::TOTransactionDB* _db;
rocksdb::TOTransaction* _transaction;
bool _rollback = false;
Timestamp _readTimestamp;
rocksdb::TOTransactionDB* _db; // not own
rocksdb::TOTransaction* _transaction; // not own
bool _rollback = false; // not own
Timestamp _readTimestamp; // not own
};

} // namespace mongo
36 changes: 19 additions & 17 deletions src/rocks_compaction_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
#include <queue>

#include "mongo/db/client.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/background.h"
#include "mongo/util/concurrency/idle_thread_block.h"
Expand Down Expand Up @@ -170,7 +170,7 @@ namespace mongo {
private:
const RocksCompactionScheduler* _compactionScheduler;
};
} // end of anon namespace
} // namespace

class CompactionBackgroundJob : public BackgroundJob {
public:
Expand Down Expand Up @@ -206,7 +206,7 @@ namespace mongo {
RocksCompactionScheduler* _compactionScheduler; // not owned

bool _compactionThreadRunning = true;
stdx::mutex _compactionMutex;
Mutex _compactionMutex = MONGO_MAKE_LATCH("CompactionBackgroundJob::_compactionMutex");
stdx::condition_variable _compactionWakeUp;
using CompactQueue =
std::priority_queue<CompactOp, std::vector<CompactOp>, std::greater<CompactOp>>;
Expand All @@ -223,7 +223,7 @@ namespace mongo {

CompactionBackgroundJob::~CompactionBackgroundJob() {
{
stdx::lock_guard<stdx::mutex> lk(_compactionMutex);
stdx::lock_guard<Latch> lk(_compactionMutex);
_compactionThreadRunning = false;
// Clean up the queue
while (!_compactionQueue.empty()) {
Expand Down Expand Up @@ -259,11 +259,11 @@ namespace mongo {
private:
T& lk_;
};
}
} // namespace

void CompactionBackgroundJob::run() {
Client::initThread(_name);
stdx::unique_lock<stdx::mutex> lk(_compactionMutex);
stdx::unique_lock<Latch> lk(_compactionMutex);
while (_compactionThreadRunning) {
// check if we have something to compact
if (MONGO_FAIL_POINT(RocksCompactionSchedulerPause)) {
Expand Down Expand Up @@ -293,7 +293,7 @@ namespace mongo {
bool rangeDropped, uint32_t order,
boost::optional<std::shared_ptr<Notification<Status>>> notification) {
{
stdx::lock_guard<stdx::mutex> lk(_compactionMutex);
stdx::lock_guard<Latch> lk(_compactionMutex);
_compactionQueue.push({cf, begin, end, rangeDropped, order, notification});
}
_compactionWakeUp.notify_one();
Expand Down Expand Up @@ -333,7 +333,7 @@ namespace mongo {
(*op._notification)->set(rocksToMongoStatus(s));
}
// Let's leave as quickly as possible if in shutdown
stdx::lock_guard<stdx::mutex> lk(_compactionMutex);
stdx::lock_guard<Latch> lk(_compactionMutex);
if (!_compactionThreadRunning) {
return;
}
Expand Down Expand Up @@ -363,7 +363,7 @@ namespace mongo {
const std::string& prefix) {
bool schedule = false;
{
stdx::lock_guard<stdx::mutex> lk(_lock);
stdx::lock_guard<Latch> lk(_lock);
if (_timer.minutes() >= kMinCompactionIntervalMins) {
schedule = true;
_timer.reset();
Expand Down Expand Up @@ -394,12 +394,12 @@ namespace mongo {
const std::string& begin,
const std::string& end) {
{
stdx::lock_guard<stdx::mutex> lk(_droppedDataMutex);
stdx::lock_guard<Latch> lk(_droppedDataMutex);
invariant(begin <= end);
if (_oplogDeleteUntil != boost::none) {
invariant(cf->GetID() == _oplogDeleteUntil->first);
}
_oplogDeleteUntil = {cf->GetID(), {begin, end}};
_oplogDeleteUntil = std::make_pair(cf->GetID(), std::make_pair(begin, end));
}
auto notification = std::make_shared<Notification<Status>>();
compact(cf, begin, end, false, kOrderOplog, notification);
Expand Down Expand Up @@ -434,7 +434,7 @@ namespace mongo {
}

std::unordered_map<uint32_t, BSONObj> RocksCompactionScheduler::getDroppedPrefixes() const {
stdx::lock_guard<stdx::mutex> lk(_droppedDataMutex);
stdx::lock_guard<Latch> lk(_droppedDataMutex);
// this will copy the set. that way compaction filter has its own copy and doesn't need to
// worry about thread safety
std::unordered_map<uint32_t, BSONObj> ret;
Expand All @@ -451,6 +451,8 @@ namespace mongo {
(uint32_t)get_internal_delete_skipped_count();
int dropped_count = 0;
uint32_t int_prefix = 0;

// NOTE(cuixin): only invoke in rocksengine contruct function, no need check conflict
for (iter->Seek(kDroppedPrefix); iter->Valid() && iter->key().starts_with(kDroppedPrefix);
iter->Next()) {
invariantRocksOK(iter->status());
Expand All @@ -462,7 +464,7 @@ namespace mongo {
bool ok = extractPrefix(prefix, &int_prefix);
invariant(ok);
{
stdx::lock_guard<stdx::mutex> lk(_droppedDataMutex);
stdx::lock_guard<Latch> lk(_droppedDataMutex);
_droppedPrefixes.emplace(int_prefix, BSONObj(iter->value().data()).copy());
}
LOG(1) << "Compacting dropped prefix: " << prefix.ToString(true);
Expand All @@ -479,7 +481,7 @@ namespace mongo {
}

boost::optional<std::pair<uint32_t, std::pair<std::string, std::string>>> RocksCompactionScheduler::getOplogDeleteUntil() const {
stdx::lock_guard<stdx::mutex> lk(_droppedDataMutex);
stdx::lock_guard<Latch> lk(_droppedDataMutex);
return _oplogDeleteUntil;
}

Expand Down Expand Up @@ -509,7 +511,7 @@ namespace mongo {

// instruct compaction filter to start deleting
{
stdx::lock_guard<stdx::mutex> lk(_droppedDataMutex);
stdx::lock_guard<Latch> lk(_droppedDataMutex);
for (const auto& prefix : prefixesToDrop) {
uint32_t int_prefix;
bool ok = extractPrefix(prefix, &int_prefix);
Expand Down Expand Up @@ -542,7 +544,7 @@ namespace mongo {
<< (opSucceeded ? " success" : " failed");
invariant(ok);
{
stdx::lock_guard<stdx::mutex> lk(_droppedDataMutex);
stdx::lock_guard<Latch> lk(_droppedDataMutex);
_droppedPrefixes.erase(int_prefix);
}
if (opSucceeded) {
Expand All @@ -565,4 +567,4 @@ namespace mongo {
}
}
}
}
} // namespace mongo

0 comments on commit a129cd5

Please sign in to comment.