From 056190480ee706c7b2249a2894b85070ffac0f7a Mon Sep 17 00:00:00 2001 From: Isaac Cook Date: Fri, 4 Apr 2014 00:00:09 -0500 Subject: [PATCH] Added ability to specify multiple backend server connections, and changed all caching to use redis instead of redis + database * monitor_addr is now deprecated, instead use monitor_addrs as in example * the Blob table is no longer used for anything. It will be removed in the future. All cached values that were stored there are now in the cache proper * Tasks run periodically to cache all currently online workers for all servers, instead of doing so on a per-address basis * Scheduled tasks have been split into cache related tasks and db-changing related tasks, allowing a passive version of celery to run for staging * Workers now display their stratum connection address when connected --- config.yml.example | 6 ++- setup.py | 2 +- simplecoin/celery_entry.py | 6 +-- simplecoin/celeryconfig.py | 35 ++++++++++----- simplecoin/tasks.py | 84 ++++++++++++++++++++++++------------ simplecoin/utils.py | 23 +++------- simplecoin/views.py | 26 ++++------- templates/block_table.html | 6 +-- templates/round_summary.html | 2 +- templates/user_stats.html | 1 + 10 files changed, 111 insertions(+), 80 deletions(-) diff --git a/config.yml.example b/config.yml.example index 0fdd23c..82fb0da 100644 --- a/config.yml.example +++ b/config.yml.example @@ -1,7 +1,11 @@ # General # ======================================================================= # Where's powerpool running? -monitor_addr: http://localhost:3855/ +monitor_addrs: + - stratum: tcp+stratum://localhost:3333 + mon_address: http://localhost:3855/ + - stratum: tcp+stratum://localhost:3334 + mon_address: http://localhost:3856/ # how many confirmations do we wait before marking blocks mature # and allowing payout over RPC block_mature_confirms: 250 diff --git a/setup.py b/setup.py index 3413371..6577de0 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages setup(name='simplecoin', - version='0.4.0', + version='0.5.0', description='Dogecoin mining with no registration required.', author='Eric Cook', author_email='eric@simpload.com', diff --git a/simplecoin/celery_entry.py b/simplecoin/celery_entry.py index 0d5e133..2e3eb6a 100644 --- a/simplecoin/celery_entry.py +++ b/simplecoin/celery_entry.py @@ -5,10 +5,10 @@ app = create_app(celery=True) -# import celerybeat settings -celery.config_from_object('celeryconfig') -celery.conf.update(app.config['celery']) with app.app_context(): + # import celerybeat settings + celery.config_from_object('celeryconfig') + celery.conf.update(current_app.config['celery']) current_app.logger.info("Celery worker powering up... BBBVVVRRR!") main(app=celery) diff --git a/simplecoin/celeryconfig.py b/simplecoin/celeryconfig.py index 2314450..ca0a6ed 100644 --- a/simplecoin/celeryconfig.py +++ b/simplecoin/celeryconfig.py @@ -1,7 +1,8 @@ from datetime import timedelta +from flask import current_app -CELERYBEAT_SCHEDULE = { +caching_tasks = { 'cache_user_donations': { 'task': 'simplecoin.tasks.cache_user_donation', 'schedule': timedelta(minutes=15) @@ -10,9 +11,24 @@ 'task': 'simplecoin.tasks.update_pplns_est', 'schedule': timedelta(minutes=15) }, + 'update_online_workers': { + 'task': 'simplecoin.tasks.update_online_workers', + 'schedule': timedelta(minutes=2) + }, + 'update_diff_average': { + 'task': 'simplecoin.tasks.difficulty_avg', + 'schedule': timedelta(hours=1), + }, + 'server_status': { + 'task': 'simplecoin.tasks.server_status', + 'schedule': timedelta(minutes=2), + }, +} + +database_tasks = { 'compress_min_shares': { 'task': 'simplecoin.tasks.compress_minute', - 'schedule': timedelta(minutes=5), + 'schedule': timedelta(seconds=5), }, 'compress_five_min_shares': { 'task': 'simplecoin.tasks.compress_five_minute', @@ -26,20 +42,19 @@ 'task': 'simplecoin.tasks.update_block_state', 'schedule': timedelta(minutes=5), }, - 'server_status': { - 'task': 'simplecoin.tasks.server_status', - 'schedule': timedelta(minutes=2), - }, 'check_worker_down': { 'task': 'simplecoin.tasks.check_down', 'schedule': timedelta(minutes=1), }, - 'update_diff_average': { - 'task': 'simplecoin.tasks.difficulty_avg', - 'schedule': timedelta(hours=1), - }, 'update_coin_trans': { 'task': 'simplecoin.tasks.update_coin_transaction', 'schedule': timedelta(minutes=10), }, } + +CELERYBEAT_SCHEDULE = caching_tasks +# we want to let celery run in staging mode where it only handles updating +# caches while the prod celery runner is handling real work. Allows separate +# cache databases between stage and prod +if not current_app.config.get('stage', False): + CELERYBEAT_SCHEDULE.update(database_tasks) diff --git a/simplecoin/tasks.py b/simplecoin/tasks.py index d02eca2..15f05ad 100644 --- a/simplecoin/tasks.py +++ b/simplecoin/tasks.py @@ -22,6 +22,35 @@ celery = Celery('simplecoin') +@celery.task(bind=True) +def update_online_workers(self): + """ + Grabs a list of workers from the running powerpool instances and caches + them + """ + try: + users = {} + for i, pp_config in enumerate(current_app.config['monitor_addrs']): + mon_addr = pp_config['mon_address'] + '/clients' + try: + req = requests.get(mon_addr) + data = req.json() + except Exception: + current_app.logger.warn("Unable to connect to {} to gather worker summary." + .format(mon_addr)) + else: + for address, workers in data['clients'].iteritems(): + users.setdefault('addr_online_' + address, []) + for d in workers: + users['addr_online_' + address].append((d['worker'], i)) + + cache.set_many(users, timeout=480) + + except Exception as exc: + logger.error("Unhandled exception in estimating pplns", exc_info=True) + raise self.retry(exc=exc) + + @celery.task(bind=True) def update_pplns_est(self): """ @@ -31,11 +60,16 @@ def update_pplns_est(self): # grab configured N mult = int(current_app.config['last_n']) # generate average diff from last 500 blocks - diff = Blob.query.filter_by(key="diff").first().data['diff'] + diff = cache.get('difficulty_avg') + if diff is None: + current_app.logger.warn( + "Difficulty average is blank, can't calculate pplns estimate") + return # Calculate the total shares to that are 'counted' total_shares = ((float(diff) * (2 ** 16)) * mult) - # Loop through all shares, descending order, until we'd distributed the shares + # Loop through all shares, descending order, until we'd distributed the + # shares remain = total_shares user_shares = {} for share in Share.query.order_by(Share.id.desc()).yield_per(5000): @@ -250,16 +284,13 @@ def count_share(typ, amount, user_=user): def new_block(self, blockheight, bits=None, reward=None): """ Notification that a new block height has been reached in the network. + Sets some things into the cache for display on the website. """ logger.info("Recieved notice of new block height {}".format(blockheight)) - if not isinstance(blockheight, int): - logger.error("Invalid block height submitted, must be integer") - blob = Blob(key='block', data={'height': str(blockheight), - 'difficulty': str(bits_to_difficulty(bits)), - 'reward': str(reward)}) - db.session.merge(blob) - db.session.commit() + cache.set('blockheight', blockheight) + cache.set('difficulty', bits_to_difficulty(bits)) + cache.set('reward', reward) # keep the last 500 blocks in the cache for getting average difficulty cache.cache._client.lpush('block_cache', bits) @@ -661,23 +692,24 @@ def check_down(self): @celery.task(bind=True) def server_status(self): """ - Periodic pull update of server stats + Periodicly poll the backend to get number of workers and throw it in the cache """ try: - mon_addr = current_app.config['monitor_addr'] - try: - req = requests.get(mon_addr) - data = req.json() - except Exception: - logger.warn("Couldn't connect to internal monitor at {}".format(mon_addr), - exc_info=True) - output = {'stratum_clients': 0, 'agent_clients': 0} - else: - output = {'stratum_clients': data['stratum_clients'], - 'agent_clients': data['agent_clients']} - blob = Blob(key='server', data={k: str(v) for k, v in output.iteritems()}) - db.session.merge(blob) - db.session.commit() + total_workers = 0 + for i, pp_config in enumerate(current_app.config['monitor_addrs']): + mon_addr = pp_config['mon_address'] + try: + req = requests.get(mon_addr) + data = req.json() + except Exception: + logger.warn("Couldn't connect to internal monitor at {}" + .format(mon_addr)) + continue + else: + cache.set('stratum_workers_' + str(i), data['stratum_clients']) + total_workers += data['stratum_clients'] + + cache.set('total_workers', total_workers) except Exception: logger.error("Unhandled exception in server_status", exc_info=True) db.session.rollback() @@ -691,9 +723,7 @@ def difficulty_avg(self): try: diff_list = cache.cache._client.lrange('block_cache', 0, 500) total_diffs = sum([bits_to_difficulty(diff) for diff in diff_list]) - blob = Blob(key='diff', data={'diff': str(total_diffs / len(diff_list))}) - db.session.merge(blob) - db.session.commit() + cache.set('difficulty_avg', total_diffs / len(diff_list)) except Exception as exc: logger.warn("Unknown failure in difficulty_avg", exc_info=True) raise self.retry(exc=exc) diff --git a/simplecoin/utils.py b/simplecoin/utils.py index dc56ead..bde682c 100644 --- a/simplecoin/utils.py +++ b/simplecoin/utils.py @@ -2,7 +2,6 @@ import datetime import time import itertools -import requests import yaml from flask import current_app @@ -194,13 +193,13 @@ def collect_user_stats(address): if pplns_cached_time != None: pplns_cached_time.strftime("%Y-%m-%d %H:%M:%S") - pplns_total_shares = cache.get('pplns_total_shares') + pplns_total_shares = cache.get('pplns_total_shares') or 0 # store all the raw data of we're gonna grab workers = {} # blank worker template def_worker = {'accepted': 0, 'rejected': 0, 'last_10_shares': 0, - 'online': False, 'status': None} + 'online': False, 'status': None, 'server': {}} # for picking out the last 10 minutes worth shares... now = datetime.datetime.utcnow().replace(second=0, microsecond=0) twelve_ago = now - datetime.timedelta(minutes=12) @@ -240,9 +239,13 @@ def collect_user_stats(address): workers[st.worker]['status_version'] = "Unsupp" # pull online status from cached pull direct from powerpool servers - for name in workers_online(address): + for name, host in cache.get('addr_online_' + address) or []: workers.setdefault(name, def_worker.copy()) workers[name]['online'] = True + try: + workers[name]['server'] = current_app.config['monitor_addrs'][host] + except KeyError: + workers[name]['server'] = {} # pre-calculate a few of the values here to abstract view logic for name, w in workers.iteritems(): @@ -284,18 +287,6 @@ def collect_user_stats(address): unconfirmed_balance=unconfirmed_balance) -@cache.memoize(timeout=120) -def workers_online(address): - """ Returns all workers online for an address """ - client_mon = current_app.config['monitor_addr']+'client/'+address - try: - req = requests.get(client_mon) - data = req.json() - except Exception: - return [] - return [w['worker'] for w in data[address]] - - def get_pool_eff(): rej, acc = get_pool_acc_rej() # avoid zero division error diff --git a/simplecoin/views.py b/simplecoin/views.py index b09290c..a6084dc 100755 --- a/simplecoin/views.py +++ b/simplecoin/views.py @@ -7,7 +7,7 @@ jsonify, g, session, Response) from lever import get_joined -from .models import (OneMinuteShare, Block, Blob, FiveMinuteShare, +from .models import (OneMinuteShare, Block, FiveMinuteShare, OneHourShare, Status, FiveMinuteReject, OneMinuteReject, OneHourReject, DonationPercent, BonusPayout) from . import db, root, cache @@ -41,8 +41,9 @@ def blocks(): @main.route("/pool_stats") def pool_stats(): - current_block = db.session.query(Blob).filter_by(key="block").first() - current_block.data['reward'] = int(current_block.data['reward']) + current_block = {'reward': cache.get('reward') or 0, + 'difficulty': cache.get('difficulty') or 0, + 'height': cache.get('blockheight') or 0} blocks = db.session.query(Block).order_by(Block.height.desc()).limit(10) reject_total, accept_total = get_pool_acc_rej() @@ -62,18 +63,9 @@ def add_pool_stats(): g.round_duration = (datetime.datetime.utcnow() - last_block_time()).total_seconds() g.hashrate = get_pool_hashrate() - blobs = Blob.query.filter(Blob.key.in_(("server", "diff"))).all() - try: - server = [b for b in blobs if b.key == "server"][0] - g.worker_count = int(server.data['stratum_clients']) - except IndexError: - g.worker_count = 0 - try: - diff = float([b for b in blobs if b.key == "diff"][0].data['diff']) - except IndexError: - diff = -1 - g.average_difficulty = diff - g.shares_to_solve = diff * (2 ** 16) + g.worker_count = cache.get('total_workers') or 0 + g.average_difficulty = cache.get('difficulty_avg') or 0 + g.shares_to_solve = g.average_difficulty * (2 ** 16) g.total_round_shares = g.shares_to_solve * current_app.config['last_n'] g.alerts = get_alerts() @@ -130,11 +122,9 @@ def user_match(user): user_list = [([shares, user, (65536 * last_10_shares(user[6:]) / 600), user_match(user[6:])]) for user, shares in user_shares.iteritems()] user_list = sorted(user_list, key=lambda x: x[0], reverse=True) - current_block = db.session.query(Blob).filter_by(key="block").first() - return render_template('round_summary.html', users=user_list, - current_block=current_block, + blockheight=cache.get('blockheight') or 0, cached_time=cached_time) diff --git a/templates/block_table.html b/templates/block_table.html index 6983aec..f44bc49 100644 --- a/templates/block_table.html +++ b/templates/block_table.html @@ -22,11 +22,11 @@ 0 0% 00:00:00 - {{ '{:,}'.format((current_block.data['reward'] / 100000000) | round(2)) }} - {{ '{:,}'.format((current_block.data['difficulty'] | int)) }} + {{ '{:,}'.format((current_block['reward'] / 100000000) | round(2)) }} + {{ '{:,}'.format((current_block['difficulty'] | round(4))) }} ... ... - {{ '{:,}'.format((current_block.data['height']) | int) }} + {{ '{:,}'.format((current_block['height']) | int) }} In progress {% endif %} diff --git a/templates/round_summary.html b/templates/round_summary.html index a6f6668..d7a9f63 100644 --- a/templates/round_summary.html +++ b/templates/round_summary.html @@ -14,7 +14,7 @@

Current Round Stats

title="Current height of the blockchain.">

-

{{ '{:2,}'.format((current_block.data['height']|int)) }}

+

{{ '{:2,}'.format((blockheight | int)) }}

diff --git a/templates/user_stats.html b/templates/user_stats.html index 8ab86ae..213459e 100755 --- a/templates/user_stats.html +++ b/templates/user_stats.html @@ -132,6 +132,7 @@

data-toggle="tooltip" data-placement="right" title="{% if worker['online'] %}Online{% else %}Offline{% endif %}">  {% if worker['name'] %}{{ worker['name'] }}{% else %}[unnamed]{% endif %} +   {{ worker['server'].get('stratum', '') }} Hashrate: {{ worker['last_10_hashrate'] | round(3) }} MH/sec