Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

500 plugin infrastructure #502

Open
wants to merge 3 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion dexbot/controllers/main_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, bitshares_instance, config):
self.config = config
self.worker_manager = None

# Configure logging
# Configure per_worker logging
data_dir = user_data_dir(APP_NAME, AUTHOR)
filename = os.path.join(data_dir, 'dexbot.log')
formatter = logging.Formatter(
Expand All @@ -35,6 +35,14 @@ def __init__(self, bitshares_instance, config):
logger.info("DEXBot {} on python {} {}".format(VERSION, sys.version[:6], sys.platform), extra={
'worker_name': 'NONE', 'account': 'NONE', 'market': 'NONE'})

# Configure root logger
logger = logging.getLogger("dexbot")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler(filename)
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.setLevel(logging.INFO)

# Configure orders logging
initialize_orders_log()

Expand Down
10 changes: 10 additions & 0 deletions dexbot/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import errno
import logging
import pkgutil
from appdirs import user_data_dir

from dexbot import APP_NAME, AUTHOR
Expand Down Expand Up @@ -77,6 +78,15 @@ def initialize_orders_log():
logger.info("worker_name;ID;operation_type;base_asset;base_amount;quote_asset;quote_amount;timestamp")


def iter_namespace(ns_pkg):
# https://packaging.python.org/guides/creating-and-discovering-plugins/
# Specifying the second argument (prefix) to iter_modules makes the
# returned name an absolute name instead of a relative one. This allows
# import_module to work without having to do additional modification to
# the name.
return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".")


try:
# Unfortunately setuptools is only "kinda-sorta" a standard module
# it's available on pretty much any modern Python system, but some embedded Pythons may not have it
Expand Down
81 changes: 81 additions & 0 deletions dexbot/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
import threading
import importlib
import logging

import dexbot.plugins
from dexbot.helper import iter_namespace

from bitshares import BitShares
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works but to be more specific change to from bitshares.bitshares import BitShares.


log = logging.getLogger(__name__)

class PluginInfrastructure(threading.Thread):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add one extra line above, PEP8 error.

""" Run plugins as asyncio tasks

:param dict config: dexbot config

PluginInfrastructure class is needed to be able to run asyncio plugins while having synchronous core. After
switching to asyncio-aware main thread we may continue to use all plugins without refactoring them.
"""

def __init__(self, config):
super().__init__()

self.bitshares = BitShares(node=config['node'], num_retries=-1)
self.config = config
self.loop = None
self.need_stop = False
self.plugins = []

def run(self):
log.debug('Starting PluginInfrastructure thread')
self.init_plugins()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.create_task(self.run_plugins())
self.loop.create_task(self.stop_handler())
self.loop.run_forever()

def init_plugins(self):
""" Initialize plugin instances
"""
plugins = {name: importlib.import_module(name) for finder, name, ispkg in iter_namespace(dexbot.plugins)}

for name, plugin in plugins.items():
self.plugins.append(plugin.Plugin(config=self.config, bitshares_instance=self.bitshares))

async def run_plugins(self):
""" Run each discovered plugin by calling Plugin.main()
"""
# Schedule every plugin as asyncio Task; use ensure_future() for python3.6 compatibility
tasks = [asyncio.ensure_future(plugin.main()) for plugin in self.plugins]
try:
# Wait until all plugins are finished, but catch exceptions immediately as they occure
await asyncio.gather(*tasks, return_exceptions=False)
except asyncio.CancelledError:
# Note: task.cancel() will not propagate this exception here, so it will appear only on current task cancel
log.debug('Stopping run_plugins()')
except Exception:
log.exception('Task finished with exception:')

async def stop_handler(self):
""" Watch for self.need_stop flag to cancel tasks and stop the thread

With this solution it's easier to achieve correct tasks stopping. self.loop.call_soon_threadsafe() requires
additional wrapping to stop tasks or catch exceptions.
"""
while True:
if self.need_stop:
log.debug('Stopping event loop')
tasks = [task for task in asyncio.Task.all_tasks() if task is not asyncio.tasks.Task.current_task()]
# Cancel all tasks
list(map(lambda task: task.cancel(), tasks))
# Wait for tasks finish
results = await asyncio.gather(*tasks, return_exceptions=True)
log.debug('Finished awaiting cancelled tasks, results: {0}'.format(results))
# Stop the event loop
self.loop.stop()
return
else:
await asyncio.sleep(1)
Empty file added dexbot/plugins/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions dexbot/plugins/dummy.py.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import logging

