forked from cculianu/Fulcrum
/
BitcoinD.cpp
335 lines (297 loc) · 12.3 KB
/
BitcoinD.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
//
// Fulcrum - A fast & nimble SPV Server for Bitcoin Cash
// Copyright (C) 2019-2020 Calin A. Culianu <calin.culianu@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program (see LICENSE.txt). If not, see
// <https://www.gnu.org/licenses/>.
//
#include <QPointer>
#include "bitcoin/rpc/protocol.h"
#include "BitcoinD.h"
BitcoinDMgr::BitcoinDMgr(const QHostAddress &host, quint16 port,
const QString &user, const QString &pass)
: Mgr(nullptr), IdMixin(newId()), host(host), port(port), user(user), pass(pass)
{
setObjectName("BitcoinDMgr");
_thread.setObjectName(objectName());
}
BitcoinDMgr::~BitcoinDMgr() { cleanup(); }
void BitcoinDMgr::startup() {
Log() << objectName() << ": starting " << N_CLIENTS << " " << Util::Pluralize("bitcoin rpc client", N_CLIENTS) << " ...";
for (auto & client : clients) {
client = std::make_unique<BitcoinD>(host, port, user, pass);
// connect client to us -- TODO: figure out workflow: how requests for work and results will get dispatched
connect(client.get(), &BitcoinD::gotMessage, this, &BitcoinDMgr::on_Message);
connect(client.get(), &BitcoinD::gotErrorMessage, this, &BitcoinDMgr::on_ErrorMessage);
connect(client.get(), &BitcoinD::authenticated, this, [this](BitcoinD *b){
// guard against stale/old signal
if (!b->isGood()) {
Debug() << "got authenticated for id:" << b->id << " but isGood() is false!";
return; // false/stale signal
}
const bool wasEmpty = goodSet.empty();
goodSet.insert(b->id);
if (wasEmpty)
emit gotFirstGoodConnection(b->id);
});
connect(client.get(), &BitcoinD::lostConnection, this, [this](AbstractConnection *c){
// guard against stale/old signal
if (c->isGood()) {
Debug() << "got lostConnection for id:" << c->id << " but isGood() is true!";
return; // false/stale signal
}
goodSet.erase(c->id);
auto constexpr chkTimer = "checkNoMoreBitcoinDs";
// we throttle the spamming of the allConnectionsLost signal via this mechanism
callOnTimerSoonNoRepeat(miniTimeout, chkTimer, [this]{
if (goodSet.empty())
emit allConnectionsLost();
}, true);
});
client->start();
}
start();
Log() << objectName() << ": started ok";
}
void BitcoinDMgr::cleanup() {
stop();
for (auto & client : clients) {
client.reset(); /// implicitly calls client->stop()
}
goodSet.clear();
Debug() << "BitcoinDMgr cleaned up";
}
void BitcoinDMgr::on_Message(quint64 bid, const RPC::Message &msg)
{
if (Trace::isEnabled()) Trace() << "Msg from: " << bid << " method=" << msg.method;
}
void BitcoinDMgr::on_ErrorMessage(quint64 bid, const RPC::Message &msg)
{
Debug() << "ErrMsg from: " << bid << " error=" << msg.errorMessage();
if (msg.errorCode() == bitcoin::RPCErrorCode::RPC_IN_WARMUP) {
emit inWarmUp(msg.errorMessage());
}
}
auto BitcoinDMgr::stats() const -> Stats
{
QVariantList l;
constexpr int timeout = kDefaultTimeout/qMax(N_CLIENTS,1);
for (const auto & client : clients) {
if (!client) continue;
auto map = client->statsSafe(timeout).toMap();
auto name = map.take("name").toString();
l += QVariantMap({{ name, map }});
}
QVariantMap m;
m["rpc clients"] = l;
m["extant request contexts"] = BitcoinDMgrHelper::ReqCtxObj::extant.load();
return m;
}
BitcoinD *BitcoinDMgr::getBitcoinD()
{
BitcoinD *ret = nullptr;
unsigned which = 0;
if (unsigned n = quint32(goodSet.size()); n > 1)
which = QRandomGenerator::system()->bounded(n); // pick a random client in the set (which is an index of the "good clients")
if (!goodSet.empty()) {
// linear search for a bitcoind that is not lastBitcoinDUsed
unsigned i = 0;
for (auto & client : clients) {
if (goodSet.count(client->id) && i++ == which && client->isGood()) {
ret = client.get();
break;
}
}
}
return ret;
}
/// This is safe to call from any thread. Internally it dispatches messages to this obejct's thread.
/// Does not throw. Results/Error/Fail functions are called in the context of the `sender` thread.
/// Returns the BitcoinD->id that was given the message.
void BitcoinDMgr::submitRequest(QObject *sender, const RPC::Message::Id &rid, const QString & method, const QVariantList & params,
const ResultsF & resf, const ErrorF & errf, const FailF & failf)
{
using namespace BitcoinDMgrHelper;
constexpr bool debugDeletes = false; // set this to true to print debug messages tracking all the below object deletions (tested: no leaks!)
// A note about ownership: this context object is owned by the connections below both to ->sender and from bitcoind
// ->context. It will be auto-deleted when the shared_ptr refct held by the lambdas drops to 0. This is guaranteed
// to happen either as a result of a successful request reply, or due to bitcoind failure.
auto context = std::shared_ptr<ReqCtxObj>(new ReqCtxObj, [](ReqCtxObj *context){
if constexpr (debugDeletes) {
Debug() << context->objectName() << " shptr deleter";
connect(context, &QObject::destroyed, qApp, [n=context->objectName()]{ Debug() << n << " destroyed"; }, Qt::DirectConnection);
}
context->deleteLater();
});
context->setObjectName(QString("context for '%1' request id: %2").arg(sender ? sender->objectName() : "").arg(rid.toString()));
connect(context.get(), &ReqCtxObj::results, sender, [context, resf](const RPC::Message &response) {
if (!context->replied.exchange(true) && resf)
resf(response);
context->disconnect(); // kills all lambdas and shared ptr, should cause deleter to execute
});
connect(context.get(), &ReqCtxObj::error, sender, [context, errf](const RPC::Message &response) {
if (!context->replied.exchange(true) && errf)
errf(response);
context->disconnect(); // kills all lambdas and shared ptr, should cause deleter to execute
});
connect(context.get(), &ReqCtxObj::fail, sender, [context, failf](const RPC::Message::Id &origId, const QString & failureReason) {
if (!context->replied.exchange(true) && failf)
failf(origId, failureReason);
context->disconnect(); // kills all lambdas and shared ptr, should cause deleter to execute
});
context->moveToThread(this->thread());
// schedule this ASAP
Util::AsyncOnObject(this, [this, context, rid, method, params] {
auto bd = getBitcoinD();
if (UNLIKELY(!bd)) {
emit context->fail(rid, "Unable to find a good BitcoinD connection");
return;
}
using ConnsList = decltype (context->conns);
static const auto killConns = [](ConnsList & conns) {
for (const auto & conn : conns) {
QObject::disconnect(conn);
}
conns.clear();
};
context->conns +=
connect(bd, &BitcoinD::gotMessage, context.get(), [context, rid](quint64, const RPC::Message &reply){
if (reply.id == rid) {// filter out messages not for us
emit context->results(reply);
killConns(context->conns); // to kill lambdas, shared ptr captures
}
});
context->conns +=
connect(bd, &BitcoinD::gotErrorMessage, context.get(), [context, rid](quint64, const RPC::Message &errMsg){
if (errMsg.id == rid) { // filter out error messages not for us
emit context->error(errMsg);
killConns(context->conns); // to kill lambdas, shared ptr captures
}
});
context->conns +=
connect(bd, &BitcoinD::lostConnection, context.get(), [context, rid](AbstractConnection *){
emit context->fail(rid, "connection lost");
killConns(context->conns); // to kill lambdas, shared ptr captures
});
context->conns +=
connect(bd, &QObject::destroyed, context.get(), [context, rid](QObject *){
emit context->fail(rid, "bitcoind client deleted");
killConns(context->conns); // to kill lambdas, shared ptr captures
});
bd->sendRequest(rid, method, params);
});
// .. aand.. return right away
}
namespace BitcoinDMgrHelper {
/* static */ std::atomic_int ReqCtxObj::extant{0};
ReqCtxObj::ReqCtxObj() : QObject(nullptr) { ++extant; }
ReqCtxObj::~ReqCtxObj() { --extant; }
}
/* --- BitcoinD --- */
auto BitcoinD::stats() const -> Stats
{
auto m = RPC::HttpConnection::stats().toMap();
m["lastPeerError"] = badAuth ? "Auth Failure" : lastPeerError;
m.remove("nErrorsSent"); // should always be 0
m.remove("nNotificationsSent"); // again, 0
m.remove("nResultsSent"); // again, 0
return m;
}
BitcoinD::BitcoinD(const QHostAddress &host, quint16 port, const QString & user, const QString &pass, qint64 maxBuffer)
: RPC::HttpConnection(RPC::MethodMap{}, newId(), nullptr, maxBuffer), host(host), port(port)
{
static int N = 1;
setObjectName(QString("BitcoinD.%1").arg(N++));
_thread.setObjectName(objectName());
setAuth(user, pass);
setV1(true); // bitcoind uses jsonrpc v1
pingtime_ms = 10000;
stale_threshold = pingtime_ms * 2;
connectMiscSignals();
}
BitcoinD::~BitcoinD()
{
stop();
}
void BitcoinD::connectMiscSignals()
{
connect(this, &BitcoinD::gotMessage, this, [this]{
// this hook emits "authenticated" as soon as we get a good result message via 'do_ping' initiated from 'on_connected' below
if (needAuth || badAuth) {
needAuth = badAuth = false;
emit authenticated(this);
}
});
}
bool BitcoinD::isGood() const
{
return !badAuth && !needAuth && RPC::HttpConnection::isGood();
}
void BitcoinD::on_started()
{
ThreadObjectMixin::on_started();
{ // setup the "reconnect timer"
constexpr auto reconnectTimer = "reconnectTimer";
const auto SetTimer = [this] {
callOnTimerSoon(5000, reconnectTimer, [this]{
if (!isGood()) {
Debug() << prettyName() << " reconnecting...";
reconnect();
return true; // keep the timer alive
}
return false; // kill timer
});
};
conns += connect(this, &BitcoinD::lostConnection, this, [SetTimer]{
Log() << "Lost connection to bitcoind, will retry every 5 seconds ...";
SetTimer();
});
conns += connect(this, &BitcoinD::authFailure, this, [SetTimer, this] {
Error() << "Authentication to bitcoind rpc failed. Please check the rpcuser and rpcpass are correct and restart!";
badAuth = true;
SetTimer();
});
conns += connect(this, &BitcoinD::authenticated, this, [this] { stopTimer(reconnectTimer); });
SetTimer();
}
reconnect();
}
void BitcoinD::reconnect()
{
if (socket) delete socket;
socket = new QTcpSocket(this);
socketConnectSignals();
socket->connectToHost(host, port);
}
void BitcoinD::on_connected()
{
RPC::HttpConnection::on_connected();
lastGood = Util::getTime();
nSent = nReceived = 0;
lastPeerError.clear();
lastSocketError.clear();
badAuth = false;
needAuth = true;
emit connected(this);
// note that the 'authenticated' signal is only emitted after good auth is confirmed via the reply from the do_ping below
do_ping();
}
void BitcoinD::do_ping()
{
if (isStale()) {
Debug() << "Stale connection, reconnecting.";
reconnect();
} else
emit sendRequest(newId(), "ping");
}