Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for starting plots via the Chia daemon plotter API #538

Open
wants to merge 13 commits into
base: development
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions config.yaml.default
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ manager:
# check_interval: The number of seconds to wait before checking to see if a new job should start.
# log_level: Keep this on ERROR to only record when there are errors. Change this to INFO in order to see more
# detailed logging. Warning: INFO will write a lot of information.
# use_daemon: Start plotting by calling the chia daemon plotter API instead of starting the chia process directly.
#             This allows plot progress to be shown in the GUI or Chia-Dashboard, but the farmer key, pool key,
#             and log path settings for the plot will be ignored.
check_interval: 60
log_level: ERROR
use_daemon: false


log:
Expand Down
8 changes: 4 additions & 4 deletions plotmanager/library/parse/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def _get_manager_settings(config):
if 'manager' not in config:
raise InvalidYAMLConfigException('Failed to find the log parameter in the YAML.')
manager = config['manager']
expected_parameters = ['check_interval', 'log_level']
expected_parameters = ['check_interval', 'log_level', 'use_daemon']
_check_parameters(parameter=manager, expected_parameters=expected_parameters, parameter_type='manager')
return manager['check_interval'], manager['log_level']
return manager['check_interval'], manager['log_level'], manager['use_daemon']


def _get_log_settings(config):
Expand Down Expand Up @@ -137,7 +137,7 @@ def _check_parameters(parameter, expected_parameters, parameter_type):
def get_config_info():
config = _get_config()
chia_location = _get_chia_location(config=config)
manager_check_interval, log_level = _get_manager_settings(config=config)
manager_check_interval, log_level, use_daemon = _get_manager_settings(config=config)
log_directory = _get_log_settings(config=config)
if not os.path.exists(log_directory):
os.makedirs(log_directory)
Expand All @@ -148,6 +148,6 @@ def get_config_info():
view_settings = _get_view_settings(config=config)
instrumentation_settings = _get_instrumentation_settings(config=config)

return chia_location, log_directory, jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
return chia_location, log_directory, use_daemon, jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
minimum_minutes_between_jobs, progress_settings, notification_settings, log_level, view_settings, \
instrumentation_settings
8 changes: 4 additions & 4 deletions plotmanager/library/utilities/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def start_manager():
debug_log_file = open(debug_log_file_path, 'a')
python_file_path = sys.executable

chia_location, log_directory, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
chia_location, log_directory, use_daemon, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
minimum_minutes_between_jobs, progress_settings, notification_settings, debug_level, view_settings, \
instrumentation_settings = get_config_info()

Expand Down Expand Up @@ -78,7 +78,7 @@ def stop_manager():


def json_output():
chia_location, log_directory, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
chia_location, log_directory, use_daemon, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
minimum_minutes_between_jobs, progress_settings, notification_settings, debug_level, view_settings, \
instrumentation_settings = get_config_info()

Expand Down Expand Up @@ -129,7 +129,7 @@ def json_output():


def view(loop=True):
chia_location, log_directory, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
chia_location, log_directory, use_daemon, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
minimum_minutes_between_jobs, progress_settings, notification_settings, debug_level, view_settings, \
instrumentation_settings = get_config_info()
view_check_interval = view_settings['check_interval']
Expand Down Expand Up @@ -188,7 +188,7 @@ def view(loop=True):


def analyze_logs():
chia_location, log_directory, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
chia_location, log_directory, use_daemon, config_jobs, manager_check_interval, max_concurrent, max_for_phase_1, \
minimum_minutes_between_jobs, progress_settings, notification_settings, debug_level, view_settings, \
instrumentation_settings = get_config_info()
analyze_log_times(log_directory)
52 changes: 52 additions & 0 deletions plotmanager/library/utilities/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
import logging

from typing import Any, Dict

from chia.daemon.client import DaemonProxy, connect_to_daemon_and_validate
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ws_message import WsRpcMessage


