backport: Merge bitcoin#20773, 22147, 21800, #5751

Status: Closed (wants to merge 11 commits)
25 changes: 25 additions & 0 deletions contrib/message-capture/message-capture-docs.md
@@ -0,0 +1,25 @@
# Per-Peer Message Capture

## Purpose

This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?"

## Usage and Functionality

* Run `bitcoind` with the `-capturemessages` option.
* Look in the `message_capture` folder in your datadir.
  * Typically this will be `~/.bitcoin/message_capture`.
  * There is one folder per peer, named with the peer's IP address and port.
  * Inside each peer's folder there are two `.dat` files: one for received messages (`msgs_recv.dat`) and one for sent messages (`msgs_sent.dat`).
* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
  * See the `-h` option for help.
  * To see all messages, both sent and received, for all peers use:
    ```
    ./contrib/message-capture/message-capture-parser.py -o out.json \
    ~/.bitcoin/message_capture/**/*.dat
    ```
  * Note: the messages in the given `.dat` files are interleaved in chronological order, so passing both received and sent `.dat` files (as above with `*.dat`) produces a single chronologically ordered stream.
  * If no output file is provided (i.e. the `-o` option is not used), the output prints to `stdout`.
* View the resulting output.
  * The output file is `JSON` formatted.
  * Suggestion: use `jq` to view the output, with `jq . out.json`, or post-process it in a script as sketched below.
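
As a quick sketch (assuming an `out.json` produced as above), the parsed output can also be summarized with a few lines of Python. The field names used below (`direction`, `msgtype`) match what the parser writes:

```python
#!/usr/bin/env python3
# Sketch: summarize a parsed capture (out.json) by direction and message type.
# Assumes out.json was produced by message-capture-parser.py as described above.
import json
from collections import Counter

with open("out.json", encoding="utf8") as f:
    messages = json.load(f)

counts = Counter((m["direction"], m["msgtype"]) for m in messages)
for (direction, msgtype), n in counts.most_common():
    print(f"{direction:>4} {msgtype:<12} {n}")
```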
214 changes: 214 additions & 0 deletions contrib/message-capture/message-capture-parser.py
@@ -0,0 +1,214 @@
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Parse message capture binary files. To be used in conjunction with -capturemessages."""

import argparse
import os
import shutil
import sys
from io import BytesIO
import json
from pathlib import Path
from typing import Any, List, Optional

sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))

from test_framework.messages import ser_uint256 # noqa: E402
from test_framework.p2p import MESSAGEMAP # noqa: E402

TIME_SIZE = 8
LENGTH_SIZE = 4
MSGTYPE_SIZE = 12

# The test framework classes store hashes as large ints in many cases.
# These are variables of type uint256 in core.
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
# As such, they are itemized here.
# Any variables with these names that are of type int are actually uint256 variables.
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
HASH_INTS = [
    "blockhash",
    "block_hash",
    "hash",
    "hashMerkleRoot",
    "hashPrevBlock",
    "hashstop",
    "prev_header",
    "sha256",
    "stop_hash",
]

HASH_INT_VECTORS = [
    "hashes",
    "headers",
    "vHave",
    "vHash",
]


class ProgressBar:
    def __init__(self, total: float):
        self.total = total
        self.running = 0

    def set_progress(self, progress: float):
        cols = shutil.get_terminal_size()[0]
        if cols <= 12:
            return
        max_blocks = cols - 9
        num_blocks = int(max_blocks * progress)
        print('\r[ {}{} ] {:3.0f}%'
              .format('#' * num_blocks,
                      ' ' * (max_blocks - num_blocks),
                      progress * 100),
              end='')

    def update(self, more: float):
        self.running += more
        self.set_progress(self.running / self.total)


def to_jsonable(obj: Any) -> Any:
    if hasattr(obj, "__dict__"):
        return obj.__dict__
    elif hasattr(obj, "__slots__"):
        ret = {}  # type: Any
        for slot in obj.__slots__:
            val = getattr(obj, slot, None)
            if slot in HASH_INTS and isinstance(val, int):
                ret[slot] = ser_uint256(val).hex()
            elif slot in HASH_INT_VECTORS and isinstance(val[0], int):
                ret[slot] = [ser_uint256(a).hex() for a in val]
            else:
                ret[slot] = to_jsonable(val)
        return ret
    elif isinstance(obj, list):
        return [to_jsonable(a) for a in obj]
    elif isinstance(obj, bytes):
        return obj.hex()
    else:
        return obj


def process_file(path: str, messages: List[Any], recv: bool, progress_bar: Optional[ProgressBar]) -> None:
    with open(path, 'rb') as f_in:
        if progress_bar:
            bytes_read = 0

        while True:
            if progress_bar:
                # Update progress bar
                diff = f_in.tell() - bytes_read - 1
                progress_bar.update(diff)
                bytes_read = f_in.tell() - 1

            # Read the Header
            tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
            if not tmp_header_raw:
                break
            tmp_header = BytesIO(tmp_header_raw)
            time = int.from_bytes(tmp_header.read(TIME_SIZE), "little")      # type: int
            msgtype = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0]     # type: bytes
            length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")  # type: int

            # Start converting the message to a dictionary
            msg_dict = {}
            msg_dict["direction"] = "recv" if recv else "sent"
            msg_dict["time"] = time
            msg_dict["size"] = length  # "size" is less readable here, but more readable in the output

            msg_ser = BytesIO(f_in.read(length))

            # Determine message type
            if msgtype not in MESSAGEMAP:
                # Unrecognized message type
                try:
                    msgtype_tmp = msgtype.decode()
                    if not msgtype_tmp.isprintable():
                        raise UnicodeDecodeError
                    msg_dict["msgtype"] = msgtype_tmp
                except UnicodeDecodeError:
                    msg_dict["msgtype"] = "UNREADABLE"
                msg_dict["body"] = msg_ser.read().hex()
                msg_dict["error"] = "Unrecognized message type."
                messages.append(msg_dict)
                print(f"WARNING - Unrecognized message type {msgtype} in {path}", file=sys.stderr)
                continue

            # Deserialize the message
            msg = MESSAGEMAP[msgtype]()
            msg_dict["msgtype"] = msgtype.decode()

            try:
                msg.deserialize(msg_ser)
            except KeyboardInterrupt:
                raise
            except Exception:
                # Unable to deserialize message body
                msg_ser.seek(0, os.SEEK_SET)
                msg_dict["body"] = msg_ser.read().hex()
                msg_dict["error"] = "Unable to deserialize message."
                messages.append(msg_dict)
                print(f"WARNING - Unable to deserialize message in {path}", file=sys.stderr)
                continue

            # Convert body of message into a jsonable object
            if length:
                msg_dict["body"] = to_jsonable(msg)
            messages.append(msg_dict)

        if progress_bar:
            # Update the progress bar to the end of the current file
            # in case we exited the loop early
            f_in.seek(0, os.SEEK_END)  # Go to end of file
            diff = f_in.tell() - bytes_read - 1
            progress_bar.update(diff)


def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(sys.argv[0]),
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        "capturepaths",
        nargs='+',
        help="binary message capture files to parse.")
    parser.add_argument(
        "-o", "--output",
        help="output file. If unset print to stdout")
    parser.add_argument(
        "-n", "--no-progress-bar",
        action='store_true',
        help="disable the progress bar. Automatically set if the output is not a terminal")
    args = parser.parse_args()
    capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths]
    output = Path.cwd() / Path(args.output) if args.output else False
    use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()

    messages = []  # type: List[Any]
    if use_progress_bar:
        total_size = sum(capture.stat().st_size for capture in capturepaths)
        progress_bar = ProgressBar(total_size)
    else:
        progress_bar = None

    for capture in capturepaths:
        process_file(str(capture), messages, "recv" in capture.stem, progress_bar)

    messages.sort(key=lambda msg: msg['time'])

    if use_progress_bar:
        progress_bar.set_progress(1)

    jsonrep = json.dumps(messages)
    if output:
        with open(str(output), 'w+', encoding="utf8") as f_out:
            f_out.write(jsonrep)
    else:
        print(jsonrep)

if __name__ == "__main__":
    main()
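
For reference, the on-disk record layout the parser expects (as read in `process_file()` above) is: an 8-byte little-endian timestamp, a 12-byte null-padded message type, a 4-byte little-endian payload length, then the payload. The timestamp is treated as an opaque integer by the parser (in practice it is likely microseconds since the epoch). The following is a small, hypothetical sketch, not part of the PR, that writes a single synthetic `ping` record so the parser can be exercised without a running node; the output path and nonce value are arbitrary:

```python
#!/usr/bin/env python3
# Hypothetical sketch: write one synthetic capture record in the format read by
# process_file() above (time, msgtype, length, payload).
import struct

TIME_SIZE, LENGTH_SIZE, MSGTYPE_SIZE = 8, 4, 12

def capture_record(msgtype: bytes, payload: bytes, time_us: int) -> bytes:
    assert len(msgtype) <= MSGTYPE_SIZE
    return (time_us.to_bytes(TIME_SIZE, "little")
            + msgtype.ljust(MSGTYPE_SIZE, b"\x00")
            + len(payload).to_bytes(LENGTH_SIZE, "little")
            + payload)

# A "ping" message body is an 8-byte little-endian nonce.
record = capture_record(b"ping", struct.pack("<Q", 42), time_us=1_600_000_000_000_000)
with open("msgs_recv.dat", "wb") as f:
    f.write(record)
# message-capture-parser.py should now parse msgs_recv.dat into one "ping" entry.
```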
9 changes: 9 additions & 0 deletions doc/release-notes-19776.md
@@ -0,0 +1,9 @@
Updated RPCs
------------

- The `getpeerinfo` RPC returns two new boolean fields, `bip152_hb_to` and
`bip152_hb_from`, that respectively indicate whether we selected a peer to be
in compact blocks high-bandwidth mode or whether a peer selected us as a
compact blocks high-bandwidth peer. High-bandwidth peers send new block
announcements via a `cmpctblock` message rather than the usual inv/headers
announcements. See BIP 152 for more details. (#19776)
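
A minimal sketch of reading the new fields over RPC, assuming the functional-test framework's `AuthServiceProxy` is importable and local RPC credentials are available (the URL and credentials below are illustrative):

```python
# Hypothetical sketch: list which peers are in BIP152 high-bandwidth mode.
# Assumes test/functional is on the Python path and local RPC credentials.
from test_framework.authproxy import AuthServiceProxy

node = AuthServiceProxy("http://user:pass@127.0.0.1:9998")
for peer in node.getpeerinfo():
    print(peer["id"], peer["addr"],
          "hb_to" if peer["bip152_hb_to"] else "-",
          "hb_from" if peer["bip152_hb_from"] else "-")
```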
12 changes: 12 additions & 0 deletions doc/release-notes-20833.md
@@ -0,0 +1,12 @@
Updated RPCs
------------

- The `testmempoolaccept` RPC now accepts multiple transactions (still experimental; the API may be
unstable). This is intended for testing transaction packages with dependency relationships; it is not
recommended for batch-validating independent transactions. In addition to mempool policy, package
policies apply: the list cannot contain more than 25 transactions or exceed a total size of 101K virtual
bytes, and the transactions cannot conflict with (i.e. spend the same inputs as) each other or the
mempool, even if that would be a valid BIP125 replace-by-fee. There are known limitations to the
accuracy of the test accept: it is possible for `testmempoolaccept` to report `"allowed": true` for a
group of transactions that would nonetheless be rejected with "too-long-mempool-chain" if actually
submitted. (#20833)
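
A minimal sketch of testing a package over RPC, again assuming `AuthServiceProxy` and local credentials; the raw transaction hex strings are placeholders:

```python
# Hypothetical sketch: test a parent+child package without submitting it.
from test_framework.authproxy import AuthServiceProxy

node = AuthServiceProxy("http://user:pass@127.0.0.1:9998")
parent_hex = "0200000001..."  # placeholder: raw hex of the parent transaction
child_hex = "0200000001..."   # placeholder: raw hex of the child spending the parent

for res in node.testmempoolaccept([parent_hex, child_hex]):
    print(res["txid"], res.get("allowed"), res.get("reject-reason", ""))
```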

6 changes: 6 additions & 0 deletions doc/release-notes-21056.md
@@ -0,0 +1,6 @@
New bitcoin-cli settings
------------------------

- A new `-rpcwaittimeout` argument to `bitcoin-cli` sets the timeout
in seconds to use with `-rpcwait`. If the timeout expires,
`bitcoin-cli` will report a failure. (#21056)
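
A sketch of the intended use from a script; the timeout value is illustrative:

```python
# Hypothetical sketch: wait up to 60 seconds for the RPC server to come up,
# then fail loudly instead of waiting forever.
import subprocess
import sys

result = subprocess.run(
    ["bitcoin-cli", "-rpcwait", "-rpcwaittimeout=60", "getblockcount"],
    capture_output=True, text=True)
if result.returncode != 0:
    sys.exit(f"RPC server did not come up in time: {result.stderr.strip()}")
print("block count:", result.stdout.strip())
```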
1 change: 1 addition & 0 deletions src/Makefile.am
@@ -264,6 +264,7 @@ BITCOIN_CORE_H = \
outputtype.h \
policy/feerate.h \
policy/fees.h \
+ policy/packages.h \
policy/policy.h \
policy/settings.h \
pow.h \
3 changes: 1 addition & 2 deletions src/bench/wallet_balance.cpp
@@ -27,8 +27,7 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b
CWallet wallet{test_setup.m_node.chain.get(), "", CreateMockWalletDatabase()};
{
wallet.SetupLegacyScriptPubKeyMan();
- bool first_run;
- if (wallet.LoadWallet(first_run) != DBErrors::LOAD_OK) assert(false);
+ if (wallet.LoadWallet() != DBErrors::LOAD_OK) assert(false);
}
auto handler = test_setup.m_node.chain->handleNotifications({&wallet, [](CWallet*) {}});

14 changes: 10 additions & 4 deletions src/bitcoin-cli.cpp
@@ -40,6 +40,7 @@ UrlDecodeFn* const URL_DECODE = urlDecode;

static const char DEFAULT_RPCCONNECT[] = "127.0.0.1";
static const int DEFAULT_HTTP_CLIENT_TIMEOUT=900;
+ static constexpr int DEFAULT_WAIT_CLIENT_TIMEOUT = 0;
static const bool DEFAULT_NAMED=false;
static const int CONTINUE_EXECUTION=-1;

@@ -69,6 +70,7 @@ static void SetupCliArgs(ArgsManager& argsman)
argsman.AddArg("-rpcuser=<user>", "Username for JSON-RPC connections", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-rpcwait", "Wait for RPC server to start", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-rpcwallet=<walletname>", "Send RPC for non-default wallet on RPC server (needs to exactly match corresponding -wallet option passed to dashd). This changes the RPC endpoint used, e.g. http://127.0.0.1:9998/wallet/<walletname>", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-rpcwaittimeout=<n>", strprintf("Timeout in seconds to wait for the RPC server to start, or 0 for no timeout. (default: %d)", DEFAULT_WAIT_CLIENT_TIMEOUT), ArgsManager::ALLOW_INT, OptionsCategory::OPTIONS);
argsman.AddArg("-stdin", "Read extra arguments from standard input, one per line until EOF/Ctrl-D (recommended for sensitive information such as passphrases). When combined with -stdinrpcpass, the first line from standard input is used for the RPC password.", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-stdinrpcpass", "Read RPC password from standard input as a single line. When combined with -stdin, the first line from standard input is used for the RPC password. When combined with -stdinwalletpassphrase, -stdinrpcpass consumes the first line, and -stdinwalletpassphrase consumes the second.", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
argsman.AddArg("-stdinwalletpassphrase", "Read wallet passphrase from standard input as a single line. When combined with -stdin, the first line from standard input is used for the wallet passphrase.", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
@@ -705,6 +707,9 @@ static UniValue ConnectAndCallRPC(BaseRequestHandler* rh, const std::string& str
UniValue response(UniValue::VOBJ);
// Execute and handle connection failures with -rpcwait.
const bool fWait = gArgs.GetBoolArg("-rpcwait", false);
+ const int timeout = gArgs.GetArg("-rpcwaittimeout", DEFAULT_WAIT_CLIENT_TIMEOUT);
+ const int64_t deadline = GetTime<std::chrono::seconds>().count() + timeout;

do {
try {
response = CallRPC(rh, strMethod, args, rpcwallet);
@@ -715,11 +720,12 @@ static UniValue ConnectAndCallRPC(BaseRequestHandler* rh, const std::string& str
}
}
break; // Connection succeeded, no need to retry.
- } catch (const CConnectionFailed&) {
- if (fWait) {
- UninterruptibleSleep(std::chrono::milliseconds{1000});
+ } catch (const CConnectionFailed& e) {
+ const int64_t now = GetTime<std::chrono::seconds>().count();
+ if (fWait && (timeout <= 0 || now < deadline)) {
+ UninterruptibleSleep(std::chrono::seconds{1});
} else {
- throw;
+ throw CConnectionFailed(strprintf("timeout on transient error: %s", e.what()));
}
}
} while (fWait);
9 changes: 5 additions & 4 deletions src/init.cpp
@@ -709,7 +709,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-stopatheight", strprintf("Stop running after reaching the given height in the main chain (default: %u)", DEFAULT_STOPATHEIGHT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-watchquorums=<n>", strprintf("Watch and validate quorum communication (default: %u)", llmq::DEFAULT_WATCH_QUORUMS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);

argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-debug=<category>", "Output debugging information (default: -nodebug, supplying <category> is optional). "
"If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ".", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
argsman.AddArg("-debugexclude=<category>", strprintf("Exclude debugging information for a category. Can be used in conjunction with -debug=1 to output debug logs for all categories except one or more specified categories."), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
@@ -1252,16 +1252,17 @@ bool AppInitParameterInteraction(const ArgsManager& args)

// Trim requested connection counts, to fit into system limitations
// <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
- nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind);
+ nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind + NUM_FDS_MESSAGE_CAPTURE);

#ifdef USE_POLL
int fd_max = nFD;
#else
int fd_max = FD_SETSIZE;
#endif
- nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0);
+ nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), 0);
if (nFD < MIN_CORE_FILEDESCRIPTORS)
return InitError(_("Not enough file descriptors available."));
- nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections);
+ nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE, nMaxConnections);

if (nMaxConnections < nUserMaxConnections)
InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections));