Source code for simul.copasi.copasi

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018 Piero Dalle Pezze
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import logging
import os
import re
import shutil
import sys

from sbpipe.utils.dependencies import which
from sbpipe.utils.io import remove_file_silently
from sbpipe.utils.io import replace_str_in_file
from sbpipe.utils.parcomp import parcomp
from ..simul import Simul

try:  # Python 2.7+
    from sbpipe.simul.copasi.model_checking import copasi_model_checking
except ImportError:
    pass

logger = logging.getLogger('sbpipe')


[docs]class Copasi(Simul): """ Copasi simulator. """
[docs] def __init__(self): __doc__ = Simul.__init__.__doc__ Simul.__init__(self) self._copasi_not_found_msg = "CopasiSE not found! Please check that CopasiSE is installed and in the PATH " \ "environmental variable." self._copasi = which("CopasiSE") if self._copasi is None: logger.error(self._copasi_not_found_msg)
[docs] def model_checking(self, model_filename, fileout, task_name=""): """ Check whether the Copasi model can be loaded and executed correctly. :param model_filename: the COPASI filename :param fileout: the file containing the model checking results :param task_name: the task to check :return: boolean indicating whether the model could be loaded and executed successfully """ if 'COPASI' in sys.modules: logger.info('COPASI model checking ...') return copasi_model_checking(model_filename, fileout, task_name) else: logger.warning('Python bindings for COPASI not found. Skipping COPASI model checking.') return True
[docs] def sim(self, model, inputdir, outputdir, cluster="local", local_cpus=1, runs=1, output_msg=False): __doc__ = Simul.sim.__doc__ # check Copasi file if not self.model_checking(os.path.join(inputdir, model), os.path.join(inputdir, model.replace('.cps', '_check.txt')), 'Time-Course'): return False if not self._run_par_comput(inputdir, model, outputdir, cluster, local_cpus, runs, output_msg): return False # removed repeated copasi files repeated_copasi_files = [f for f in os.listdir(inputdir) if re.match(self._get_model_group(model) + '[0-9]+.*.cps', f)] for report in repeated_copasi_files: remove_file_silently(os.path.join(inputdir, report)) return True
[docs] def ps1(self, model, scanned_par, simulate_intervals, single_param_scan_intervals, inputdir, outputdir, cluster="local", local_cpus=1, runs=1, output_msg=False): __doc__ = Simul.ps1.__doc__ # check Copasi file if not self.model_checking(os.path.join(inputdir, model), os.path.join(inputdir, model.replace('.cps', '_check.txt')), 'Scan'): return False if not self._run_par_comput(inputdir, model, outputdir, cluster, local_cpus, runs, output_msg): return False # removed repeated copasi files repeated_copasi_files = [f for f in os.listdir(inputdir) if re.match(self._get_model_group(model) + '[0-9]+.*.cps', f)] for report in repeated_copasi_files: remove_file_silently(os.path.join(inputdir, report)) self.ps1_postproc(model, scanned_par, simulate_intervals, single_param_scan_intervals, outputdir) return True
[docs] def ps2(self, model, sim_length, inputdir, outputdir, cluster="local", local_cpus=1, runs=1, output_msg=False): __doc__ = Simul.ps2.__doc__ # check Copasi file if not self.model_checking(os.path.join(inputdir, model), os.path.join(inputdir, model.replace('.cps', '_check.txt')), 'Scan'): return False if not self._run_par_comput(inputdir, model, outputdir, cluster, local_cpus, runs, output_msg): return False # removed repeated copasi files repeated_copasi_files = [f for f in os.listdir(inputdir) if re.match(self._get_model_group(model) + '[0-9]+.*.cps', f)] for report in repeated_copasi_files: remove_file_silently(os.path.join(inputdir, report)) self.ps2_postproc(model, sim_length, outputdir) return True
[docs] def pe(self, model, inputdir, cluster, local_cpus, runs, outputdir, sim_data_dir, output_msg=False): __doc__ = Simul.pe.__doc__ # check Copasi file if not self.model_checking(os.path.join(inputdir, model), os.path.join(inputdir, model.replace('.cps', '_check.txt')), 'Parameter Estimation'): return False if not self._run_par_comput(inputdir, model, sim_data_dir, cluster, local_cpus, runs, output_msg): return False # move_models repeated_copasi_files = [f for f in os.listdir(inputdir) if re.match(self._get_model_group(model) + '[0-9]+.*.cps', f)] for file in repeated_copasi_files: remove_file_silently(os.path.join(inputdir, file)) return True
[docs] def _run_par_comput(self, inputdir, model, outputdir, cluster="local", local_cpus=1, runs=1, output_msg=False): __doc__ = Simul._run_par_comput.__doc__ if self._copasi is None: logger.error(self._copasi_not_found_msg) return False model_group = self._get_model_group(model) # replicate the models for i in range(1, runs + 1): shutil.copyfile(os.path.join(inputdir, model), os.path.join(inputdir, model_group) + str(i) + ".cps") replace_str_in_file(os.path.join(inputdir, model_group) + str(i) + ".cps", os.path.splitext(model)[0] + ".csv", model_group + str(i) + ".csv") replace_str_in_file(os.path.join(inputdir, model_group) + str(i) + ".cps", os.path.splitext(model)[0] + ".txt", model_group + str(i) + ".csv") replace_str_in_file(os.path.join(inputdir, model_group) + str(i) + ".cps", os.path.splitext(model)[0] + ".tsv", model_group + str(i) + ".csv") replace_str_in_file(os.path.join(inputdir, model_group) + str(i) + ".cps", os.path.splitext(model)[0] + ".dat", model_group + str(i) + ".csv") # run copasi in parallel # To make things simple, the last 10 character of groupid are extracted and reversed. # This string will be likely different from groupid and is the string to replace with # the iteration number. str_to_replace = self._groupid[10::-1] command = self._copasi + " " + os.path.join(inputdir, model_group + str_to_replace + ".cps") command = command.replace('\\', '\\\\') if not parcomp(command, str_to_replace, outputdir, cluster, runs, local_cpus, output_msg): return False if not self._move_reports(inputdir, outputdir, model, self._groupid): return False return True
# utilities for collecting parameter estimation results #######################################################
[docs] def _get_params_list(self, filein): """ Return the list of parameter names from filein :param filein: a Copasi parameter estimation report file :return: the list of parameter names """ parameters = [] with open(filein, 'r') as file: lines = file.readlines() line_num = -1 for line in lines: line_num += 1 split_line = line.split('\t') if len(split_line) > 0 and \ (split_line[0].find('List of Fitting Items:') != -1 or split_line[0].find('List of Optimization Items:') != -1): # retrieve parameters line_num += 1 if line_num < len(lines): split_line = lines[line_num].rstrip().split("\t") while len(split_line) > 0: split_line = split_line[0] if not split_line: break if split_line.find('<=') != -1: # extract the name which is between ' <= ' and ' <= ' param = re.search(' <= (.*) <= ', split_line).group(1) parameters.append(param) line_num += 1 if line_num < len(lines): split_line = lines[line_num].rstrip().split("\t") else: break break ############################################################## # START BUG FIX 226. # # This fix is kept separately as it may be removed one day. # Copasi adds the list of constraints to the same list of fitted parameters # in the parameter estimation report. This causes an issue with SBpipe as this cannot discriminate whether # the parameter is estimated or constrained. # This patch cuts off the parameter list based on the number of columns of estimated parameters. # NOTICE: It will work as long as Copasi adds the list of constraints at the bottom of # the list of fitted parameters. # We need to get the number of columns for the estimated parameters. This is not well represented in a # Copasi report and if the estimation is not completed, this information is also missing. In this latter # case, we don't do anything. col_num = 0 with open(filein, 'r') as file: lines = file.readlines() line_num = -1 for line in lines: line_num += 1 split_line = line.split('\t') if len(split_line) > 0 and split_line[0].find('[Function Evaluations]') != -1: # retrieve the columns line_num += 1 if line_num < len(lines): split_line = lines[line_num] if split_line.find('(') != -1: # extract the string which is between '\t(\t' and '\t)\t' # cols contains the parameter values for the first function evaluation cols = re.search('(\t(.*)\t)', split_line).group(1) col_num = len(cols.split("\t")) break # Now we only consider the first `col_num` in parameters, if col_num is not zero. if col_num != 0: parameters = parameters[:col_num] # END BUG FIX 226 ############################################################## return parameters
[docs] def _write_best_fits(self, files, path_out, filename_out): """ Write the final estimates to filename_out :param files: the list of Copasi parameter estimation reports :param path_out: the path to store the file combining the final (best) estimates (filename_out) :param filename_out: the file containing the final (best) estimates """ file_num = -1 # logger.info("\nCollecting results:") with open(os.path.join(path_out, filename_out), 'a') as fileout: for file in files: file_num += 1 with open(file, 'r') as filein: # logger.info(os.path.basename(file)) lines = filein.readlines() line_num = -1 for line in lines: line_num += 1 split_line = line.rstrip().split("\t") # Retrieve the estimated values of the parameters if len(split_line) > 2 and split_line[0] == '[Function Evaluations]' and \ split_line[1] == '[Best Value]' and split_line[2] == '[Best Parameters]': # Retrieve the best fit. best_fit = [] while line_num + 1 < len(lines): line_num += 1 split_line = lines[line_num].replace("\t(", "").replace("\t)", "").rstrip().split("\t") if len(split_line) == 1: break best_fit = split_line # print(best_fit) fileout.write('\t'.join(map(str, best_fit)) + '\n') break
[docs] def _write_all_fits(self, files, path_out, filename_out): """ Write all the estimates to filename_out :param files: the list of Copasi parameter estimation reports :param path_out: the path to store the file combining all the estimates :param filename_out: the file containing all the estimates """ file_num = -1 # logger.info("\nCollecting results:") with open(os.path.join(path_out, filename_out), 'a') as fileout: for file in files: file_num += 1 with open(file, 'r') as filein: # logger.info(os.path.basename(file)) lines = filein.readlines() line_num = -1 for line in lines: line_num += 1 split_line = line.rstrip().split("\t") # Retrieve the estimated values of the parameters if len(split_line) > 2 and split_line[0] == '[Function Evaluations]' and \ split_line[1] == '[Best Value]' and split_line[2] == '[Best Parameters]': # retrieve all fits while line_num + 1 < len(lines): line_num += 1 split_line = lines[line_num].replace("\t(", "").replace("\t)", "").rstrip().split("\t") if len(split_line) == 1: break # print(split_line[1:]) fileout.write('\t'.join(map(str, split_line[1:])) + '\n') break