class DaemonPlotterProxy(DaemonProxy):
    """DaemonProxy variant that exposes the daemon's ``start_plotting`` RPC.

    The stock DaemonProxy client does not wrap this endpoint, so this subclass
    adds the single call needed to launch a plot through the chia daemon.
    """

    async def start_plotting(self, request: Dict[str, Any]) -> WsRpcMessage:
        """Send a ``start_plotting`` request and return the daemon's reply.

        ``request`` holds the plot parameters; it is merged over the fixed
        ``service`` field (keys in ``request`` win on collision).
        """
        payload = {"service": "chia plots create", **request}
        return await self._get(self.format_request("start_plotting", payload))


async def start_plotting_async(queue, size, memory_buffer, temporary_directory, temporary2_directory,
                               destination_directory, threads, buckets, bitfield, exclude_final_directory) -> WsRpcMessage:
    """Ask the local chia daemon to start a new plotting process.

    Polls every 3 seconds until the daemon is reachable, sends a
    ``start_plotting`` request built from the given plot parameters, closes
    the connection, and returns the daemon's response. ``queue`` is used as
    the daemon-side queue name so the plot can be identified later.
    """
    logging.info('Waiting for daemon to be reachable')

    client = None
    while client is None:
        try:
            client = await connect_to_daemon_and_validate(DEFAULT_ROOT_PATH)
        except ConnectionAbortedError:
            logging.error('Connection aborted when connecting to daemon')
        if client is None:
            await asyncio.sleep(3)

    # DaemonProxy does not expose start_plotting; swap in our subclass so the
    # already-connected client gains that one extra method.
    client.__class__ = DaemonPlotterProxy

    try:
        result = await client.start_plotting({
            "queue": queue,
            "k": size,
            "t": temporary_directory,
            "t2": temporary2_directory if temporary2_directory else temporary_directory,
            "d": destination_directory,
            "b": memory_buffer,
            "u": buckets,
            "r": threads,
            "e": not bitfield,  # daemon flag disables bitfield plotting, hence the inversion
            "x": exclude_final_directory,
            "overrideK": False,
            "parallel": True
        })
    finally:
        # Always release the websocket, even if the RPC itself fails.
        await client.close()

    return result
88 changes: 61 additions & 27 deletions plotmanager/library/utilities/jobs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import asyncio
import logging
import psutil
import time

from copy import deepcopy
from datetime import datetime, timedelta

from plotmanager.library.commands import plots
from plotmanager.library.utilities.daemon import start_plotting_async
from plotmanager.library.utilities.exceptions import InvalidConfigurationSetting
from plotmanager.library.utilities.processes import identify_drive, is_windows, start_process
from plotmanager.library.utilities.objects import Job, Work
from plotmanager.library.utilities.log import get_log_file_name
from plotmanager.library.utilities.objects import Job, Work
from plotmanager.library.utilities.processes import is_windows, identify_drive, start_process, get_chia_plot_processes, get_log_path_for_plot


def has_active_jobs_and_work(jobs):
Expand Down Expand Up @@ -159,7 +162,7 @@ def determine_job_size(k_size):


