#!/usr/bin/python
# -*- coding: utf-8 -*-


import argparse
import json
import logging
import os
import re
import subprocess
import sys
from datetime import datetime

import bioblend
import bioblend.galaxy.objects
import yaml
from bioblend import galaxy

from gga_autoload.gga_load_data import metadata_generator

""" 
deploy_stacks.py

Usage: $ python3 deploy_stacks.py -i example.yml [OPTIONS]
"""


def parse_input(input_file):
    """
    Parse the yml input file to extract data to create the SpeciesData objects
    Return a list of dictionaries. Each dictionary contains data tied to a species

    :param input_file:
    :return:
    """

    parsed_sp_dict_list = []

    if str(input_file).endswith(".yml") or str(input_file).endswith(".yaml"):
        logging.debug("Input format used: YAML")
    else:
        logging.critical("Error, please input a YAML file")
        sys.exit()
    with open(input_file, 'r') as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
            for k, v in yaml_dict.items():
                if k == "config":
                    continue  # skip the global config block, keep only species entries
                parsed_sp_dict_list.append(v)
        except yaml.YAMLError as exc:
            logging.critical(str(exc) + " (YAML input file might be incorrect)")
            sys.exit()
    return parsed_sp_dict_list


class RunWorkflow:
    """
    Run a workflow into the galaxy instance's history of a given species

    """

    def __init__(self, parameters_dictionary):
        self.parameters_dictionary = parameters_dictionary
        self.species = parameters_dictionary["description"]["species"]
        self.genus = parameters_dictionary["description"]["genus"]
        self.strain = parameters_dictionary["description"]["strain"]
        self.sex = parameters_dictionary["description"]["sex"]
        self.common = parameters_dictionary["description"]["common_name"]
        self.date = datetime.today().strftime("%Y-%m-%d")
        self.origin = parameters_dictionary["description"]["origin"]
        self.performed = parameters_dictionary["data"]["performed_by"]
        if parameters_dictionary["data"]["genome_version"] == "":
            self.genome_version = "1.0"
        else:
            self.genome_version = parameters_dictionary["data"]["genome_version"]
        if parameters_dictionary["data"]["ogs_version"] == "":
            self.ogs_version = "1.0"
        else:
            self.ogs_version = parameters_dictionary["data"]["ogs_version"]
        self.genus_lowercase = self.genus[0].lower() + self.genus[1:]
        self.genus_uppercase = self.genus[0].upper() + self.genus[1:]
        self.species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex])
        self.full_name = " ".join([self.genus_uppercase, self.species, self.strain, self.sex])
        self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex])
        self.genus_species = self.genus_lowercase + "_" + self.species
        self.instance_url = "http://scratchgmodv1:8888/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/"
        # Testing with localhost/scratchgmodv1
        self.instance = None
        self.history_id = None
        self.library_id = None
        self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.main_dir = None
        self.species_dir = None
        self.org_id = None
        self.genome_analysis_id = None
        self.ogs_analysis_id = None
        self.tool_panel = None
        self.datasets = dict()
        self.source_files = dict()
        self.workflow_name = None
        self.metadata = dict()
        self.api_key = "master"
        if parameters_dictionary["data"]["parent_directory"] == "" or parameters_dictionary["data"]["parent_directory"] == "/path/to/closest/parent/dir":
            self.source_data_dir = "/projet/sbr/phaeoexplorer/"  # Testing path for phaeoexplorer data
        else:
            self.source_data_dir = parameters_dictionary["data"]["parent_directory"]
        # Directory/subdirectories where data files are located (fasta, gff, ...)
        self.do_update = False
        # Update the instance (in histories corresponding to the input) instead of creating a new one
        self.api_key = "master"
        # API key used to communicate with the galaxy instance. Cannot be used to do user-tied actions
        self.species_name_regex_literal = r"(?=\w*V)(?=\w*A)(?=\w*R)(?=\w*I)(?=\w*A)(?=\w*B)(?=\w*L)(?=\w*E)\w+"  # Placeholder re
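        # Example of the derived naming (hypothetical strain/sex values, for
        # illustration only): genus "Ectocarpus", species "siliculosus",
        # strain "Ec32", sex "male" would give:
        #   species_folder_name -> "ectocarpus_siliculosus_Ec32_male"
        #   full_name           -> "Ectocarpus siliculosus Ec32 male"
        #   instance_url        -> "http://scratchgmodv1:8888/sp/ectocarpus_siliculosus/galaxy/"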


    def get_species_history_id(self):
        """
        Set and return the current species history id in its galaxy instance

        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        self.instance.histories.show_history(history_id=self.history_id)

        return self.history_id


    def create_species_history(self):
        """
        Create a history named after the current species if none exists yet
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        logging.debug(str(histories) + "\n" + self.full_name)
        if not histories:
            self.instance.histories.create_history(name=str(self.full_name))
            logging.info("Created history for " + self.full_name)


    def get_instance_attributes(self):
        """
        retrieves instance attributes:
        - working history ID
        - libraries ID (there should only be one library!)
        - datasets IDs

        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)
        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        folders_ids = {}
        current_folder_name = ""
        for i in instance_source_data_folders:
            for k, v in i.items():
                if k == "name":
                    folders_ids[v] = 0
                    current_folder_name = v
                if k == "id":
                    folders_ids[current_folder_name] = v
        logging.info("Folders and datasets IDs: ")
        self.datasets = dict()
        for k, v in folders_ids.items():
            logging.info("\t" + k + ": " + v)
            if k == "/genome":
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if type(e) == dict:
                            if e["name"].endswith(".fa"):
                                self.datasets["genome_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
            elif k == "/annotation/" + self.genus_species:
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if type(e) == dict:
                            # TODO: manage several files of the same type and manage versions
                            if e["name"].endswith("transcripts-gff.fa"):
                                self.datasets["transcripts_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("proteins.fasta"):
                                self.datasets["proteins_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith(".gff"):
                                self.datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("MALE"):
                                self.datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])


    def init_instance(self):
        """
        Galaxy instance startup in preparation for running workflows
        - remove Homo sapiens from the chado database.
        - add organism and analyses into the chado database --> separate
        - get any other existing organisms IDs before updating the galaxy instance --> separate

        TODO: move the library and analysis/data stuff to a separate function
        :return:
        """

        self.connect_to_instance()
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        # Create the first history if none exists yet
        if not histories:
            history = self.instance.histories.create_history(name=str(self.full_name))
            self.history_id = history["id"]
        else:
            self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)
        # libraries = self.instance.libraries.get_libraries()  # routine check: one library
        # self.library_id = self.instance.libraries.get_libraries()[0]["id"]  # project data folder/library
        logging.debug("library ID: " + str(self.library_id))
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        # Delete Homo sapiens from Chado database
        logging.debug("Getting 'Homo sapiens' ID in instance's chado database")
        get_sapiens_id_job = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": "Homo", "species": "sapiens"})
        get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"]
        get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output)
        try:
            logging.debug("Deleting 'Homo sapiens' in the instance's chado database")
            get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
            sapiens_id = str(
                get_sapiens_id_final_output["organism_id"])  # needs to be str to be recognized by the chado tool
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
                history_id=self.history_id,
                tool_inputs={"organism": sapiens_id})
        except (bioblend.ConnectionError, IndexError):
            logging.debug("Homo sapiens isn't in the instance's chado database")

        # TODO: the following actions should be done in a separate function (in case the user wants to do everything him/herself -- for EOSC)
        # Add organism (species) to chado
        logging.info("Adding organism to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2",
            history_id=self.history_id,
            tool_inputs={"abbr": self.abbreviation,
                         "genus": self.genus,
                         "species": self.species,
                         "common": self.common})
        # Add OGS analysis to chado
        logging.info("Adding OGS analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("OGS" + self.ogs_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Add genome analysis to chado
        logging.info("Adding genome analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("genome v" + self.genome_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})
        self.get_organism_and_analyses_ids()
        logging.info("Finished initializing instance")


    def run_workflow(self, workflow_name, workflow_parameters, datamap):
        """
        Run the "main" workflow in the galaxy instance
        - import data to library
        - load fasta and gff
        - sync with tripal
        - add jbrowse + organism
        - fill in the tripal views

        TODO: map tool name to step id
        :param workflow_name:
        :param workflow_parameters:
        :param datamap:
        :return:
        """

        logging.debug("running workflow: " + str(workflow_name))
        workflow_ga_file = os.path.join(self.main_dir, "Galaxy-Workflow-" + workflow_name + ".ga")
        if self.strain != "":
            custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        else:
            custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        with open(workflow_ga_file, 'r') as ga_in_file:
            workflow = str(ga_in_file.readlines())
            # ugly fix for the jbrowse parameters
            workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
                                        str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
            workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
                                        str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
            workflow = workflow.replace("\\\\", "\\")  # to restore the correct amount of backslashes in the workflow string before import
            # test
            workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
                                        "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            # production
            # workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
            #                             "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            workflow = workflow[2:-2]  # strip the leading "['" and trailing "']" left by str(readlines()); use the line below instead if this doesn't yield valid json
            # workflow = workflow[:-2]

            workflow_dict = json.loads(workflow)

            self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
            self.workflow_name = workflow_name
            workflow_attributes = self.instance.workflows.get_workflows(name=self.workflow_name)
            workflow_id = workflow_attributes[0]["id"]
            show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id)
            logging.debug("Workflow ID: " + workflow_id)

            logging.debug("Inputs:")
            logging.debug(show_workflow["inputs"])
            self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                    history_id=self.history_id,
                                                    params=workflow_parameters,
                                                    inputs=datamap,
                                                    inputs_by="step_index")  # the datamap is keyed by step index
            self.instance.workflows.delete_workflow(workflow_id=workflow_id)


    def get_organism_and_analyses_ids(self):
        """
        Retrieve current organism ID and OGS and genome chado analyses IDs (needed to run some tools as Tripal/Chado
        doesn't accept organism/analyses names as valid inputs

        :return:
        """
        # Get the ID for the current organism in chado
        org = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": self.genus, "species": self.species})
        org_job_out = org["outputs"][0]["id"]
        org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out)
        try:
            org_output = json.loads(org_json_output)[0]
            self.org_id = str(org_output["organism_id"])  # id needs to be a str to be recognized by chado tools
        except IndexError:
            logging.debug("no organism matching " + self.full_name + " exists in the instance's chado database")

        # Get the ID for the OGS analysis in chado
        ogs_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version})
        ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"]
        ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out)
        try:
            ogs_analysis_output = json.loads(ogs_analysis_json_output)[0]
            self.ogs_analysis_id = str(ogs_analysis_output["analysis_id"])
        except IndexError:
            logging.debug("no matching OGS analysis exists in the instance's chado database")

        # Get the ID for the genome analysis in chado
        genome_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version})
        genome_analysis_job_out = genome_analysis["outputs"][0]["id"]
        genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out)
        try:
            genome_analysis_output = json.loads(genome_analysis_json_output)[0]
            self.genome_analysis_id = str(genome_analysis_output["analysis_id"])
        except IndexError:
            logging.debug("no matching genome analysis exists in the instance's chado database")


    def connect_to_instance(self):
        """
        TODO: move in init/access
        TODO: password
        Test the connection to the galaxy instance for the current organism
        Exit if it cannot connect to the instance
        """
        self.instance = galaxy.GalaxyInstance(url=self.instance_url, email="gga@sb-roscoff.fr", password="password",
                                              verify=False)
        logging.info("Connecting to the galaxy instance ...")
        try:
            self.instance.histories.get_histories()
            self.tool_panel = self.instance.tools.get_tool_panel()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit()
        else:
            logging.info("Successfully connected to galaxy instance @ " + self.instance_url)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_false")

    args = parser.parse_args()
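    # Example invocation (the flags other than the input file are inferred from
    # the option checks below and may differ from the original CLI):
    #   python3 run_workflow.py input.yml --init-instance --run-main --dir /path/to/workflows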

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    logging.info("Start")
    sp_dict_list = parse_input(args.input)

    for sp_dict in sp_dict_list:
        o = RunWorkflow(parameters_dictionary=sp_dict)
        o.main_dir = os.path.abspath(args.dir)
        if args.init_instance:
            logging.info(" Initializing the galaxy instance")
            o.init_instance()
            o.get_instance_attributes()
            # metadata[genus_species_strain_sex]["initialized"] = True
        if args.load_data:
            logging.info("Loading data into galaxy")
            # o.load_data()
            # metadata[genus_species_strain_sex]["data_loaded_in_instance"] = True
        if args.run_main:
            logging.info("Running main workflow")
            o.get_organism_and_analyses_ids()
            workflow_parameters = dict()
            workflow_parameters["0"] = {}
            workflow_parameters["1"] = {}
            workflow_parameters["2"] = {}
            workflow_parameters["3"] = {}
            workflow_parameters["4"] = {"organism": al.org_id,
                                        "analysis_id": al.genome_analysis_id,
                                        "do_update": "true"}
            workflow_parameters["5"] = {"organism": al.org_id,
                                        "analysis_id": al.ogs_analysis_id}
            workflow_parameters["6"] = {"organism_id": al.org_id}
            workflow_parameters["7"] = {"analysis_id": al.ogs_analysis_id}
            workflow_parameters["8"] = {"analysis_id": al.genome_analysis_id}
            workflow_parameters["9"] = {"organism_id": al.org_id}
            workflow_parameters["10"] = {}
            workflow_parameters["11"] = {}

            o.datamap = dict()
            # dataset IDs collected from the instance library in get_instance_attributes()
            o.datamap["0"] = {"src": "hda", "id": o.datasets["genome_file"]}
            o.datamap["1"] = {"src": "hda", "id": o.datasets["gff_file"]}
            o.datamap["2"] = {"src": "hda", "id": o.datasets["proteins_file"]}
            o.datamap["3"] = {"src": "hda", "id": o.datasets["transcripts_file"]}

            o.run_workflow(workflow_name="main", workflow_parameters=workflow_parameters, datamap=al.datamap)
            # metadata[genus_species_strain_sex]["workflows_run"] = metadata[genus_species_strain_sex]["workflows_run"].append("main")