This repository has been archived by the owner on Feb 21, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 174
/
client.cpp
3146 lines (2729 loc) · 126 KB
/
client.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#define DEFAULT_LOGGER "client"
#include <bts/client/client.hpp>
#include <bts/client/messages.hpp>
#include <bts/cli/cli.hpp>
#include <bts/net/node.hpp>
#include <bts/net/exceptions.hpp>
#include <bts/net/upnp.hpp>
#include <bts/net/peer_database.hpp>
#include <bts/blockchain/chain_database.hpp>
#include <bts/blockchain/time.hpp>
#include <bts/blockchain/transaction_evaluation_state.hpp>
#include <bts/blockchain/exceptions.hpp>
#include <bts/utilities/key_conversion.hpp>
#include <bts/utilities/git_revision.hpp>
#include <bts/rpc/rpc_client.hpp>
#include <bts/rpc/rpc_server.hpp>
#include <bts/api/common_api.hpp>
#include <bts/wallet/exceptions.hpp>
#include <bts/wallet/config.hpp>
#include <bts/network/node.hpp>
#include <bts/db/level_map.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/io/fstream.hpp>
#include <fc/io/json.hpp>
#include <fc/network/http/connection.hpp>
#include <fc/network/resolve.hpp>
#include <fc/crypto/base58.hpp>
#include <fc/crypto/elliptic.hpp>
#include <fc/crypto/hex.hpp>
#include <fc/thread/thread.hpp>
#include <fc/thread/scoped_lock.hpp>
#include <fc/log/logger.hpp>
#include <fc/log/file_appender.hpp>
#include <fc/log/logger_config.hpp>
#include <fc/io/raw.hpp>
#include <fc/filesystem.hpp>
#include <fc/git_revision.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <boost/range/algorithm/reverse.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/program_options.hpp>
#include <boost/iostreams/tee.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/rolling_mean.hpp>
#include <iostream>
#include <algorithm>
#include <fstream>
#include <iomanip>
#include <boost/lexical_cast.hpp>
using namespace boost;
using std::string;
// delegate network breaks win32
#define DISABLE_DELEGATE_NETWORK 1
namespace bts { namespace client {
const string BTS_MESSAGE_MAGIC = "BitShares Signed Message:\n";
void print_banner();
fc::logging_config create_default_logging_config(const fc::path&);
fc::path get_data_dir(const program_options::variables_map& option_variables);
config load_config( const fc::path& datadir );
void load_and_configure_chain_database(const fc::path& datadir,
const program_options::variables_map& option_variables);
fc::variant_object version_info();
/**
 * Parses the process command line into a boost::program_options variables map.
 *
 * Side effects:
 *  - on a parse error, prints the error and the usage text to stderr and exits with status 1
 *  - on --help, prints the usage text to stdout and exits with status 0
 *  - on --version, prints version_info() as pretty JSON to stdout and exits with status 0
 *
 * @param argc argument count as passed to main()
 * @param argv argument vector as passed to main()
 * @return the stored and notified option values
 */
program_options::variables_map parse_option_variables(int argc, char** argv)
{
  namespace po = program_options;

  // every option the client understands, with its help text
  po::options_description option_config("Usage");
  option_config.add_options()
    ("help", "Display this help message and exit")
    ("version", "Print version information and exit")
    ("data-dir", po::value<string>(), "Set client data directory")
    ("genesis-config", po::value<string>(),
     "Generate a genesis state with the given JSON file instead of using the built-in "
     "genesis block (only accepted when the blockchain is empty)")
    ("rebuild-index", "Same as --resync-blockchain, except it preserves the raw blockchain data rather "
     "than downloading a new copy")
    ("resync-blockchain", "Delete our copy of the blockchain at startup and download a "
     "fresh copy of the entire blockchain from the network")
    ("p2p-port", po::value<uint16_t>(), "Set network port to listen on")
    ("upnp", po::value<bool>()->default_value(true), "Enable UPNP")
    ("max-connections", po::value<uint16_t>(),
     "Set the maximum number of peers this node will accept at any one time")
    ("total-bandwidth-limit", po::value<uint32_t>()->default_value(1000000),
     "Limit total bandwidth to this many bytes per second")
    ("min-delegate-connection-count", po::value<uint32_t>(),
     "Override the default minimum connection count needed to produce a block")
    ("clear-peer-database", "Erase all information in the peer database")
    ("connect-to", po::value<std::vector<string> >(), "Set a remote host to connect to")
    ("disable-default-peers", "Disable automatic connection to default peers")
    ("disable-peer-advertising", "Don't let any peers know which other nodes we're connected to")
    ("server", "Enable JSON-RPC server")
    ("daemon", "Run in daemon mode with no CLI and start JSON-RPC server")
    ("rpcuser", po::value<string>(), "Set username for JSON-RPC")
    ("rpcpassword", po::value<string>(), "Set password for JSON-RPC")
    ("rpcport", po::value<uint16_t>(), "Set port to listen for JSON-RPC connections")
    ("httpport", po::value<uint16_t>(), "Set port to listen for HTTP JSON-RPC connections")
    ("input-log", po::value< vector<string> >(), "Set log file with CLI commands to execute at startup")
    ("log-commands", "Log all command input and output")
    ;

  po::variables_map parsed_options;
  try
  {
    po::store(po::command_line_parser(argc, argv).options(option_config).run(), parsed_options);
    po::notify(parsed_options);
  }
  catch (po::error& parse_failure)
  {
    // a malformed command line is fatal: show what went wrong plus the usage text
    std::cerr << "Error: " << parse_failure.what() << "\n"
              << option_config << "\n";
    exit(1);
  }

  // --help takes precedence over --version when both are given
  if (parsed_options.count("help"))
  {
    std::cout << option_config << "\n";
    exit(0);
  }
  if (parsed_options.count("version"))
  {
    std::cout << fc::json::to_pretty_string( bts::client::version_info() ) << "\n";
    exit(0);
  }
  return parsed_options;
}
/**
 * Collects the commands recorded in a CLI log.
 *
 * Every line that contains CLI_PROMPT_SUFFIX (anywhere in the line, first occurrence)
 * contributes the text following the suffix, terminated by a newline.  Lines without the
 * suffix are ignored.
 *
 * @param log_stream stream to read line by line until getline fails
 * @return the extracted commands, one per line
 */
string extract_commands_from_log_stream(std::istream& log_stream)
{
  const size_t suffix_length = strlen(CLI_PROMPT_SUFFIX);
  string commands;
  for (string current_line; std::getline(log_stream, current_line); )
  {
    const size_t suffix_at = current_line.find(CLI_PROMPT_SUFFIX);
    if (suffix_at == string::npos)
      continue; // not a prompt line, nothing to replay

    // keep only what the user typed after the prompt
    commands.append(current_line, suffix_at + suffix_length, string::npos);
    commands += "\n";
  }
  return commands;
}
/**
 * Opens the given log file and extracts the commands recorded in it via
 * extract_commands_from_log_stream().  The open result is not checked here; a stream
 * that failed to open simply yields no lines.
 *
 * @param test_file path of the log file to read
 * @return the extracted commands, one per line
 */
string extract_commands_from_log_file(fc::path test_file)
{
  const auto log_file_name = test_file.string();
  std::ifstream log_input(log_file_name);
  return extract_commands_from_log_stream(log_input);
}
/**
 * Writes the start-up banner to stdout.  The welcome row embeds BTS_ADDRESS_PREFIX,
 * left-aligned in a field of width 5.
 */
void print_banner()
{
  const char* const border_row = "================================================================\n";
  const char* const blank_row  = "= =\n";

  std::cout << border_row
            << blank_row
            << "= Welcome to BitShares " << std::setw(5) << std::left << BTS_ADDRESS_PREFIX << " =\n"
            << blank_row
            << "= This software is in alpha testing and is not suitable for =\n"
            << "= real monetary transactions or trading. Use at your own =\n"
            << "= risk. =\n"
            << blank_row
            << "= Type 'help' for usage information. =\n"
            << border_row;
}
/**
 * Builds the logging configuration written into a freshly created config file.
 *
 * Appenders: a colored console appender on stderr plus four rotating file appenders
 * ("default", "rpc", "blockchain", "p2p") under <data_dir>/logs.  Each file appender
 * rotates hourly, keeps one day of history and compresses rotated files.
 *
 * Loggers: "default" and "client" (warn, writing to the default and p2p files), "rpc" and
 * "blockchain" (warn, own file), "p2p" (debug, own file) and "user" (debug, appender
 * named "user", which is not one of the appenders configured here).
 *
 * The path of every log file is announced on stdout as it is configured.
 *
 * @param data_dir client data directory; logs go to its "logs" subdirectory
 * @return the assembled logging configuration
 */
fc::logging_config create_default_logging_config(const fc::path& data_dir)
{
  const fc::path log_dir = data_dir / "logs";

  // settings shared by every rotating log file; only the file name and flush policy vary
  const auto rotating_file = [](const fc::path& file_name, bool flush_each_write) -> fc::file_appender::config
  {
    fc::file_appender::config settings;
    settings.filename = file_name;
    settings.truncate = false;
    settings.flush = flush_each_write;
    settings.rotate = true;
    settings.rotation_interval = fc::hours( 1 );
    settings.rotation_limit = fc::days( 1 );
    settings.rotation_compression = true;
    return settings;
  };

  // the p2p log is only flushed on every write in debug builds
#ifdef NDEBUG
  const bool flush_p2p_log = false;
#else // NDEBUG
  const bool flush_p2p_log = true;
#endif // NDEBUG

  const fc::file_appender::config default_file = rotating_file( log_dir / "default" / "default.log", true );
  std::cout << "Logging to file: " << default_file.filename.generic_string() << "\n";

  const fc::file_appender::config rpc_file = rotating_file( log_dir / "rpc" / "rpc.log", true );
  std::cout << "Logging RPC to file: " << rpc_file.filename.generic_string() << "\n";

  const fc::file_appender::config blockchain_file = rotating_file( log_dir / "blockchain" / "blockchain.log", true );
  std::cout << "Logging blockchain to file: " << blockchain_file.filename.generic_string() << "\n";

  const fc::file_appender::config p2p_file = rotating_file( log_dir / "p2p" / "p2p.log", flush_p2p_log );
  std::cout << "Logging P2P to file: " << p2p_file.filename.generic_string() << "\n";

  fc::logging_config cfg;

  // console appender with one color per severity
  fc::variants level_colors {
    fc::mutable_variant_object( "level","debug")("color", "green"),
    fc::mutable_variant_object( "level","warn")("color", "brown"),
    fc::mutable_variant_object( "level","error")("color", "red") };
  cfg.appenders.push_back(
    fc::appender_config( "stderr", "console",
                         fc::mutable_variant_object()
                           ( "stream","std_error")
                           ( "level_colors", level_colors )
                       ) );
  cfg.appenders.push_back(fc::appender_config( "default", "file", fc::variant(default_file)));
  cfg.appenders.push_back(fc::appender_config( "rpc", "file", fc::variant(rpc_file)));
  cfg.appenders.push_back(fc::appender_config( "blockchain", "file", fc::variant(blockchain_file)));
  cfg.appenders.push_back(fc::appender_config( "p2p", "file", fc::variant(p2p_file)));

  // "default" and "client" share the same level and destinations
  fc::logger_config default_logger;
  default_logger.level = fc::log_level::warn;
  default_logger.name = "default";
  default_logger.appenders.push_back("default");
  default_logger.appenders.push_back("p2p");

  fc::logger_config client_logger;
  client_logger.level = fc::log_level::warn;
  client_logger.name = "client";
  client_logger.appenders.push_back("default");
  client_logger.appenders.push_back("p2p");

  fc::logger_config rpc_logger;
  rpc_logger.level = fc::log_level::warn;
  rpc_logger.name = "rpc";
  rpc_logger.appenders.push_back("rpc");

  fc::logger_config blockchain_logger;
  blockchain_logger.level = fc::log_level::warn;
  blockchain_logger.name = "blockchain";
  blockchain_logger.appenders.push_back("blockchain");

  fc::logger_config p2p_logger;
  p2p_logger.level = fc::log_level::debug;
  p2p_logger.name = "p2p";
  p2p_logger.appenders.push_back("p2p");

  fc::logger_config user_logger;
  user_logger.level = fc::log_level::debug;
  user_logger.name = "user";
  user_logger.appenders.push_back("user");

  // registration order is kept as: default, client, rpc, p2p, user, blockchain
  cfg.loggers.push_back(default_logger);
  cfg.loggers.push_back(client_logger);
  cfg.loggers.push_back(rpc_logger);
  cfg.loggers.push_back(p2p_logger);
  cfg.loggers.push_back(user_logger);
  cfg.loggers.push_back(blockchain_logger);
  return cfg;
}
/**
 * Determines the client data directory.
 *
 * An explicit --data-dir option wins.  Otherwise the directory lives under fc::app_path():
 * named BTS_BLOCKCHAIN_NAME on Windows and macOS, and "." followed by BTS_BLOCKCHAIN_NAME
 * with all spaces removed on every other platform.
 *
 * @param option_variables parsed command-line options
 * @return the data directory path (not created here)
 */
fc::path get_data_dir(const program_options::variables_map& option_variables)
{ try {
  if (option_variables.count("data-dir"))
    return fc::path(option_variables["data-dir"].as<string>().c_str());

#if defined( WIN32 ) || defined( __APPLE__ )
  return fc::app_path() / BTS_BLOCKCHAIN_NAME;
#else
  // hidden directory: strip the spaces out of the chain name and prefix it with a dot
  std::string hidden_dir_name(BTS_BLOCKCHAIN_NAME);
  hidden_dir_name.erase(std::remove(hidden_dir_name.begin(), hidden_dir_name.end(), ' '),
                        hidden_dir_name.end());
  return fc::app_path() / ("." + hidden_dir_name);
#endif
} FC_RETHROW_EXCEPTIONS( warn, "error loading config" ) }
/**
 * Applies the --resync-blockchain / --rebuild-index options to the on-disk chain data.
 *
 *  - --resync-blockchain: removes <datadir>/chain entirely
 *  - --rebuild-index (only when resync was not requested): removes <datadir>/chain/index
 *  - neither: only announces where the blockchain will be loaded from
 *
 * Deletion failures (fc::exception) are reported on stdout and otherwise ignored.
 * This function does not itself open the database.
 *
 * @param datadir          client data directory
 * @param option_variables parsed command-line options
 */
void load_and_configure_chain_database( const fc::path& datadir,
                                        const program_options::variables_map& option_variables)
{ try {
  const bool resync_requested  = option_variables.count("resync-blockchain") != 0;
  const bool rebuild_requested = option_variables.count("rebuild-index") != 0;

  if (!resync_requested && !rebuild_requested)
  {
    std::cout << "Loading blockchain from: " << ( datadir / "chain" ).generic_string() << "\n";
    return;
  }

  if (resync_requested)
  {
    // resync takes precedence over rebuild-index
    std::cout << "Deleting old copy of the blockchain in: " << ( datadir / "chain" ).generic_string() << "\n";
    try
    {
      fc::remove_all(datadir / "chain");
    }
    catch (const fc::exception& removal_error)
    {
      std::cout << "Error while deleting old copy of the blockchain: " << removal_error.what() << "\n";
      std::cout << "You may need to manually delete your blockchain and relaunch the client\n";
    }
    return;
  }

  std::cout << "Clearing database index\n";
  try
  {
    fc::remove_all(datadir / "chain/index");
  }
  catch (const fc::exception& removal_error)
  {
    std::cout << "Error while deleting database index: " << removal_error.what() << "\n";
  }
} FC_RETHROW_EXCEPTIONS( warn, "unable to open blockchain from ${data_dir}", ("data_dir",datadir/"chain") ) }
/**
 * Loads <datadir>/config.json, or creates it with defaults when it does not exist.
 *
 * When the file exists, the default peers of a default-constructed config that are
 * missing from the loaded peer list are appended to it (the merge is not written back
 * to disk here).  When the file does not exist, a config with the default logging
 * configuration is saved to that path.
 *
 * @param datadir client data directory
 * @return the loaded or newly created configuration
 */
config load_config( const fc::path& datadir )
{ try {
  const fc::path config_file = datadir/"config.json";
  config cfg;

  if( !fc::exists( config_file ) )
  {
    std::cerr<<"Creating default config file at: "<<config_file.generic_string()<<"\n";
    cfg.logging = create_default_logging_config(datadir);
    fc::json::save_to_file( cfg, config_file );
    return cfg;
  }

  std::cout << "Loading config from: " << config_file.generic_string() << "\n";

  // remember the built-in peers before the loaded file overwrites cfg
  const auto builtin_peers = cfg.default_peers;
  cfg = fc::json::from_file( config_file ).as<config>();

  int merged_peer_count = 0;
  for( const auto& builtin_peer : builtin_peers )
  {
    const bool already_listed =
        std::find(cfg.default_peers.begin(), cfg.default_peers.end(), builtin_peer) != cfg.default_peers.end();
    if( already_listed )
      continue;
    cfg.default_peers.push_back(builtin_peer);
    ++merged_peer_count;
  }
  if( merged_peer_count > 0 )
    std::cout << "Merged " << merged_peer_count << " default peers into config.\n";

  return cfg;
} FC_RETHROW_EXCEPTIONS( warn, "unable to load config file ${cfg}", ("cfg",datadir/"config.json")) }
using namespace bts::wallet;
using namespace bts::blockchain;
// wrap the exception database in a class that logs the exception to the normal logging stream
// in addition to just storing it
/**
 * Thin wrapper around a level_map of exceptions keyed by time.  It forwards to the
 * underlying map and additionally writes every stored exception to the normal error log.
 */
class logging_exception_db
{
public:
  using exception_leveldb_type = bts::db::level_map<fc::time_point,fc::exception>;