def monitor_jobs_to_start(jobs, running_work, max_concurrent, max_for_phase_1, next_job_work, chia_location,
log_directory, next_log_check, minimum_minutes_between_jobs, system_drives):
log_directory, next_log_check, minimum_minutes_between_jobs, system_drives, use_daemon):
drives_free_space = {}
for job in jobs:
directories = [job.destination_directory]
Expand Down Expand Up @@ -256,10 +259,12 @@ def monitor_jobs_to_start(jobs, running_work, max_concurrent, max_for_phase_1, n
next_job_work[j.name] = minimum_stagger

job, work = start_work(
running_work=running_work,
job=job,
chia_location=chia_location,
log_directory=log_directory,
drives_free_space=drives_free_space,
use_daemon=use_daemon
)
jobs[i] = deepcopy(job)
if work is None:
Expand All @@ -271,15 +276,13 @@ def monitor_jobs_to_start(jobs, running_work, max_concurrent, max_for_phase_1, n
return jobs, running_work, next_job_work, next_log_check


def start_work(job, chia_location, log_directory, drives_free_space):
def start_work(running_work, job, chia_location, log_directory, drives_free_space, use_daemon):
logging.info(f'Starting new plot for job: {job.name}')
nice_val = job.unix_process_priority
if is_windows():
nice_val = job.windows_process_priority

now = datetime.now()
log_file_path = get_log_file_name(log_directory, job, now)
logging.info(f'Job log file path: {log_file_path}')
destination_directory, temporary_directory, temporary2_directory, job = \
get_target_directories(job, drives_free_space=drives_free_space)
if not destination_directory:
Expand All @@ -290,7 +293,6 @@ def start_work(job, chia_location, log_directory, drives_free_space):

work = deepcopy(Work())
work.job = job
work.log_file = log_file_path
work.datetime_start = now
work.work_id = job.current_work_id

Expand All @@ -301,26 +303,56 @@ def start_work(job, chia_location, log_directory, drives_free_space):
temporary2_directory = destination_directory
logging.info(f'Job temporary2 directory: {temporary2_directory}')

plot_command = plots.create(
chia_location=chia_location,
farmer_public_key=job.farmer_public_key,
pool_public_key=job.pool_public_key,
size=job.size,
memory_buffer=job.memory_buffer,
temporary_directory=temporary_directory,
temporary2_directory=temporary2_directory,
destination_directory=destination_directory,
threads=job.threads,
buckets=job.buckets,
bitfield=job.bitfield,
exclude_final_directory=job.exclude_final_directory,
)
logging.info(f'Starting with plot command: {plot_command}')

log_file = open(log_file_path, 'a')
logging.info(f'Starting process')
process = start_process(args=plot_command, log_file=log_file)
pid = process.pid
if use_daemon:
asyncio.get_event_loop().run_until_complete(start_plotting_async(
queue=job.name,
size=job.size,
memory_buffer=job.memory_buffer,
temporary_directory=temporary_directory,
temporary2_directory=temporary2_directory,
destination_directory=destination_directory,
threads=job.threads,
buckets=job.buckets,
bitfield=job.bitfield,
exclude_final_directory=job.exclude_final_directory,
))
logging.info(f'Waiting for new plotting process to be started by daemon')
while True:
chia_processes = [p[1] for p in get_chia_plot_processes()]
work_process_ids = running_work.keys()
pids = [p.pid for p in chia_processes if p.pid not in work_process_ids]
if len(pids) == 1:
pid = pids[0]
break
time.sleep(5)

process = psutil.Process(pid)
log_file_path = get_log_path_for_plot(process)

else:
plot_command = plots.create(
chia_location=chia_location,
farmer_public_key=job.farmer_public_key,
pool_public_key=job.pool_public_key,
size=job.size,
memory_buffer=job.memory_buffer,
temporary_directory=temporary_directory,
temporary2_directory=temporary2_directory,
destination_directory=destination_directory,
threads=job.threads,
buckets=job.buckets,
bitfield=job.bitfield,
exclude_final_directory=job.exclude_final_directory,
)
logging.info(f'Starting with plot command: {plot_command}')

log_file_path = get_log_file_name(log_directory, job, now)

log_file = open(log_file_path, 'a')
logging.info(f'Starting process')
process = start_process(args=plot_command, log_file=log_file)
pid = process.pid

logging.info(f'Started process: {pid}')

logging.info(f'Setting priority level: {nice_val}')
Expand All @@ -331,6 +363,8 @@ def start_work(job, chia_location, log_directory, drives_free_space):
psutil.Process(pid).cpu_affinity(job.cpu_affinity)
logging.info(f'Set process cpu affinity')

logging.info(f'Job log file path: {log_file_path}')
work.log_file = log_file_path
work.pid = pid
job.total_running += 1
job.total_kicked_off += 1
Expand Down
64 changes: 42 additions & 22 deletions plotmanager/library/utilities/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ def _contains_in_list(string, lst, case_insensitive=False):
return False


def _get_split_cmdline(process):
raw_commands = process.cmdline()
commands = []
for item in raw_commands:
if not item.startswith('-') or len(item) == 2:
commands.append(item)
else:
commands.append(item[:2])
commands.append(item[2:])
return commands


def get_manager_processes():
processes = []
for process in psutil.process_iter():
Expand Down Expand Up @@ -99,8 +111,8 @@ def get_chia_drives():
continue
except (psutil.ZombieProcess, psutil.NoSuchProcess):
continue
commands = process.cmdline()
temporary_drive, temporary2_drive, destination_drive = get_plot_drives(commands=commands)

temporary_drive, temporary2_drive, destination_drive = get_plot_drives(commands=_get_split_cmdline(process))
if not temporary_drive and not destination_drive:
continue

Expand Down Expand Up @@ -170,7 +182,7 @@ def get_temp_size(plot_id, temporary_directory, temporary2_directory):
return temp_size


def get_running_plots(jobs, running_work, instrumentation_settings):
def get_chia_plot_processes():
chia_processes = []
logging.info(f'Getting running plots')
chia_executable_name = get_chia_executable_name()
Expand All @@ -196,31 +208,39 @@ def get_running_plots(jobs, running_work, instrumentation_settings):
datetime_start = datetime.fromtimestamp(process.create_time())
chia_processes.append([datetime_start, process])
chia_processes.sort(key=lambda x: (x[0]))
return chia_processes


def get_log_path_for_plot(process):
    """Locate the plot log file a running chia plot process has open.

    Scans the process's open file handles for the first ``.log``/``.txt``
    file, ignoring ``.mui`` files and chia's own ``debug.log``. Returns the
    matching path, or None when nothing matches or the process cannot be
    inspected.
    """
    logging.info(f'Finding log file for process: {process.pid}')
    try:
        open_files = process.open_files()
    except (psutil.AccessDenied, RuntimeError, psutil.NoSuchProcess):
        logging.info(f'Failed to find log file: {process.pid}')
        return None

    for open_file in open_files:
        path = open_file.path
        # Skip GUI resource files and chia's node log; only plain plot logs count.
        if path.endswith('.mui') or path.endswith('debug.log'):
            continue
        if path.endswith('.log') or path.endswith('.txt'):
            logging.info(f'Found log file: {path}')
            return path

    return None


def get_running_plots(jobs, running_work, instrumentation_settings):
chia_processes = get_chia_plot_processes()

for datetime_start, process in chia_processes:
logging.info(f'Finding log file for process: {process.pid}')
log_file_path = None
commands = []
try:
commands = process.cmdline()
for file in process.open_files():
if '.mui' == file.path[-4:]:
continue
if file.path[-4:] not in ['.log', '.txt']:
continue
if file.path[-9:] == 'debug.log':
continue
log_file_path = file.path
logging.info(f'Found log file: {log_file_path}')
break
except (psutil.AccessDenied, RuntimeError):
logging.info(f'Failed to find log file: {process.pid}')
except psutil.NoSuchProcess:
continue
log_file_path = get_log_path_for_plot(process)

assumed_job = None
logging.info(f'Finding associated job')

commands = _get_split_cmdline(process)
temporary_directory, temporary2_directory, destination_directory = get_plot_directories(commands=commands)
for job in jobs:
if isinstance(job.temporary_directory, list) and temporary_directory not in job.temporary_directory:
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
chia-blockchain==1.1.5
dateparser==1.0.0
psutil==5.8.0
PyYAML==5.4.1
websockets==8.1
python_pushover==0.4