Skip to content

Commit

Permalink
Merge #5954: refactor: significant Mutex refactoring
Browse files Browse the repository at this point in the history
acd0f49 refactor: significant Mutex refactoring (pasta)

Pull request description:

  ## Issue being fixed or feature implemented
  Don't use generic names; recursive mutexes where not needed; etc

  ## What was done?
  Includes:
  RecursiveMutex -> Mutex,
  renaming of `cs` to something more meaningful,
  usage of atomics where trivially possible,
  introduce a method CQuorum::SetVerificationVector to avoid needing to lock an internal mutex externally

  ## How Has This Been Tested?
  Compiling

  ## Breaking Changes
  None

  ## Checklist:
    _Go over all the following points, and put an `x` in all the boxes that apply._
  - [ ] I have performed a self-review of my own code
  - [ ] I have commented my code, particularly in hard-to-understand areas
  - [ ] I have added or updated relevant unit/integration/functional/e2e tests
  - [ ] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

Top commit has no ACKs.

Tree-SHA512: 76d0ee37e348680bdcd8f03237d3fc1febbf908a9c13e6ddea7be52a35adfca35cde3001ce6ecb140d7dba950ad19519d34d137de17a073306e3e7b26cb95b70
  • Loading branch information
PastaPastaPasta committed May 14, 2024
2 parents 26cfbb0 + acd0f49 commit 806fc73
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 152 deletions.
10 changes: 5 additions & 5 deletions src/llmq/debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, const Chainsta

void CDKGDebugManager::GetLocalDebugStatus(llmq::CDKGDebugStatus& ret) const
{
LOCK(cs);
LOCK(cs_lockStatus);
ret = localStatus;
}

void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand All @@ -157,7 +157,7 @@ void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int

void CDKGDebugManager::InitLocalSessionStatus(const Consensus::LLMQParams& llmqParams, int quorumIndex, const uint256& quorumHash, int quorumHeight)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqParams.type, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand All @@ -176,7 +176,7 @@ void CDKGDebugManager::InitLocalSessionStatus(const Consensus::LLMQParams& llmqP

void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex, std::function<bool(CDKGDebugSessionStatus& status)>&& func)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand All @@ -190,7 +190,7 @@ void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, in

void CDKGDebugManager::UpdateLocalMemberStatus(Consensus::LLMQType llmqType, int quorumIndex, size_t memberIdx, std::function<bool(CDKGDebugMemberStatus& status)>&& func)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand Down
4 changes: 2 additions & 2 deletions src/llmq/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ class CDKGDebugStatus
class CDKGDebugManager
{
private:
mutable RecursiveMutex cs;
CDKGDebugStatus localStatus GUARDED_BY(cs);
mutable Mutex cs_lockStatus;
CDKGDebugStatus localStatus GUARDED_BY(cs_lockStatus);

public:
CDKGDebugManager();
Expand Down
8 changes: 1 addition & 7 deletions src/llmq/dkgsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,6 @@ bool CDKGSession::PreVerifyMessage(const CDKGContribution& qc, bool& retBan) con

void CDKGSession::ReceiveMessage(const CDKGContribution& qc, bool& retBan)
{
LOCK(cs_pending);

CDKGLogger logger(*this, __func__, __LINE__);

retBan = false;
Expand Down Expand Up @@ -336,15 +334,11 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc, bool& retBan)

logger.Batch("decrypted our contribution share. time=%d", t2.count());

bool verifyPending = false;
receivedSkContributions[member->idx] = skContribution;
vecEncryptedContributions[member->idx] = qc.contributions;
LOCK(cs_pending);
pendingContributionVerifications.emplace_back(member->idx);
if (pendingContributionVerifications.size() >= 32) {
verifyPending = true;
}

if (verifyPending) {
VerifyPendingContributions();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/llmq/dkgsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ class CDKGSession
// we expect to only receive a single vvec and contribution per member, but we must also be able to relay
// conflicting messages as otherwise an attacker might be able to broadcast conflicting (valid+invalid) messages
// and thus split the quorum. Such members are later removed from the quorum.
mutable RecursiveMutex invCs;
mutable Mutex invCs;
std::map<uint256, CDKGContribution> contributions GUARDED_BY(invCs);
std::map<uint256, CDKGComplaint> complaints GUARDED_BY(invCs);
std::map<uint256, CDKGJustification> justifications GUARDED_BY(invCs);
std::map<uint256, CDKGPrematureCommitment> prematureCommitments GUARDED_BY(invCs);

mutable RecursiveMutex cs_pending;
mutable Mutex cs_pending;
std::vector<size_t> pendingContributionVerifications GUARDED_BY(cs_pending);

// filled by ReceivePrematureCommitment and used by FinalizeCommitments
Expand Down
60 changes: 26 additions & 34 deletions src/llmq/dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,
EraseObjectRequest(from, CInv(invType, hash));
}

LOCK(cs);
LOCK(cs_messages);

if (messagesPerNode[from] >= maxMessagesPerNode) {
// TODO ban?
Expand All @@ -95,7 +95,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,

std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
{
LOCK(cs);
LOCK(cs_messages);

std::list<BinaryMessage> ret;
while (!pendingMessages.empty() && ret.size() < maxCount) {
Expand All @@ -108,7 +108,7 @@ std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMes

bool CDKGPendingMessages::HasSeen(const uint256& hash) const
{
LOCK(cs);
LOCK(cs_messages);
return seenMessages.count(hash) != 0;
}

Expand All @@ -120,7 +120,7 @@ void CDKGPendingMessages::Misbehaving(const NodeId from, const int score)

void CDKGPendingMessages::Clear()
{
LOCK(cs);
LOCK(cs_messages);
pendingMessages.clear();
messagesPerNode.clear();
seenMessages.clear();
Expand All @@ -135,7 +135,7 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
if (quorumIndex > 0 && !IsQuorumRotationEnabled(params, pindexNew)) {
return;
}
LOCK(cs);
LOCK(cs_phase_qhash);

int quorumStageInt = (pindexNew->nHeight - quorumIndex) % params.dkgInterval;

Expand Down Expand Up @@ -207,7 +207,7 @@ bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex)

std::pair<QuorumPhase, uint256> CDKGSessionHandler::GetPhaseAndQuorumHash() const
{
LOCK(cs);
LOCK(cs_phase_qhash);
return std::make_pair(phase, quorumHash);
}

Expand Down Expand Up @@ -304,9 +304,8 @@ void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,

int64_t sleepTime = (int64_t)(adjustedPhaseSleepTimePerMember * curSession->GetMyMemberIndex().value_or(0));
int64_t endTime = GetTimeMillis() + sleepTime;
int heightTmp{-1};
int heightStart{-1};
heightTmp = heightStart = WITH_LOCK(cs, return currentHeight);
int heightTmp{currentHeight.load()};
int heightStart{heightTmp};

LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - starting sleep for %d ms, curPhase=%d\n", __func__, params.name, quorumIndex, sleepTime, ToUnderlying(curPhase));

Expand All @@ -315,22 +314,20 @@ void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due to stop/shutdown requested\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
}
{
LOCK(cs);
if (currentHeight > heightTmp) {
// New block(s) just came in
int64_t expectedBlockTime = (currentHeight - heightStart) * Params().GetConsensus().nPowTargetSpacing * 1000;
if (expectedBlockTime > sleepTime) {
// Blocks came faster than we expected, jump into the phase func asap
break;
}
heightTmp = currentHeight;
}
if (phase != curPhase || quorumHash != expectedQuorumHash) {
// Something went wrong and/or we missed quite a few blocks and it's just too late now
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
auto cur_height = currentHeight.load();
if (cur_height > heightTmp) {
// New block(s) just came in
int64_t expectedBlockTime = (cur_height - heightStart) * Params().GetConsensus().nPowTargetSpacing * 1000;
if (expectedBlockTime > sleepTime) {
// Blocks came faster than we expected, jump into the phase func asap
break;
}
heightTmp = cur_height;
}
if (WITH_LOCK(cs_phase_qhash, return phase != curPhase || quorumHash != expectedQuorumHash)) {
// Something went wrong and/or we missed quite a few blocks and it's just too late now
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
}
if (!runWhileWaiting()) {
UninterruptibleSleep(std::chrono::milliseconds{100});
Expand Down Expand Up @@ -505,18 +502,13 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi

void CDKGSessionHandler::HandleDKGRound()
{
uint256 curQuorumHash;

WaitForNextPhase(std::nullopt, QuorumPhase::Initialized);

{
LOCK(cs);
pendingContributions.Clear();
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
curQuorumHash = quorumHash;
}
pendingContributions.Clear();
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
uint256 curQuorumHash = WITH_LOCK(cs_phase_qhash, return quorumHash);

const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(cs_main, return m_chainstate.m_blockman.LookupBlockIndex(curQuorumHash));

Expand Down
18 changes: 9 additions & 9 deletions src/llmq/dkgsessionhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class CDKGPendingMessages
using BinaryMessage = std::pair<NodeId, std::shared_ptr<CDataStream>>;

private:
mutable RecursiveMutex cs;
std::atomic<PeerManager*> m_peerman{nullptr};
const int invType;
size_t maxMessagesPerNode GUARDED_BY(cs);
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs);
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs);
std::set<uint256> seenMessages GUARDED_BY(cs);
const size_t maxMessagesPerNode;
mutable Mutex cs_messages;
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs_messages);
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs_messages);
std::set<uint256> seenMessages GUARDED_BY(cs_messages);

public:
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
Expand Down Expand Up @@ -117,7 +117,6 @@ class CDKGSessionHandler
friend class CDKGSessionManager;

private:
mutable RecursiveMutex cs;
std::atomic<bool> stopRequested{false};

CBLSWorker& blsWorker;
Expand All @@ -134,9 +133,10 @@ class CDKGSessionHandler
const Consensus::LLMQParams params;
const int quorumIndex;

QuorumPhase phase GUARDED_BY(cs) {QuorumPhase::Idle};
int currentHeight GUARDED_BY(cs) {-1};
uint256 quorumHash GUARDED_BY(cs);
std::atomic<int> currentHeight {-1};
mutable Mutex cs_phase_qhash;
QuorumPhase phase GUARDED_BY(cs_phase_qhash) {QuorumPhase::Idle};
uint256 quorumHash GUARDED_BY(cs_phase_qhash);

std::unique_ptr<CDKGSession> curSession;
std::thread phaseHandlerThread;
Expand Down
8 changes: 4 additions & 4 deletions src/llmq/dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ bool CDKGSessionManager::GetContribution(const uint256& hash, CDKGContribution&

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Initialized || dkgType.phase > QuorumPhase::Contribute) {
continue;
}
Expand All @@ -314,7 +314,7 @@ bool CDKGSessionManager::GetComplaint(const uint256& hash, CDKGComplaint& ret) c

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Contribute || dkgType.phase > QuorumPhase::Complain) {
continue;
}
Expand All @@ -335,7 +335,7 @@ bool CDKGSessionManager::GetJustification(const uint256& hash, CDKGJustification

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Complain || dkgType.phase > QuorumPhase::Justify) {
continue;
}
Expand All @@ -356,7 +356,7 @@ bool CDKGSessionManager::GetPrematureCommitment(const uint256& hash, CDKGPrematu

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Justify || dkgType.phase > QuorumPhase::Commit) {
continue;
}
Expand Down

0 comments on commit 806fc73

Please sign in to comment.