#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Piero Dalle Pezze
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
#
# Object: Execute the model several times for deterministic or stochastic analysis
from __future__ import print_function
import logging
import sys
import os
import multiprocessing
import subprocess
import shlex
from time import sleep
logger = logging.getLogger('sbpipe')


def run_cmd(cmd):
    """
    Run a command using Python subprocess.

    :param cmd: The string of the command to run
    :return: a tuple (stdout, stderr) as captured from the process
    """
if sys.version_info > (3,):
with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
out, err = p.communicate()
else:
p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
return out, err


def run_cmd_block(cmd):
    """
    Run a command using Python subprocess. Block the call until the command has finished.

    :param cmd: The string of the command to run
    """
    subprocess.call(shlex.split(cmd))


def parcomp(cmd, cmd_iter_substr, output_dir, cluster='local', runs=1, local_cpus=1, output_msg=False,
            colnames=[]):
    """
    Generic function to run a command in parallel.

    :param cmd: the command string to run in parallel
    :param cmd_iter_substr: the substring marking the iteration number. It is replaced with a number automatically
    :param output_dir: the output directory
    :param cluster: the cluster type among local (Python multiprocessing), sge, or lsf
    :param runs: the number of runs. Ignored if colnames is not empty
    :param local_cpus: the maximum number of CPUs to use
    :param output_msg: print the output messages on screen (available for cluster='local' only)
    :param colnames: the names of the columns to process
    :return: True if the computation succeeded.
    """
logger.debug("Parallel computation using " + cluster)
logger.debug("Command: " + cmd)
logger.debug("Iter ID string: " + cmd_iter_substr)
logger.debug("# runs: " + str(runs))
if cluster == "sge" or cluster == "lsf":
out_dir = os.path.join(output_dir, 'out')
err_dir = os.path.join(output_dir, 'err')
if not os.path.exists(out_dir):
os.makedirs(out_dir)
if not os.path.exists(err_dir):
os.makedirs(err_dir)
if cluster == "sge": # use SGE (Sun Grid Engine)
return run_jobs_sge(cmd, cmd_iter_substr, out_dir, err_dir, runs, colnames)
elif cluster == "lsf": # use LSF (Platform Load Sharing Facility)
return run_jobs_lsf(cmd, cmd_iter_substr, out_dir, err_dir, runs, colnames)
    else:  # use local by default (Python multiprocessing on multiple cores)
        if cluster != "local":
            logger.warning(
                "Variable `cluster` is not set correctly in the configuration file. "
                "Valid values are: `local`, `lsf`, `sge`. Running `local` by default")
return run_jobs_local(cmd, cmd_iter_substr, runs, local_cpus, output_msg, colnames)


def progress_bar(it, total):
    """
    A minimal CLI progress bar.

    :param it: current iteration starting from 1
    :param total: total iterations
    """
    percent = '({0:.1f}%)'.format(100 * (it / float(total)))
progress = str(it) + ' of ' + str(total)
print('\r%s %s %s' % ('Initialised:', progress, percent), end='\r')
if it == total:
print()


def progress_bar2(it, total):
    """
    A CLI progress bar drawing a filled bar.

    :param it: current iteration starting from 1
    :param total: total iterations
    """
percent = ("{0:.1f}").format(100 * (it / float(total)))
length = 50
filled = int(length * it // total)
bar = '#' * filled + '-' * (length - filled)
progress = '(' + str(it) + ' of ' + str(total) + ')'
print('\r%s |%s| %s%% %s' % ('Progress:', bar, percent, progress), end='\r')
if it == total:
print()


def call_proc(params):
    """
    Run a command using Python subprocess.

    :param params: a tuple (command string, task id, total number of runs, log handler level)
    :return: a tuple (stdout, stderr) as captured from the process
    """
    cmd, task_id, runs, handler_level = params
    if handler_level <= logging.INFO:
        progress_bar(task_id, runs)
if sys.version_info > (3,):
with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
out, err = p.communicate()
else:
p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
return out, err


def run_jobs_local(cmd, cmd_iter_substr, runs=1, local_cpus=1, output_msg=False, colnames=[]):
    """
    Run jobs locally using Python multiprocessing.

    :param cmd: the full command to run as a job
    :param cmd_iter_substr: the substring in the command to be replaced with a number
    :param runs: the number of runs. Ignored if colnames is not empty
    :param local_cpus: the number of available CPUs. If local_cpus <= 0, only one core is used
    :param output_msg: print the output messages on screen (available for cluster='local' only)
    :param colnames: the names of the columns to process
    :return: True
    """
    # Determine the number of worker processes before creating the Pool,
    # so that only one Pool is ever instantiated.
    cpus = 1
    if local_cpus > 0:
        if local_cpus <= multiprocessing.cpu_count():
            cpus = local_cpus
        else:
            logger.warning('`local_cpus` is higher than the physical number of CPUs (' +
                           str(multiprocessing.cpu_count()) + '). Setting `local_cpus` to ' +
                           str(multiprocessing.cpu_count()))
            cpus = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(cpus)
    logger.debug('Initialised multiprocessing.Pool with ' + str(cpus) + ' workers')
logger.info("Starting computation...")
results = []
# get the current level for the StreamHandler
# this must be executed at run-time
if len(logger.handlers) > 1:
handler_level = logger.handlers[1].level
else:
handler_level = logging.INFO
if len(colnames) > 0:
runs = len(colnames)
for i, column in enumerate(colnames):
command = cmd.replace(cmd_iter_substr, column)
logger.debug(command)
params = (command, i+1, runs, handler_level)
results.append(pool.apply_async(call_proc, (params,)))
else:
for i in range(0, runs):
command = cmd.replace(cmd_iter_substr, str(i+1))
logger.debug(command)
params = (command, i+1, runs, handler_level)
results.append(pool.apply_async(call_proc, (params,)))
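    # `results` holds AsyncResult objects; result.get() below returns the
    # worker's (stdout, stderr) tuple and re-raises any exception raised
    # inside the worker process.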
# Close the pool and wait for each running task to complete
pool.close()
pool.join()
failed = 0
for result in results:
out, err = result.get()
# convert byte to str. Necessary for Python 3+.
# this is also compatible with Python 2.7
out = out.decode('utf-8')
err = err.decode('utf-8')
if 'error' in err.lower():
logger.error('\n' + err)
failed += 1
elif 'warning' in err.lower():
logger.warning('\n' + err)
else:
logger.debug('\n' + err)
if 'error' in out.lower():
logger.error('\n' + out)
elif 'warning' in out.lower():
logger.warning('\n' + out)
else:
if output_msg:
logger.info('\n' + out)
else:
logger.debug('\n' + out)
# Print the status of the parallel computation.
logger.info("Computation terminated.")
if failed == runs:
logger.warning('All computations seem to have errors in the standard error.')
logger.warning("For additional information, run SBpipe using the `--verbose` option.")
# return False
elif failed > 0:
logger.warning("Some computation might have failed. Do all output files exist?")
logger.warning("For additional information, run SBpipe using the `--verbose` option.")
else:
logger.info("If errors occur, check that " + cmd.split(" ")[0] + " runs correctly.")
logger.info("For additional information, run SBpipe using the `--verbose` option.")
return True


def run_jobs_sge(cmd, cmd_iter_substr, out_dir, err_dir, runs=1, colnames=[]):
    """
    Run jobs using a Sun Grid Engine (SGE) cluster.

    :param cmd: the full command to run as a job
    :param cmd_iter_substr: the substring in the command to be replaced with a number
    :param out_dir: the directory containing the standard output from qsub
    :param err_dir: the directory containing the standard error from qsub
    :param runs: the number of runs. Ignored if colnames is not empty
    :param colnames: the names of the columns to process
    :return: True if the computation succeeded.
    """
    # Note: piping between processes (e.g. `echo "ls -la" | xargs xargs`)
    # can be reproduced in Python as follows:
    #   ls_cmd = "ls -la"
    #   echo_proc = subprocess.Popen(["echo", ls_cmd], stdout=subprocess.PIPE)
    #   xargs_proc = subprocess.Popen(["xargs", "xargs"], stdin=echo_proc.stdout)
logger.info("Starting computation...")
jobs = ""
cmd_iter_substr = cmd_iter_substr.strip('/')
# get the current level for the StreamHandler
# this must be executed at run-time
if len(logger.handlers) > 1:
handler_level = logger.handlers[1].level
else:
handler_level = logging.INFO
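    # Build a comma-separated list of job names; it is passed later to
    # `qsub -hold_jid` so that a final sentinel job waits for all runs
    # to finish.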
if len(colnames) > 0:
runs = len(colnames)
for i, column in enumerate(colnames):
# Now the same with qsub
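            # qsub flags: -cwd run in the current working directory; -V export
            # the caller's environment; -N set the job name; -b y treat the
            # command as a binary rather than a script file.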
jobs = "j" + column + "_" + cmd_iter_substr + "," + jobs
qsub_cmd = ["qsub", "-cwd", "-V", "-N", "j" + column + "_" + cmd_iter_substr, "-o", os.path.join(out_dir, "j" + column), "-e", os.path.join(err_dir, "j" + column), "-b", "y", cmd.replace(cmd_iter_substr, column)]
logger.debug(qsub_cmd)
            if sys.version_info > (3,):
                with subprocess.Popen(qsub_cmd, stdout=subprocess.PIPE) as p:
                    p.communicate()
            else:
                qsub_proc = subprocess.Popen(qsub_cmd, stdout=subprocess.PIPE)
                qsub_proc.communicate()
if handler_level <= logging.INFO:
sleep(0.01)
progress_bar(i+1, runs)
else:
for i in range(0, runs):
# Now the same with qsub
jobs = "j" + str(i+1) + "_" + cmd_iter_substr + "," + jobs
qsub_cmd = ["qsub", "-cwd", "-V", "-N", "j" + str(i+1) + "_" + cmd_iter_substr, "-o", os.path.join(out_dir, "j" + str(i+1)), "-e", os.path.join(err_dir, "j" + str(i+1)), "-b", "y", cmd.replace(cmd_iter_substr, str(i+1))]
logger.debug(qsub_cmd)
            if sys.version_info > (3,):
                with subprocess.Popen(qsub_cmd, stdout=subprocess.PIPE) as p:
                    p.communicate()
            else:
                qsub_proc = subprocess.Popen(qsub_cmd, stdout=subprocess.PIPE)
                qsub_proc.communicate()
if handler_level <= logging.INFO:
sleep(0.01)
progress_bar(i+1, runs)
    # Wait here until all submitted jobs have finished: submit a trivial
    # sentinel job held back (-hold_jid) until every job in `jobs` completes;
    # `-sync y` makes qsub block until the sentinel itself has ended.
    # Do not name the output and error files here, as this can generate errors.
qsub_cmd = ["qsub", "-sync", "y", "-b", "y", "-o", "/dev/null", "-e", "/dev/null", "-hold_jid", jobs[:-1], "sbpipe_" + cmd_iter_substr, "1"]
    if sys.version_info > (3,):
        with subprocess.Popen(qsub_cmd, stdout=subprocess.PIPE) as p:
            p.communicate()
    else:
        qsub_proc = subprocess.Popen(qsub_cmd, stdout=subprocess.PIPE)
        qsub_proc.communicate()
logger.debug(qsub_cmd)
logger.info("Computation terminated.")
return quick_debug(cmd, out_dir, err_dir)


def run_jobs_lsf(cmd, cmd_iter_substr, out_dir, err_dir, runs=1, colnames=[]):
    """
    Run jobs using a Load Sharing Facility (LSF) cluster.

    :param cmd: the full command to run as a job
    :param cmd_iter_substr: the substring in the command to be replaced with a number
    :param out_dir: the directory containing the standard output from bsub
    :param err_dir: the directory containing the standard error from bsub
    :param runs: the number of runs. Ignored if colnames is not empty
    :param colnames: the names of the columns to process
    :return: True if the computation succeeded.
    """
logger.info("Starting computation...")
jobs = ""
cmd_iter_substr = cmd_iter_substr.strip('/')
# get the current level for the StreamHandler
# this must be executed at run-time
if len(logger.handlers) > 1:
handler_level = logger.handlers[1].level
else:
handler_level = logging.INFO
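    # Build an LSF dependency expression of the form
    # done(j1_<substr>)&&done(j2_<substr>)&&...; it is passed later to
    # `bsub -w` so that a final sentinel job waits for all runs to finish.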
if len(colnames) > 0:
runs = len(colnames)
for i, column in enumerate(colnames):
jobs = "done(j" + column + "_" + cmd_iter_substr + ")&&" + jobs
bsub_cmd = ["bsub", "-cwd", "-J", "j" + column + "_" + cmd_iter_substr, "-o", os.path.join(out_dir, "j" + column), "-e",
os.path.join(err_dir, "j" + column), cmd.replace(cmd_iter_substr, column)]
logger.debug(bsub_cmd)
            if sys.version_info > (3,):
                with subprocess.Popen(bsub_cmd, stdout=subprocess.PIPE) as p:
                    p.communicate()
            else:
                bsub_proc = subprocess.Popen(bsub_cmd, stdout=subprocess.PIPE)
                bsub_proc.communicate()
if handler_level <= logging.INFO:
sleep(0.01)
progress_bar(i+1, runs)
else:
for i in range(0, runs):
jobs = "done(j" + str(i+1) + "_" + cmd_iter_substr + ")&&" + jobs
bsub_cmd = ["bsub", "-cwd", "-J", "j" + str(i+1) + "_" + cmd_iter_substr, "-o", os.path.join(out_dir, "j" + str(i+1)), "-e", os.path.join(err_dir, "j" + str(i+1)), cmd.replace(cmd_iter_substr, str(i+1))]
logger.debug(bsub_cmd)
            if sys.version_info > (3,):
                with subprocess.Popen(bsub_cmd, stdout=subprocess.PIPE) as p:
                    p.communicate()
            else:
                bsub_proc = subprocess.Popen(bsub_cmd, stdout=subprocess.PIPE)
                bsub_proc.communicate()
if handler_level <= logging.INFO:
sleep(0.01)
progress_bar(i + 1, runs)
    # Wait here until all submitted jobs have finished: submit a sentinel job
    # with a random name that depends (-w) on the completion of every run.
    import random
    import string
    job_name = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(7))
    bsub_cmd = ["bsub", "-J", job_name, "-o", "/dev/null", "-e", "/dev/null",
                "-w", jobs[:-2], "sbpipe_" + cmd_iter_substr, "1"]
logger.debug(bsub_cmd)
    if sys.version_info > (3,):
        with subprocess.Popen(bsub_cmd, stdout=subprocess.PIPE) as p:
            p.communicate()
    else:
        bsub_proc = subprocess.Popen(bsub_cmd, stdout=subprocess.PIPE)
        bsub_proc.communicate()
    # Poll the queue until the sentinel job no longer appears among the
    # pending, suspended, or running jobs (`bjobs -psr`). A blocking wait
    # would be preferable to polling.
    found = True
    while found:
        sleep(2)
        if sys.version_info > (3,):
            with subprocess.Popen(["bjobs", "-psr"], stdout=subprocess.PIPE) as p:
                # decode so that the substring test below works on Python 3
                output = p.communicate()[0].decode('utf-8')
        else:
            my_poll = subprocess.Popen(["bjobs", "-psr"], stdout=subprocess.PIPE)
            output = my_poll.communicate()[0]
        if job_name not in output:
            found = False
logger.info("Computation terminated.")
return quick_debug(cmd, out_dir, err_dir)


def quick_debug(cmd, out_dir, err_dir):
    """
    Search for `error` and `warning` in the standard output and standard error files.
    This is a simple check of the generated log files. The computation is not stopped,
    because these messages are often warnings rather than real errors.

    :param cmd: the executed command
    :param out_dir: the directory containing the standard output files
    :param err_dir: the directory containing the standard error files
    :return: True
    """
outcome = True
logger.debug("Running parcomp.quick_debug()")
filename = os.path.join(err_dir, "j1")
if os.path.isfile(filename):
if not is_output_file_clean(filename, 'standard error'):
outcome = False
filename = os.path.join(out_dir, "j1")
if os.path.isfile(filename):
if not is_output_file_clean(filename, 'standard output'):
outcome = False
if not outcome:
logger.warning("\nSome computation might have failed. Please check the output in the folders:")
logger.warning("\t" + out_dir + ' (standard output)')
logger.warning("\t" + err_dir + ' (standard error)')
logger.warning("For additional information, run SBpipe using the `--verbose` option.")
logger.warning("(ignore previous warnings if results are generated as expected)")
else:
logger.info("If errors occur, please check the output in the folders: ")
logger.info("\t" + out_dir + ' (standard output)')
logger.info("\t" + err_dir + ' (standard error)')
logger.info("For additional information, run SBpipe using the `--verbose` option.")
# return outcome
return True


def is_output_file_clean(filename, stream_type='standard output'):
    """
    Check whether a file contains the string `error` or `warning`. If so, a message is logged.

    :param filename: the file to check
    :param stream_type: a label for the stream, e.g. 'standard output' or 'standard error'
    :return: False if the word `error` is found, True otherwise
    """
with open(filename) as my_file:
content = my_file.read().replace('\n', ' ').lower()
if 'error' in content:
logger.warning('Found word `error` in ' + stream_type)
logger.warning('\n' + content)
return False
elif 'warning' in content:
logger.warning('Found word `warning` in ' + stream_type)
logger.warning('\n' + content)
else:
logger.debug('\n' + content)
return True