Skip to content

Commit

Permalink
Merge pull request #201 from eharry/cx_use_official_rocksdb_commit
Browse files Browse the repository at this point in the history
set mbd_catalog as no timestamp collection and change unique index key format: add the record id in key
  • Loading branch information
wolfkdy committed Jun 9, 2022
2 parents 38f3f8f + b24108c commit 6f95928
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 52 deletions.
5 changes: 4 additions & 1 deletion src/rocks_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ namespace mongo {
if (ns.startsWith("local.")) {
return false;
}
if (ns == "_mdb_catalog") {
return false;
}
return true;
}
} // namespace
Expand Down Expand Up @@ -578,7 +581,7 @@ namespace mongo {
index = new RocksUniqueIndex(_db.get(), _defaultCf.get(), prefix, ident.toString(),
Ordering::make(desc->keyPattern()), std::move(config),
desc->parentNS().toString(), desc->indexName(),
desc->keyPattern(), desc->isPartial());
desc->keyPattern(), desc->isPartial(), desc->isIdIndex());
} else {
auto si = new RocksStandardIndex(_db.get(), _defaultCf.get(), prefix, ident.toString(),
Ordering::make(desc->keyPattern()), std::move(config));
Expand Down
200 changes: 172 additions & 28 deletions src/rocks_index.cpp

Large diffs are not rendered by default.

21 changes: 20 additions & 1 deletion src/rocks_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ namespace mongo {
RocksUniqueIndex(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, std::string prefix,
std::string ident, Ordering order, const BSONObj& config,
std::string collectionNamespace, std::string indexName,
const BSONObj& keyPattern, bool partial = false);
const BSONObj& keyPattern, bool partial = false, bool isIdIdx = false);

virtual StatusWith<SpecialFormatInserted> insert(OperationContext* opCtx,
const BSONObj& key, const RecordId& loc,
Expand All @@ -121,10 +121,29 @@ namespace mongo {
bool dupsAllowed) override;

private:
StatusWith<SpecialFormatInserted> _insertTimestampSafe(OperationContext* opCtx,
const BSONObj& key,
const RecordId& loc,
bool dupsAllowed);

StatusWith<SpecialFormatInserted> _insertTimestampUnsafe(OperationContext* opCtx,
const BSONObj& key,
const RecordId& loc,
bool dupsAllowed);

void _unindexTimestampUnsafe(OperationContext* opCtx, const BSONObj& key,
const RecordId& loc, bool dupsAllowed);

void _unindexTimestampSafe(OperationContext* opCtx, const BSONObj& key, const RecordId& loc,
bool dupsAllowed);

bool _keyExistsTimestampSafe(OperationContext* opCtx, const KeyString& prefixedKey);

std::string _collectionNamespace;
std::string _indexName;
const BSONObj _keyPattern;
const bool _partial;
const bool _isIdIndex;
};

class RocksStandardIndex : public RocksIndexBase {
Expand Down
3 changes: 3 additions & 0 deletions src/totdb/totransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class TOTransaction {
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0;
virtual Status Delete(const Slice& key) = 0;

virtual Status GetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) = 0;
virtual Status GetForUpdate(const Slice& key) = 0;

virtual WriteBatchWithIndex* GetWriteBatch() = 0;

virtual Status SetName(const TransactionName& name) = 0;
Expand Down
50 changes: 32 additions & 18 deletions src/totdb/totransaction_db_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,8 @@ Status TOTransactionDBImpl::SetPrepareTimeStamp(
}

Status TOTransactionDBImpl::CommitTransaction(
const std::shared_ptr<ATN>& core, const std::set<TxnKey>& written_keys) {
const std::shared_ptr<ATN>& core, const std::set<TxnKey>& written_keys,
const std::set<TxnKey>& get_for_updates) {
TransactionID max_to_clean_txn_id = 0;
RocksTimeStamp max_to_clean_ts = 0;
RocksTimeStamp candidate_durable_timestamp = 0;
Expand Down Expand Up @@ -883,20 +884,27 @@ Status TOTransactionDBImpl::CommitTransaction(

AdvanceTS(&max_to_clean_ts);

// Move Uncommited keys for this txn to committed keys
std::map<size_t, std::set<TxnKey>> stripe_keys_map;
// Move uncommited keys for this txn to committed keys
std::map<size_t, std::set<TxnKey>> stripe_commit_keys_map;
auto keys_iter = written_keys.begin();
while (keys_iter != written_keys.end()) {
auto stripe_num = GetStripe(*keys_iter);
if (stripe_keys_map.find(stripe_num) == stripe_keys_map.end()) {
stripe_keys_map[stripe_num] = {};
if (stripe_commit_keys_map.find(stripe_num) == stripe_commit_keys_map.end()) {
stripe_commit_keys_map[stripe_num] = {};
}
stripe_keys_map[stripe_num].insert(std::move(*keys_iter));
stripe_commit_keys_map[stripe_num].insert(std::move(*keys_iter));
keys_iter++;
}

auto stripe_keys_iter = stripe_keys_map.begin();
while (stripe_keys_iter != stripe_keys_map.end()) {
// remove get_for_update keys
std::map<size_t, std::set<TxnKey>> stripe_get_for_update_keys_map;
for (const auto& k : get_for_updates) {
auto stripe_num = GetStripe(k);
stripe_get_for_update_keys_map[stripe_num].insert(k);
}

auto stripe_keys_iter = stripe_commit_keys_map.begin();
while (stripe_keys_iter != stripe_commit_keys_map.end()) {
std::lock_guard<std::mutex> lock(*keys_mutex_[stripe_keys_iter->first]);
// the key in one txn insert to the CK with the max commit ts
for (auto& key : stripe_keys_iter->second) {
Expand All @@ -909,6 +917,13 @@ Status TOTransactionDBImpl::CommitTransaction(
stripe_keys_iter++;
}

for (const auto& stripe : stripe_get_for_update_keys_map) {
std::lock_guard<std::mutex> lock(*keys_mutex_[stripe.first]);
for (const auto& key : stripe.second) {
uncommitted_keys_.RemoveKeyInLock(key, stripe.first, &current_conflict_bytes_);
}
}

LOG(2) << "TOTDB end commit txn id " << core->txn_id_
<< " cid " << core->commit_txn_id_
<< " commit ts " << core->commit_ts_;
Expand All @@ -933,7 +948,8 @@ Status TOTransactionDBImpl::CommitTransaction(
}

Status TOTransactionDBImpl::RollbackTransaction(
const std::shared_ptr<ATN>& core, const std::set<TxnKey>& written_keys) {
const std::shared_ptr<ATN>& core, const std::set<TxnKey>& written_keys,
const std::set<TxnKey>& get_for_updates) {
LOG(2) << "TOTDB start to rollback txn id " << core->txn_id_;
auto state = core->state_.load(std::memory_order_relaxed);
invariant(state == TOTransaction::kStarted || state == TOTransaction::kPrepared);
Expand Down Expand Up @@ -971,15 +987,13 @@ Status TOTransactionDBImpl::RollbackTransaction(

// Remove written keys from uncommitted keys
std::map<size_t, std::set<TxnKey>> stripe_keys_map;
auto keys_iter = written_keys.begin();
while (keys_iter != written_keys.end()) {
auto stripe_num = GetStripe(*keys_iter);
if (stripe_keys_map.find(stripe_num) == stripe_keys_map.end()) {
stripe_keys_map[stripe_num] = {};
}
stripe_keys_map[stripe_num].insert(std::move(*keys_iter));

keys_iter++;
for (const auto& key : written_keys) {
auto stripe_num = GetStripe(key);
stripe_keys_map[stripe_num].insert(key);
}
for (const auto& key : get_for_updates) {
auto stripe_num = GetStripe(key);
stripe_keys_map[stripe_num].insert(key);
}

auto stripe_keys_iter = stripe_keys_map.begin();
Expand Down
6 changes: 4 additions & 2 deletions src/totdb/totransaction_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ class TOTransactionDBImpl : public TOTransactionDB {

using ATN = TOTransactionImpl::ActiveTxnNode;
Status CommitTransaction(const std::shared_ptr<ATN>& core,
const std::set<TxnKey>& written_keys);
const std::set<TxnKey>& written_keys,
const std::set<TxnKey>& get_for_updates);

Status RollbackTransaction(const std::shared_ptr<ATN>& core,
const std::set<TxnKey>& written_keys);
const std::set<TxnKey>& written_keys,
const std::set<TxnKey>& get_for_updates);

Status SetTimeStamp(const TimeStampType& ts_type, const RocksTimeStamp& ts,
bool force) override;
Expand Down
32 changes: 30 additions & 2 deletions src/totdb/totransaction_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,34 @@ Status TOTransactionImpl::Delete(const Slice& key) {
return Delete(db_->DefaultColumnFamily(), key);
}

Status TOTransactionImpl::GetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) {
if (txn_db_impl_->IsReadOnly()) {
return Status::NotSupported("readonly db cannot accept del");
}
if (core_->state_ >= kPrepared) {
return Status::NotSupported("txn is already prepared, committed rollback");
}
if (core_->read_only_) {
if (core_->ignore_prepare_) {
return Status::NotSupported(
"Transactions with ignore_prepare=true cannot perform updates");
}
return Status::NotSupported("Attempt to update in a read-only transaction");
}

const TxnKey txn_key(column_family->GetID(), key.ToString());
Status s = CheckWriteConflict(txn_key);

if (s.ok()) {
get_for_updates_.emplace(std::move(txn_key));
}
return s;
}

Status TOTransactionImpl::GetForUpdate(const Slice& key) {
return GetForUpdate(db_->DefaultColumnFamily(), key);
}

Iterator* TOTransactionImpl::GetIterator(ReadOptions& read_options) {
return GetIterator(read_options, db_->DefaultColumnFamily());
}
Expand Down Expand Up @@ -370,7 +398,7 @@ Status TOTransactionImpl::Commit(std::function<void()>* hook) {
// Move uncommitted keys to committed keys,
// Clean data when the committed txn is activeTxnSet's header
// TODO(xxxxxxxx): in fact, here we must not fail
s = txn_db_impl_->CommitTransaction(core_, written_keys_);
s = txn_db_impl_->CommitTransaction(core_, written_keys_, get_for_updates_);
} else {
s = Status::InvalidArgument("Transaction is fail for commit.");
}
Expand All @@ -390,7 +418,7 @@ Status TOTransactionImpl::Rollback() {

// Change active txn set,
// Clean uncommitted keys
Status s = txn_db_impl_->RollbackTransaction(core_, written_keys_);
Status s = txn_db_impl_->RollbackTransaction(core_, written_keys_, get_for_updates_);

GetWriteBatch()->Clear();

Expand Down
6 changes: 6 additions & 0 deletions src/totdb/totransaction_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ class TOTransactionImpl : public TOTransaction {

virtual Status Delete(const Slice& key) override;

virtual Status GetForUpdate(ColumnFamilyHandle* column_family, const Slice& key) override;

virtual Status GetForUpdate(const Slice& key) override;

virtual Status SetName(const TransactionName& name) override;

virtual TransactionID GetID() const override;
Expand Down Expand Up @@ -144,6 +148,8 @@ class TOTransactionImpl : public TOTransaction {
// this
std::set<TxnKey> written_keys_;

std::set<TxnKey> get_for_updates_;

DB* db_;
TOTransactionDBImpl* txn_db_impl_;

Expand Down

0 comments on commit 6f95928

Please sign in to comment.