  /// Opens (and by default creates) the underlying database at the given path.
  void open(const fc::path& filename, bool create = true) { _db.open(filename, create); }

  /// Logs the exception via elog, then stores it keyed by the current time.
  void store(const fc::exception& e)
  {
    elog("storing error in database: ${e}", ("e", e));
    _db.store(fc::time_point::now(), e);
  }

  /// Forwards to the underlying map's lower_bound() for the given time.
  exception_leveldb_type::iterator lower_bound(const fc::time_point& time) const { return _db.lower_bound(time); }

  /// Forwards to the underlying map's begin().
  exception_leveldb_type::iterator begin() const { return _db.begin(); }

  /// Removes the entry stored under the given key.
  void remove(const fc::time_point& key) { _db.remove(key); }

private:
  exception_leveldb_type _db;
};
typedef boost::iostreams::tee_device<std::ostream, std::ofstream> TeeDevice;
typedef boost::iostreams::stream<TeeDevice> TeeStream;
namespace detail
{
// Private implementation behind bts::client::client.  It implements the p2p
// node_delegate callbacks and the generated common_api (JSON-RPC) interface, and
// holds the chain database, wallet, RPC server, p2p node and CLI handles.
class client_impl : public bts::net::node_delegate,
public bts::api::common_api
{
public:
// Log appender attached to the "user" logger (see the client_impl constructor).
// Each message is formatted, appended to an in-memory history, and shown through
// the CLI when one is set, otherwise printed to stdout.
class user_appender : public fc::appender
{
public:
user_appender( client_impl& c )
:_client_impl(c){}
virtual void log( const fc::log_message& m ) override
{
string format = m.get_format();
// lookup translation on format here
// perform variable substitution;
string message = format_string( format, m.get_data() );
{ // appenders can be called from any thread
// NOTE: the history lock is held while the message is displayed/printed
fc::scoped_lock<boost::mutex> lock(_history_lock);
_history.emplace_back( message );
if( _client_impl._cli )
_client_impl._cli->display_status_message( message );
else
std::cout << message << "\n";
}
// call a callback to the client...
// we need an RPC call to fetch this log and display the
// current status.
}
// Returns a copy of the recorded messages, taken under the history lock.
vector<string> get_history()const
{
fc::scoped_lock<boost::mutex> lock(_history_lock);
return _history;
}
// Discards all recorded messages.
void clear_history()
{
fc::scoped_lock<boost::mutex> lock(_history_lock);
_history.clear();
}
private:
// guards _history; mutable so the const get_history() can lock it
mutable boost::mutex _history_lock;
// TODO: consider a deque and enforce maximum length?
vector<string> _history;
client_impl& _client_impl;
};
// Registers a user_appender on the "user" logger, then creates the RPC server
// (handing it `self`) and the chain database.  Failures are rethrown with context
// via FC_RETHROW_EXCEPTIONS.
client_impl(bts::client::client* self) :
_self(self),
_last_sync_status_message_indicated_in_sync(true),
_last_sync_status_head_block(0),
_remaining_items_to_sync(0),
_sync_speed_accumulator(boost::accumulators::tag::rolling_window::window_size = 5)
{ try {
_user_appender = fc::shared_ptr<user_appender>( new user_appender(*this) );
fc::logger::get( "user" ).add_appender( _user_appender );
try {
_rpc_server = std::make_shared<rpc_server>(self);
} FC_RETHROW_EXCEPTIONS(warn,"rpc server")
try {
_chain_db = std::make_shared<chain_database>();
} FC_RETHROW_EXCEPTIONS(warn,"chain_db")
} FC_RETHROW_EXCEPTIONS( warn, "" ) }
// Cancels the rebroadcast task (logging any fc::exception it surfaces), releases the
// p2p node and deletes the CLI, which is held as an owning raw pointer.
// NOTE(review): _delegate_loop_complete is not cancelled here; presumably the owner
// calls cancel_delegate_loop() before destruction -- confirm.
virtual ~client_impl() override
{
try
{
_rebroadcast_pending_loop.cancel_and_wait();
}
catch (const fc::exception& e)
{
wlog("Unexpected error from rebroadcast_pending(): ${e}", ("e", e));
}
_p2p_node.reset();
delete _cli;
}
// Starts the CLI.  _cli is dereferenced without a null check, so it must have been
// set before this is called (it defaults to nullptr).
void start()
{
_cli->start();
}
// --- delegate (block production) loop management ---
void reschedule_delegate_loop();
void start_delegate_loop();
void cancel_delegate_loop();
void delegate_loop();
void set_target_connections( uint32_t target );
// --- periodic rebroadcast of pending transactions ---
void rebroadcast_pending();
// handle of the scheduled rebroadcast_pending() task; cancelled in the destructor
fc::future<void> _rebroadcast_pending_loop;
void configure_rpc_server(config& cfg,
const program_options::variables_map& option_variables);
block_fork_data on_new_block(const full_block& block,
const block_id_type& block_id,
bool sync_mode);
bool on_new_transaction(const signed_transaction& trx);
/* Implement node_delegate */
// @{
virtual bool has_item(const bts::net::item_id& id) override;
virtual bool handle_message(const bts::net::message&, bool sync_mode) override;
virtual std::vector<bts::net::item_hash_t> get_item_ids(uint32_t item_type,
const vector<bts::net::item_hash_t>& blockchain_synopsis,
uint32_t& remaining_item_count,
uint32_t limit = 2000) override;
virtual bts::net::message get_item(const bts::net::item_id& id) override;
// Returns the chain id of the chain database; asserts that the database exists.
virtual fc::sha256 get_chain_id() const override
{
FC_ASSERT( _chain_db != nullptr );
return _chain_db->chain_id();
}
virtual std::vector<bts::net::item_hash_t> get_blockchain_synopsis(uint32_t item_type,
const bts::net::item_hash_t& reference_point = bts::net::item_hash_t(),
uint32_t number_of_blocks_after_reference_point = 0) override;
virtual void sync_status(uint32_t item_type, uint32_t item_count) override;
virtual void connection_count_changed(uint32_t c) override;
virtual uint32_t get_block_number(const bts::net::item_hash_t& block_id) override;
virtual fc::time_point_sec get_block_time(const bts::net::item_hash_t& block_id) override;
virtual fc::time_point_sec get_blockchain_now() override;
virtual void error_encountered(const std::string& message, const fc::oexception& error) override;
/// @}
// back-pointer to the public client object (passed to the constructor)
bts::client::client* _self = nullptr;
// owning raw pointer; deleted in the destructor
bts::cli::cli* _cli = nullptr;
#ifndef DISABLE_DELEGATE_NETWORK
bts::network::node _delegate_network;
#endif
std::unique_ptr<std::istream> _command_script_holder;
std::ofstream _console_log;
std::unique_ptr<std::ostream> _output_stream;
std::unique_ptr<TeeDevice> _tee_device;
std::unique_ptr<TeeStream> _tee_stream;
fc::path _data_dir;
fc::shared_ptr<user_appender> _user_appender;
bool _simulate_disconnect = false;
// connection to time_discontinuity_signal, made in start_delegate_loop()
fc::scoped_connection _time_discontinuity_connection;
bts::rpc::rpc_server_ptr _rpc_server;
std::unique_ptr<bts::net::upnp_service> _upnp_service;
chain_database_ptr _chain_db;
unordered_map<transaction_id_type, signed_transaction> _pending_trxs;
wallet_ptr _wallet;
// handle of the currently scheduled/running delegate_loop() task
fc::future<void> _delegate_loop_complete;
fc::time_point _last_sync_status_message_time;
bool _last_sync_status_message_indicated_in_sync;
uint32_t _last_sync_status_head_block;
uint32_t _remaining_items_to_sync;
// rolling mean over a window of 5 samples (window size set in the constructor)
boost::accumulators::accumulator_set<double, boost::accumulators::stats<boost::accumulators::tag::rolling_mean> > _sync_speed_accumulator;
config _config;
logging_exception_db _exception_db;
// minimum connection count asserted in delegate_loop() before producing a block
uint32_t _min_delegate_connection_count = BTS_MIN_DELEGATE_CONNECTION_COUNT;
bool _sync_mode = true;
bool _in_sync = true;
rpc_server_config _tmp_rpc_config;
bts::net::node_ptr _p2p_node;
//-------------------------------------------------- JSON-RPC Method Implementations
// include all of the method overrides generated by the bts_api_generator
// this file just contains a bunch of lines that look like:
// virtual void some_method(const string& some_arg) override;
#include <bts/rpc_stubs/common_api_overrides.ipp> //include auto-generated RPC API declarations
};
//should this function be moved to rpc server eventually? probably...
/**
 * Configures and starts the JSON-RPC and HTTP JSON-RPC servers when --server or
 * --daemon was given; otherwise only prints a hint and returns.
 *
 * Command-line values (--rpcuser, --rpcpassword, --rpcport, --httpport) override the
 * corresponding entries of cfg.rpc.  If the resulting user name or password is empty,
 * an explanation is printed and the process exits with status 1.
 *
 * @param cfg              client configuration; cfg.rpc is updated in place
 * @param option_variables parsed command-line options
 */
void client_impl::configure_rpc_server(config& cfg,
                                       const program_options::variables_map& option_variables)
{
  const bool server_requested = option_variables.count("server") != 0;
  const bool daemon_requested = option_variables.count("daemon") != 0;
  if( !server_requested && !daemon_requested )
  {
    std::cout << "Not starting RPC server, use --server to enable the RPC interface\n";
    return;
  }

  // the user wants the RPC server: let command-line values override the config file
  if (option_variables.count("rpcuser"))
    cfg.rpc.rpc_user = option_variables["rpcuser"].as<string>();
  if (option_variables.count("rpcpassword"))
    cfg.rpc.rpc_password = option_variables["rpcpassword"].as<string>();
  if (option_variables.count("rpcport"))
    cfg.rpc.rpc_endpoint.set_port(option_variables["rpcport"].as<uint16_t>());
  if (option_variables.count("httpport"))
    cfg.rpc.httpd_endpoint.set_port(option_variables["httpport"].as<uint16_t>());

  // refuse to run an unauthenticated RPC server
  if (cfg.rpc.rpc_user.empty() || cfg.rpc.rpc_password.empty())
  {
    std::cout << "Error starting RPC server\n"
              << "You specified " << (server_requested ? "--server" : "--daemon") << " on the command line,\n"
              << "but did not provide a username or password to authenticate RPC connections.\n"
              << "You can provide these by using --rpcuser=username and --rpcpassword=password on the\n"
              << "command line, or by setting the \"rpc_user\" and \"rpc_password\" properties in the\n"
              << "config file.\n";
    exit(1);
  }

  // launch both servers; the second is attempted even when the first one fails
  bool rpc_success = _rpc_server->configure_rpc(cfg.rpc);
  rpc_success &= _rpc_server->configure_http(cfg.rpc);
  // this shouldn't fail due to the above checks, but just to be safe...
  if (!rpc_success)
    std::cerr << "Error starting rpc server\n\n";

  // reports the port a server actually bound to, if it is listening at all
  const auto announce_endpoint = [](const char* prefix, const fc::optional<fc::ip::endpoint>& bound_endpoint)
  {
    if (bound_endpoint)
    {
      std::cout << prefix << bound_endpoint->port();
      if (bound_endpoint->get_address() == fc::ip::address("127.0.0.1"))
        std::cout << " (localhost only)";
      std::cout << "\n";
    }
  };
  announce_endpoint("Starting JSON RPC server on port ", _rpc_server->get_rpc_endpoint());
  announce_endpoint("Starting HTTP JSON RPC server on port ", _rpc_server->get_httpd_endpoint());
}
// Call this whenever a change occurs that may enable block production by the client
/**
 * Starts the delegate loop unless a previously started loop task is still pending.
 * Call this whenever a change occurs that may enable block production by the client.
 */
void client_impl::reschedule_delegate_loop()
{
  const bool loop_still_pending = _delegate_loop_complete.valid() && !_delegate_loop_complete.ready();
  if( loop_still_pending )
    return;
  start_delegate_loop();
}
void client_impl::start_delegate_loop()
{
if (!_time_discontinuity_connection.connected())
_time_discontinuity_connection = bts::blockchain::time_discontinuity_signal.connect([=](){ reschedule_delegate_loop(); });
_delegate_loop_complete = fc::async( [=](){ delegate_loop(); }, "delegate_loop" );
}
void client_impl::cancel_delegate_loop()
{
try
{
ilog( "Canceling delegate loop..." );
_delegate_loop_complete.cancel_and_wait();
ilog( "Delegate loop canceled" );
}
catch( const fc::exception& e )
{
wlog( "Unexpected exception thrown from delegate_loop(): ${e}", ("e",e.to_detail_string() ) );
}
}
// One iteration of the block-production loop.  If one of this wallet's enabled delegates
// has a producible slot that is due, a block is generated, signed, applied locally and
// broadcast.  The function then schedules itself again for the start of the next slot.
//
// The loop stops (is NOT rescheduled) when the wallet is closed or locked, or when the
// wallet has no enabled delegates; reschedule_delegate_loop() restarts it.
void client_impl::delegate_loop()
{
if( !_wallet->is_open() || _wallet->is_locked() )
return;
vector<wallet_account_record> enabled_delegates = _wallet->get_my_delegates( enabled_delegate_status );
if( enabled_delegates.empty() )
return;
const auto now = blockchain::now();
ilog( "Starting delegate loop at time: ${t}", ("t",now) );
set_target_connections( BTS_NET_DELEGATE_DESIRED_CONNECTIONS );
const auto next_block_time = _wallet->get_next_producible_block_timestamp( enabled_delegates );
if( next_block_time.valid() )
{
// delegates don't get to skip this check, they must check up on everyone else
_chain_db->skip_signature_verification( false );
ilog( "Producing block at time: ${t}", ("t",*next_block_time) );
#ifndef DISABLE_DELEGATE_NETWORK
// sign in to delegate server using private keys of my delegates
//_delegate_network.signin( _wallet->get_my_delegate( enabled_delegate_status | active_delegate_status ) );
#endif
// only produce when the slot has actually arrived
if( *next_block_time <= now )
{
try
{
// preconditions: enough peers, unlocked wallet, and the slot is less than one
// block interval in the past
FC_ASSERT( network_get_connection_count() >= _min_delegate_connection_count,
"Client must have ${count} connections before you may produce blocks!",
("count",_min_delegate_connection_count) );
FC_ASSERT( _wallet->is_unlocked(), "Wallet must be unlocked to produce blocks!" );
FC_ASSERT( (now - *next_block_time) < fc::seconds( BTS_BLOCKCHAIN_BLOCK_INTERVAL_SEC ),
"You missed your slot at time: ${t}!", ("t",*next_block_time) );
full_block next_block = _chain_db->generate_block( *next_block_time );
_wallet->sign_block( next_block );
// apply the block locally (sync_mode = false) before announcing it
on_new_block( next_block, next_block.id(), false );
#ifndef DISABLE_DELEGATE_NETWORK
_delegate_network.broadcast_block( next_block );
// broadcast block to delegates first, starting with the next delegate
#endif
_p2p_node->broadcast( block_message( next_block ) );
ilog( "Produced block #${n}!", ("n",next_block.block_num) );
}
catch( const fc::exception& e )
{
// a failed production attempt is recorded and must not stop the loop
_exception_db.store( e );
}
}
}
// schedule the next iteration for the start of the slot after the current one
uint32_t slot_number = blockchain::get_slot_number( now );
time_point_sec next_slot_time = blockchain::get_slot_start_time( slot_number + 1 );
ilog( "Rescheduling delegate loop for time: ${t}", ("t",next_slot_time) );
time_point scheduled_time = next_slot_time;
// when an NTP time is available, shift the wake-up time by the reported NTP error
if( blockchain::ntp_time().valid() )
scheduled_time -= blockchain::ntp_error();
/* Don't reschedule immediately in case we are in simulation */
const auto system_now = time_point::now();
if( scheduled_time <= system_now )
scheduled_time = system_now + fc::seconds( 1 );
_delegate_loop_complete = fc::schedule( [=](){ delegate_loop(); }, scheduled_time, "delegate_loop" );
}
void client_impl::set_target_connections( uint32_t target )
{
auto params = fc::mutable_variant_object();
params["desired_number_of_connections"] = target;
network_set_advanced_node_parameters( params );
}
/**
 * Returns the account records of a contiguous range of the currently active delegates.
 *
 * @param first position of the first delegate to return; 0 and 1 both select the first
 *              active delegate (positive values are treated as 1-based)
 * @param count number of delegates to return; the range must lie entirely within the
 *              BTS_BLOCKCHAIN_NUM_DELEGATES active delegates
 * @return the requested delegate records, in active-delegate order
 * @throws fc::exception (via FC_ASSERT) if the range is out of bounds, the active
 *         delegate list has an unexpected size, or a listed id is not a delegate account
 */
vector<account_record> client_impl::blockchain_list_active_delegates( uint32_t first, uint32_t count )const
{
  if( first > 0 ) --first;
  FC_ASSERT( first < BTS_BLOCKCHAIN_NUM_DELEGATES );
  // Compare against the remaining room instead of computing first + count: the sum is
  // 32-bit unsigned and wraps for large counts, which would let an out-of-range request
  // slip past the check and index beyond the end of the delegate list.
  FC_ASSERT( count <= BTS_BLOCKCHAIN_NUM_DELEGATES - first );

  const vector<account_id_type> all_delegate_ids = _chain_db->get_active_delegates();
  FC_ASSERT( all_delegate_ids.size() == BTS_BLOCKCHAIN_NUM_DELEGATES );

  vector<account_record> delegate_records;
  delegate_records.reserve( count );

  // walk the requested slice directly; no intermediate copy of the ids is needed
  const auto slice_begin = all_delegate_ids.begin() + first;
  const auto slice_end = slice_begin + count;
  for( auto id_itr = slice_begin; id_itr != slice_end; ++id_itr )
  {
    auto delegate_record = _chain_db->get_account_record( *id_itr );
    FC_ASSERT( delegate_record.valid() && delegate_record->is_delegate() );
    delegate_records.push_back( *delegate_record );
  }
  return delegate_records;
}
/**
 * Returns account records for the delegate ids that the chain database's
 * get_delegates_by_vote() yields for the given range.
 *
 * @param first position of the first delegate to return; 0 and 1 both select the first
 *              delegate (positive values are treated as 1-based)
 * @param count maximum number of delegates requested from the chain database
 * @return the delegate records, in the order the chain database returned the ids
 * @throws fc::exception (via FC_ASSERT) if a returned id is not a valid delegate account
 */
vector<account_record> client_impl::blockchain_list_delegates( uint32_t first, uint32_t count )const
{
  if( first > 0 ) --first;
  const vector<account_id_type> delegate_ids = _chain_db->get_delegates_by_vote( first, count );

  vector<account_record> delegate_records;
  // Size the result by what was actually returned.  `count` comes straight from the
  // caller and may be arbitrarily large, so reserving `count` elements could attempt a
  // huge allocation for a request that yields only a handful of records.
  delegate_records.reserve( delegate_ids.size() );
  for( const auto& delegate_id : delegate_ids )
  {
    auto delegate_record = _chain_db->get_account_record( delegate_id );
    FC_ASSERT( delegate_record.valid() && delegate_record->is_delegate() );
    delegate_records.push_back( *delegate_record );
  }
  return delegate_records;
}
/**
 * Lists the names of the delegates whose slots fall between block (block_num - 1) and
 * block block_num, i.e. the delegates that did not produce a block in those slots.
 *
 * @param block_num number of the block that ended the gap; 0 and 1 yield an empty list
 * @return delegate names in slot order
 * @throws fc::exception (via FC_ASSERT) if either block record, a slot record or a
 *         delegate account record cannot be found
 */
vector<string> client_impl::blockchain_list_missing_block_delegates( uint32_t block_num )
{
  if (block_num == 0 || block_num == 1)
    return vector<string>();

  vector<string> delegates;
  auto this_block = _chain_db->get_block_record( block_num );
  FC_ASSERT(this_block.valid(), "Cannot use this call on a block that has not yet been produced");
  auto prev_block = _chain_db->get_block_record( block_num - 1 );
  // the previous record is dereferenced below, so verify it the same way as this_block
  FC_ASSERT(prev_block.valid(), "Unable to find the block preceding the requested block");

  // step through every slot strictly between the two blocks
  auto timestamp = prev_block->timestamp;
  timestamp += BTS_BLOCKCHAIN_BLOCK_INTERVAL_SEC;
  while (timestamp != this_block->timestamp)
  {
    auto slot_record = _chain_db->get_slot_record( timestamp );
    FC_ASSERT( slot_record.valid() );
    auto delegate_record = _chain_db->get_account_record( slot_record->block_producer_id );
    FC_ASSERT( delegate_record.valid() );
    delegates.push_back( delegate_record->name );
    timestamp += BTS_BLOCKCHAIN_BLOCK_INTERVAL_SEC;
  }
  return delegates;
}
/**
 * Lists up to 1000 consecutive block records.
 *
 *  - count > 0: ascending from block `first` (0 is treated as 1), clipped so that it
 *    does not run past the head block
 *  - count < 0: descending; `first` is an offset back from the head block (0 = head),
 *    clipped so that it does not run below block 1
 *  - count == 0: empty result
 *
 * @param first starting block number (ascending) or offset from the head (descending)
 * @param count number of blocks and direction, in [-1000, 1000]
 * @return the block records in traversal order
 * @throws fc::exception (via FC_ASSERT) if count is out of range, first exceeds the head
 *         block number, or a block record is missing
 */
vector<block_record> client_impl::blockchain_list_blocks( uint32_t first, int32_t count )
{
  FC_ASSERT( count <= 1000 );
  FC_ASSERT( count >= -1000 );

  vector<block_record> listed_blocks;
  if( count == 0 )
    return listed_blocks;

  uint32_t total_blocks = _chain_db->get_head_block_num();
  FC_ASSERT( first <= total_blocks );

  // normalize first/count and clip count so that no nonexistent block is requested
  int32_t step;
  if( count > 0 )
  {
    step = 1;
    if ( first == 0 )
      first = 1;
    if( first + count - 1 > total_blocks )
      count = total_blocks - first + 1;
  }
  else
  {
    step = -1;
    first = total_blocks - first;
    count *= -1;
    if( signed(first) - count < 0 )
      count = first;
  }

  listed_blocks.reserve( count );
  int32_t block_num = first;
  while( count )
  {
    auto record = _chain_db->get_block_record( block_num );
    FC_ASSERT( record.valid() );
    listed_blocks.push_back( *record );
    --count;
    block_num += step;
  }
  return listed_blocks;
}
/**
 * Collects the signed transaction (`trx`) from every entry returned by
 * _chain_db->get_pending_transactions().
 *
 * @return The transactions in the same order as the pending list.
 */
signed_transactions client_impl::blockchain_get_pending_transactions() const
{
    const vector<transaction_evaluation_state_ptr> pending = _chain_db->get_pending_transactions();

    signed_transactions trxs;
    trxs.reserve( pending.size() );
    // Bind each element by const reference: only ->trx is read, so copying the
    // pointer object on every iteration is unnecessary work.
    for( const auto& trx_eval_ptr : pending )
    {
        trxs.push_back( trx_eval_ptr->trx );
    }
    return trxs;
}
void client_impl::rebroadcast_pending()
{
#ifndef NDEBUG
static bool currently_running = false;
struct checker {
bool& var;
checker(bool& var) : var(var) { assert(!var); var = true; }
~checker() { var = false; }
} _checker(currently_running);
#endif // !NDEBUG
if( !_sync_mode )
{
wlog( "rebroadcasting... " );
try
{
signed_transactions pending = blockchain_get_pending_transactions();
for( auto trx : pending )
{
network_broadcast_transaction( trx );
}
}
catch ( const fc::exception& e )
{
wlog( "error rebroadcasting transacation: ${e}", ("e",e.to_detail_string() ) );
}
}
_rebroadcast_pending_loop = fc::schedule( [=](){ rebroadcast_pending(); },
fc::time_point::now() + fc::seconds((int64_t)(BTS_BLOCKCHAIN_BLOCK_INTERVAL_SEC*1.3)),
"rebroadcast_pending" );
}
///////////////////////////////////////////////////////
// Implement chain_client_delegate //
///////////////////////////////////////////////////////
block_fork_data client_impl::on_new_block(const full_block& block,
const block_id_type& block_id,
bool sync_mode)
{
try
{
_sync_mode = sync_mode;
if (sync_mode && _remaining_items_to_sync > 0)
--_remaining_items_to_sync;
try
{
FC_ASSERT( !_simulate_disconnect );
ilog("Received a new block from the p2p network, current head block is ${num}, "
"new block is ${block}, current head block is ${num}",
("num", _chain_db->get_head_block_num())("block", block)("num", _chain_db->get_head_block_num()));
fc::optional<block_fork_data> fork_data = _chain_db->get_block_fork_data( block_id );
if( fork_data && fork_data->is_known )
{
if (sync_mode && !fork_data->is_linked)
FC_THROW_EXCEPTION(bts::blockchain::unlinkable_block,
"The blockchain already has this block, but it isn't linked");
ilog("The block we just received is one I've already seen, ignoring it");
return *fork_data;
}
else
{
block_fork_data result = _chain_db->push_block(block);
if (sync_mode && !result.is_linked)
FC_THROW_EXCEPTION(bts::blockchain::unlinkable_block, "The blockchain accepted this block, but it isn't linked");
ilog("After push_block, current head block is ${num}", ("num", _chain_db->get_head_block_num()));
fc::time_point_sec now = blockchain::now();
fc::time_point_sec head_block_timestamp = _chain_db->now();
if (_cli
&& result.is_included
&& (now - head_block_timestamp) > fc::minutes(5)
&& _last_sync_status_message_time < (now - fc::seconds(30)))
{
std::ostringstream message;
message << "--- syncing with p2p network, our last block is "
<< fc::get_approximate_relative_time_string(head_block_timestamp, now, " old");
ulog( message.str() );
uint32_t current_head_block_num = _chain_db->get_head_block_num();
if (_last_sync_status_message_time > (now - fc::seconds(60)) &&
_last_sync_status_head_block != 0 &&
current_head_block_num > _last_sync_status_head_block)
{
uint32_t seconds_since_last_status_message = (uint32_t)((fc::time_point(now) - _last_sync_status_message_time).count() / fc::seconds(1).count());
uint32_t blocks_since_last_status_message = current_head_block_num - _last_sync_status_head_block;
double current_sync_speed_in_blocks_per_sec = (double)blocks_since_last_status_message / seconds_since_last_status_message;
_sync_speed_accumulator(current_sync_speed_in_blocks_per_sec);
double average_sync_speed = boost::accumulators::rolling_mean(_sync_speed_accumulator);
double remaining_seconds_to_sync = _remaining_items_to_sync / average_sync_speed;
std::ostringstream speed_message;
speed_message << "--- currently syncing at ";
if (average_sync_speed >= 10.)
speed_message << (int)average_sync_speed << " blocks/sec, ";
else if (average_sync_speed >= 0.1)