log = logging.getLogger(__name__)


class Plugin:
""" Example plugin class

Plugin must have main() method to run. main() is expected to be an asyncio coroutine
"""

def __init__(self, config=None, bitshares_instance=None):
pass

async def do_stuff(self):
log.info('Doing some stuff')
await asyncio.sleep(10)
log.info('Stuff done')

async def boom(self):
raise Exception('Boom!')

async def main(self):
try:
while True:
await self.do_stuff()
await asyncio.sleep(5)
await self.boom()
except asyncio.CancelledError:
log.info('Stopping correctly')
22 changes: 22 additions & 0 deletions dexbot/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,37 +163,59 @@ def __init__(self):

self.lock = threading.Lock()
self.event = threading.Event()
# Daemon thread means it will be abruptly killed on parent thread shutdown
self.daemon = True
self.start()

def run(self):
# Continuously iterate over task queue and execute tasks
for func, args, token in iter(self.task_queue.get, None):
if token is not None:
args = args+(token,)
func(*args)

def _get_result(self, token):
""" Get task result from all results by token

This function is invoked from self.execute() when caller thread executes a method which needs to return some
result back to caller. On each loop iteration results are being examined for specified task token. If it is
not yet available, the loop execution suspended until DatabaseWorker thread will set self.event flag
indicating some task processing is completed. _get_result() is called from another thread while queue
processing is performed inside the current thread.
"""
while True:
with self.lock:
if token in self.results:
# Find and return task results by token
return_value = self.results[token]
del self.results[token]
return return_value
else:
# Suspend loop execution and wait for flag
self.event.clear()
# Block loop execution waiting Event flag set() from DatabaseWorker thread
self.event.wait()

def _set_result(self, token, result):
""" Associate query results with task token
"""
with self.lock:
self.results[token] = result
self.event.set()

def execute(self, func, *args):
""" Create queue task and return task result
"""
# Token is an unique task identifier
token = str(uuid.uuid4)
# Schedule query execution into DatabaseWorker thread queue
self.task_queue.put((func, args, token))
# Return results when they will be available
return self._get_result(token)

def execute_noreturn(self, func, *args):
""" Create queue task without returning a result
"""
self.task_queue.put((func, args, None))

def set_item(self, category, key, value):
Expand Down
6 changes: 6 additions & 0 deletions dexbot/ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def new_func(ctx, *args, **kwargs):
# Set the root logger with basic format
ch = logging.StreamHandler()
ch.setFormatter(formatter1)
fh = logging.FileHandler('dexbot.log')
fh.setFormatter(formatter1)

# Root logger also logs into stream and file with respect of cli-defined verbosity
logging.getLogger("dexbot").addHandler(ch)
logging.getLogger("dexbot").addHandler(fh)
logging.getLogger("dexbot").setLevel(getattr(logging, verbosity.upper()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolve conflict in ui.py

logging.getLogger("").handlers = []

# GrapheneAPI logging
Expand Down
6 changes: 6 additions & 0 deletions dexbot/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import copy

import dexbot.errors as errors
from dexbot.plugin import PluginInfrastructure
from dexbot.strategies.base import StrategyBase

from bitshares import BitShares
Expand Down Expand Up @@ -173,6 +174,8 @@ def add_worker(self, worker_name, config):
self.update_notify()

def run(self):
self.plugins_thread = PluginInfrastructure(self.config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize self.plugins_thread in __init__

self.plugins_thread.start()
self.init_workers(self.config)
self.update_notify()
self.notify.listen()
Expand Down Expand Up @@ -206,6 +209,9 @@ def stop(self, worker_name=None, pause=False):
self.workers[worker].pause()
self.workers = []

# Notify plugins to stop
self.plugins_thread.need_stop = True

# Update other workers
if len(self.workers) > 0:
self.update_notify()
Expand Down