# Source code for pl.ps2.parscan2

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Piero Dalle Pezze
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


# 'datetime' is used for computing the pipeline elapsed time.
import datetime
import glob
import logging
import os
import os.path
import yaml
import traceback
from ..pipeline import Pipeline
from sbpipe.utils.dependencies import is_r_package_installed
from sbpipe.utils.io import refresh
from sbpipe.utils.parcomp import parcomp
from sbpipe.utils.rand import get_rand_alphanum_str
from sbpipe.report.latex_reports import latex_report_ps2, pdf_report

# Module-wide logger shared by all sbpipe pipelines.
logger = logging.getLogger('sbpipe')


class ParScan2(Pipeline):
    """
    This module provides the user with a complete pipeline of scripts for
    computing double parameter scans.
    """

    def __init__(self, models_folder='Models', working_folder='Results',
                 sim_data_folder='double_param_scan_data',
                 sim_plots_folder='double_param_scan_plots'):
        """Initialise the pipeline folders (see :class:`Pipeline`)."""
        Pipeline.__init__(self, models_folder, working_folder,
                          sim_data_folder, sim_plots_folder)

    def run(self, config_file):
        """
        Run the complete double-parameter-scan pipeline as configured in
        `config_file` (see :meth:`Pipeline.run`).

        :param config_file: the YAML configuration file to load
        :return: True if every enabled step completed successfully,
                 False otherwise.
        """
        logger.info("===============================")
        logger.info("Pipeline: double parameter scan")
        logger.info("===============================")
        logger.info("\n")
        logger.info("Loading file: " + config_file)
        logger.info("=============\n")

        # load the configuration file
        try:
            config_dict = Pipeline.load(config_file)
        except yaml.YAMLError as e:
            # NOTE: Python 3 exceptions have no `.message` attribute;
            # str(e) is the portable way to render the error text.
            logger.error(str(e))
            logger.debug(traceback.format_exc())
            return False
        except IOError:
            logger.error('File `' + config_file + '` does not exist.')
            logger.debug(traceback.format_exc())
            return False

        # variable initialisation
        (generate_data, analyse_data, generate_report, generate_tarball,
         project_dir, simulator, model, scanned_par1, scanned_par2,
         cluster, local_cpus, runs, sim_length) = self.parse(config_dict)

        runs = int(runs)
        local_cpus = int(local_cpus)
        sim_length = int(sim_length)

        models_dir = os.path.join(project_dir, self.get_models_folder())
        working_dir = os.path.join(project_dir, self.get_working_folder())
        output_folder = os.path.splitext(model)[0]
        outputdir = os.path.join(working_dir, output_folder)

        # Get the pipeline start time
        start = datetime.datetime.now().replace(microsecond=0)

        # preprocessing
        if not os.path.exists(outputdir):
            os.makedirs(outputdir)

        if generate_data:
            logger.info("\n")
            logger.info("Data generation:")
            logger.info("================")
            status = ParScan2.generate_data(simulator,
                                            model,
                                            sim_length,
                                            models_dir,
                                            os.path.join(outputdir, self.get_sim_data_folder()),
                                            cluster,
                                            local_cpus,
                                            runs)
            if not status:
                return False

        if analyse_data:
            logger.info("\n")
            logger.info("Data analysis:")
            logger.info("==============")
            status = ParScan2.analyse_data(os.path.splitext(model)[0],
                                           scanned_par1,
                                           scanned_par2,
                                           os.path.join(outputdir, self.get_sim_data_folder()),
                                           os.path.join(outputdir, self.get_sim_plots_folder()),
                                           cluster,
                                           local_cpus,
                                           runs)
            if not status:
                return False

        if generate_report:
            logger.info("\n")
            logger.info("Report generation:")
            logger.info("==================")
            status = ParScan2.generate_report(os.path.splitext(model)[0],
                                              scanned_par1,
                                              scanned_par2,
                                              outputdir,
                                              self.get_sim_plots_folder())
            if not status:
                return False

        if generate_tarball:
            status = self.generate_tarball(working_dir, output_folder)
            if not status:
                return False

        # Print the pipeline elapsed time
        end = datetime.datetime.now().replace(microsecond=0)
        logger.info("\n\nPipeline elapsed time (using Python datetime): " + str(end - start))

        return True

    @classmethod
    def generate_data(cls, simulator, model, sim_length, inputdir, outputdir,
                      cluster, local_cpus, runs):
        """
        The first pipeline step: data generation.

        :param simulator: the name of the simulator (e.g. Copasi)
        :param model: the model to process
        :param sim_length: the length of the simulation
        :param inputdir: the directory containing the model
        :param outputdir: the directory to store the results
        :param cluster: local, lsf for Load Sharing Facility, sge for Sun Grid Engine.
        :param local_cpus: the number of CPU.
        :param runs: the number of model simulation
        :return: True if the task was completed successfully, False otherwise.
        """
        # Some controls
        if not os.path.isfile(os.path.join(inputdir, model)):
            logger.error(os.path.join(inputdir, model) + " does not exist.")
            return False
        # int() keeps this check consistent with the sim_length check below
        # and tolerant of string-typed configuration values.
        if int(runs) < 1:
            logger.error("variable `runs` must be greater than 0. Please, check your configuration file.")
            return False
        if int(sim_length) < 1:
            logger.error("variable sim_length must be greater than 0. Please, check your configuration file.")
            return False

        # recreate a clean output folder for this model
        refresh(outputdir, os.path.splitext(model)[0])

        logger.info("Simulating Model: " + model)
        try:
            sim = cls.get_simul_obj(simulator)
        except TypeError:
            # the exception object carries no extra detail worth logging here
            logger.error("simulator: " + simulator + " not found.")
            logger.debug(traceback.format_exc())
            return False
        try:
            return sim.ps2(model, sim_length, inputdir, outputdir, cluster, local_cpus, runs)
        except Exception as e:
            logger.error(str(e))
            logger.debug(traceback.format_exc())
            return False

    @classmethod
    def analyse_data(cls, model, scanned_par1, scanned_par2, inputdir, outputdir,
                     cluster='local', local_cpus=1, runs=1):
        """
        The second pipeline step: data analysis.

        :param model: the model name
        :param scanned_par1: the first scanned parameter
        :param scanned_par2: the second scanned parameter
        :param inputdir: the directory containing the simulated data sets to process
        :param outputdir: the directory to store the performed analysis
        :param cluster: local, lsf for Load Sharing Facility, sge for Sun Grid Engine.
        :param local_cpus: the number of CPU.
        :param runs: the number of model simulation
        :return: True if the task was completed successfully, False otherwise.
        """
        if not os.path.exists(inputdir):
            logger.error("input_dir " + inputdir + " does not exist. Generate some data first.")
            return False
        if int(runs) < 1:
            logger.error("variable `runs` must be greater than 0. Please, check your configuration file.")
            return False
        if int(local_cpus) < 1:
            logger.error("variable local_cpus must be greater than 0. Please, check your configuration file.")
            return False

        # folder preparation
        refresh(outputdir, os.path.splitext(model)[0])

        # a random token used by parcomp to rename per-run output files
        str_to_replace = get_rand_alphanum_str(10)

        # requires devtools::install_github("pdp10/sbpiper")
        if not is_r_package_installed('sbpiper'):
            logger.critical('R package `sbpiper` was not found. Abort.')
            return False

        command = 'R --quiet -e \'library(sbpiper); sbpiper_ps2(\"' + model + \
                  '\", \"' + scanned_par1 + '\", \"' + scanned_par2 + \
                  '\", \"' + inputdir + \
                  '\", \"' + outputdir + \
                  '\", \"' + str_to_replace

        # we replace \\ with / otherwise subprocess complains on windows systems.
        command = command.replace('\\', '\\\\')

        # We do this to make sure that characters like [ or ] don't cause troubles.
        command += '\")\''

        if not parcomp(command, str_to_replace, outputdir, cluster, int(runs), int(local_cpus), False):
            return False
        # at least one plot PDF must have been produced for the step to count as successful
        if len(glob.glob(os.path.join(outputdir, os.path.splitext(model)[0] + '*.pdf'))) == 0:
            return False
        return True

    @classmethod
    def generate_report(cls, model, scanned_par1, scanned_par2, outputdir, sim_plots_folder):
        """
        The third pipeline step: report generation.

        :param model: the model name
        :param scanned_par1: the first scanned parameter
        :param scanned_par2: the second scanned parameter
        :param outputdir: the directory containing the report
        :param sim_plots_folder: the folder containing the plots.
        :return: True if the task was completed successfully, False otherwise.
        """
        if not os.path.exists(os.path.join(outputdir, sim_plots_folder)):
            logger.error("input_dir " + os.path.join(outputdir, sim_plots_folder) +
                         " does not exist. Analyse the data first.")
            return False

        logger.info("Generating LaTeX report")
        logger.info(model)
        filename_prefix = "report__double_param_scan_"
        latex_report_ps2(outputdir, sim_plots_folder, filename_prefix,
                         model, scanned_par1, scanned_par2)

        logger.info("Generating PDF report")
        pdf_report(outputdir, filename_prefix + model + ".tex")
        # fail if no report PDF was actually produced
        if len(glob.glob(os.path.join(outputdir, '*' + os.path.splitext(model)[0] + '*.pdf'))) == 0:
            return False
        return True

    def parse(self, my_dict):
        """
        Parse the configuration dictionary into the pipeline option tuple
        (see :meth:`Pipeline.parse`).

        :param my_dict: the dictionary loaded from the YAML configuration file
        :return: a 13-tuple (generate_data, analyse_data, generate_report,
                 generate_tarball, project_dir, simulator, model,
                 scanned_par1, scanned_par2, cluster, local_cpus, runs,
                 sim_length)
        """
        # default values for every recognised option
        options = {
            'generate_data': True,
            'analyse_data': True,
            'generate_report': True,
            'generate_tarball': False,
            'project_dir': '.',
            'model': 'model',
            'simulator': 'Copasi',
            # the first scanned param
            'scanned_par1': "",
            # the second scanned param
            'scanned_par2': "",
            'cluster': 'local',
            'local_cpus': 1,
            'runs': 1,
            # the simulation length
            'sim_length': 1,
        }

        # Initialises the variables
        for key, value in my_dict.items():
            logger.info(key + ": " + str(value))
            if key in options:
                options[key] = value
            else:
                logger.warning('Found unknown option: `' + key + '`')

        return (options['generate_data'], options['analyse_data'],
                options['generate_report'], options['generate_tarball'],
                options['project_dir'], options['simulator'], options['model'],
                options['scanned_par1'], options['scanned_par2'],
                options['cluster'], options['local_cpus'], options['runs'],
                options['sim_length'])