Individually Named Dask Worker Console Log Files #10861
-
Novice Dask user here. I'm attempting to use Dask futures to run parallel workflow pipelines on a single server, and I've run into a roadblock figuring out how to create a separate log file for each 'job' (worker)'s console output. Each job is passed an alphanumeric string as an argument, and I'd like to use that string in the filename for that job's console output. I cannot for the life of me figure out how to do this properly: I can create separate log files for each worker, but each file ends up containing the console output (e.g., from `download` below) from EVERY worker, which is not what I want. Example code below:

```python
import logging
from datetime import datetime
from pathlib import Path

import dask
from dask.distributed import Client, LocalCluster
from helper import download, runwork, packup
# CustomFormatter (used below) is defined elsewhere, not shown here


class Pipeline():
    def runPipe(self, textstring):
        setup_logger(textstring)
        download(textstring)


def setup_logger(textstring: str):
    logDir = '/tmp'
    logStart = datetime.now()
    # fmt:off
    MSG_LOG_FILE = Path(logDir) / f"fastPipe_{textstring}_{logStart:%H%M%S%d%m%y}.log"
    ERR_LOG_FILE = Path(logDir) / 'Errors' / f"fastPipe_{textstring}_{logStart:%H%M%S%d%m%y}.err"
    # fmt:on
    MSG_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    ERR_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

    log = logging.getLogger()
    log.setLevel(logging.INFO)

    log_fmt = "%(asctime)s | %(levelname)s | %(message)s"
    error_fmt = "%(asctime)s | %(levelname)s | %(message)s"
    date_fmt = "%m.%d.%y %H:%M:%S"

    msglog_file_handler = logging.FileHandler(MSG_LOG_FILE, mode='w', delay=True)
    msglog_file_handler.setFormatter(CustomFormatter(log_fmt, date_fmt))
    msglog_file_handler.setLevel(logging.INFO)
    log.addHandler(msglog_file_handler)

    errorlog_file_handler = logging.FileHandler(ERR_LOG_FILE, mode='w', delay=True)
    errorlog_file_handler.setFormatter(CustomFormatter(error_fmt, date_fmt))
    errorlog_file_handler.setLevel(logging.ERROR)
    log.addHandler(errorlog_file_handler)
    return log


if __name__ == '__main__':
    try:
        print('Connecting to Dask cluster...')
        client = Client('tcp://128.128.188.188:8888', timeout='2s')
    except (TimeoutError, OSError):
        print('No Dask cluster found, creating a new instance...')
        with dask.config.set({"distributed.worker.resources.GPUMemory": 30000}):
            cluster = LocalCluster(n_workers=1, dashboard_address=':8888',
                                   processes=True, threads_per_worker=1,
                                   silence_logs=False)
            client = Client(address=cluster)

    fp = Pipeline()
    textstring_list = ['7787DFA', '8988JJQ', '4454OOP']
    client.map(fp.runPipe, textstring_list, pure=False)
```
-
Solved my own issue by adding a line clearing the log handlers (`log.handlers = []`) just after the logger is fetched (`log = logging.getLogger()`) each time `setup_logger` is called. Not 100% sure why the handlers are retained (or appended?) across Dask workers, but this solved my issue, in case anyone else comes across it.
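The clearing works because `logging.getLogger()` with no name returns the same root logger object every time within a worker process, so handlers attached by previous jobs are still there when the next task calls `setup_logger`. A minimal sketch of the fix outside Dask (simplified filenames and formatter, assuming the same clear-then-attach pattern as the reply above):

```python
import logging
import tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())

def setup_logger(textstring: str) -> logging.Logger:
    log = logging.getLogger()
    log.handlers = []                      # the fix: drop handlers left by earlier jobs
    log.setLevel(logging.INFO)
    handler = logging.FileHandler(tmp / f"fastPipe_{textstring}.log",
                                  mode='w', delay=True)
    handler.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
    log.addHandler(handler)
    return log

for job in ['7787DFA', '8988JJQ']:
    log = setup_logger(job)
    log.info('output for %s', job)

# each file now contains only its own job's output
print((tmp / 'fastPipe_7787DFA.log').read_text())
```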