Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kostasrim committed May 9, 2024
1 parent 5c8b215 commit 40e2eaa
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ void Connection::HandleMigrateRequest() {
this->Migrate(dest);
}

// This triggers on rueidis SingleIntegrationTest
// DCHECK(dispatch_q_.empty());

// In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
Expand Down Expand Up @@ -1325,6 +1326,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
while (!builder->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
if (cc_->conn_closing)
Expand Down
13 changes: 6 additions & 7 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, facade::Connection* own

ConnectionContext::ConnectionContext(ConnectionContext* owner, Transaction* tx,
facade::CapturingReplyBuilder* crb)
: facade::ConnectionContext(nullptr, nullptr), transaction{tx}, parent_cntx_(owner) {
: facade::ConnectionContext(nullptr, nullptr), transaction{tx} {
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::ALL_COMMANDS);
if (tx) { // If we have a carrier transaction, this context is used for squashing
DCHECK(owner);
Expand Down Expand Up @@ -296,14 +296,13 @@ OpResult<void> OpTrackKeys(const OpArgs slice_args, const facade::Connection::We

void ConnectionState::ClientTracking::Track(ConnectionContext* cntx, const CommandId* cid) {
auto& info = cntx->conn_state.tracking_info_;
auto shards = cntx->transaction->GetActiveShards();
if ((cid->opt_mask() & CO::READONLY) && cid->IsTransactional() && info.ShouldTrackKeys()) {
auto conn = cntx->parent_cntx_ ? cntx->parent_cntx_->conn()->Borrow() : cntx->conn()->Borrow();
auto conn = cntx->conn()->Borrow();
auto cb = [&, conn](unsigned i, auto* pb) {
if (shards.find(i) != shards.end()) {
auto* t = cntx->transaction;
CHECK(t);
auto* shard = EngineShard::tlocal();
auto* t = cntx->transaction;
CHECK(t);
auto* shard = EngineShard::tlocal();
if (shard && t->IsActive(i)) {
OpTrackKeys(t->GetOpArgs(shard), conn, t->GetShardArgs(shard->shard_id()));
}
};
Expand Down
1 change: 0 additions & 1 deletion src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ class ConnectionContext : public facade::ConnectionContext {
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;
ConnectionContext* parent_cntx_ = nullptr;

ConnectionState conn_state;

Expand Down
3 changes: 2 additions & 1 deletion src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,8 @@ void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
return;
}
auto& client_set = it->second;
// notify all the clients.
// notify all the clients. copy key because we dispatch briefly below and we need to preserve
// lifetime
auto cb = [key = std::string(key), client_set = std::move(client_set)](unsigned idx,
util::ProactorBase*) {
for (auto& client : client_set) {
Expand Down
6 changes: 5 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo

auto* trans = cntx->transaction;
if (trans) {
cntx->transaction->SetConnectionContextAndInvokeCid(cntx);
cntx->transaction->SetConnectionContext(cntx);
}

#ifndef NDEBUG
Expand Down Expand Up @@ -1372,6 +1372,10 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
for (auto args : args_list) {
ToUpper(&args[0]);
const auto [cid, tail_args] = FindCmd(args);
// is client tracking command
if (cid->name() == "CLIENT" && !tail_args.empty() && ToSV(tail_args[0]) == "TRACKING") {
break;
}

// MULTI...EXEC commands need to be collected into a single context, so squashing is not
// possible
Expand Down
8 changes: 1 addition & 7 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,16 +363,10 @@ class Transaction {
return shard_data_[SidToId(sid)].local_mask;
}

void SetConnectionContextAndInvokeCid(ConnectionContext* cntx) {
void SetConnectionContext(ConnectionContext* cntx) {
cntx_ = cntx;
}

std::set<unsigned> GetActiveShards() {
std::set<unsigned> active_shards;
IterateActiveShards([&](const auto& sd, ShardId i) mutable { active_shards.insert(i); });
return active_shards;
}

private:
// Holds number of locks for each IntentLock::Mode: shared and exlusive.
struct LockCnt {
Expand Down

0 comments on commit 40e2eaa

Please sign in to comment.