#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
run_workflow.py

Usage: $ python3 run_workflow.py input.yml [OPTIONS]
"""

import bioblend
import bioblend.galaxy.objects
import argparse
import os
import logging
import sys
import json
import utilities
import speciesData
from gga_autoload.gga_load_data import metadata_generator
from bioblend import galaxy


class RunWorkflow(speciesData.SpeciesData):
    """
    Run a workflow in the galaxy instance's history of a given species.

    This script is made to work for Phaeoexplorer GGA environments, but can be adapted
    to run any workflow, provided the user creates their own workflow in .ga format
    and changes the set_parameters function to match the parameters of their workflow.
    """

    def get_species_history_id(self):
        """
        Set and return the current species history id in its galaxy instance

        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        self.instance.histories.show_history(history_id=self.history_id)

        return self.history_id

    def get_instance_attributes(self):
        """
        Retrieve instance attributes:
        - working history ID
        - library ID (there should only be one library!)
        - dataset IDs

        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)
        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        # Map folder names to their IDs
        folders_ids = {}
        current_folder_name = ""
        for i in instance_source_data_folders:
            for k, v in i.items():
                if k == "name":
                    folders_ids[v] = 0
                    current_folder_name = v
                if k == "id":
                    folders_ids[current_folder_name] = v

        logging.info("Folders and datasets IDs: ")
        datasets = dict()
        for k, v in folders_ids.items():
            logging.info("\t" + k + ": " + v)
            if k == "/genome":
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if isinstance(e, dict):
                            if e["name"].endswith(".fa"):
                                datasets["genome_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
            elif k == "/annotation/" + self.genus_species:
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if isinstance(e, dict):
                            # TODO: manage several files of the same type and manage versions
                            if e["name"].endswith("transcripts-gff.fa"):
                                datasets["transcripts_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("proteins.fasta"):
                                datasets["proteins_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith(".gff"):
                                datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("MALE"):
                                # NOTE: this overwrites the ".gff" entry above when a
                                # sex-specific ("...MALE") file is present
                                datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])

        self.datasets = datasets  # keep the mapping on the instance for the __main__ block

        return {"history_id": self.history_id, "library_id": self.library_id, "datasets": datasets}
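    # Illustrative shape of the dict returned by get_instance_attributes(); the IDs
    # below are hypothetical placeholders, not values from a real Galaxy instance:
    #
    #   {"history_id": "f2db41e1fa331b3e",
    #    "library_id": "33b43b4e7093c91f",
    #    "datasets": {"genome_file": "<ldda_id>",
    #                 "gff_file": "<ldda_id>",
    #                 "proteins_file": "<ldda_id>",
    #                 "transcripts_file": "<ldda_id>"}}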
    def prepare_history(self):
        """
        Galaxy instance startup in preparation for importing datasets and running a workflow:
        - Remove Homo sapiens from the chado database
        - Add the organism and analyses into the chado database --> separate
        - Get any other existing organisms IDs before updating the galaxy instance --> separate

        Calling this function is mandatory to have a working galaxy instance history

        :return:
        """
        self.connect_to_instance()

        # Create the history if it doesn't exist yet, otherwise reuse it
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        if not histories:
            history = self.instance.histories.create_history(name=str(self.full_name))
            self.history_id = history["id"]
        else:
            self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)

        libraries = self.instance.libraries.get_libraries()  # routine check: one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        # Delete Homo sapiens from the chado database
        logging.debug("Getting 'Homo sapiens' ID in the instance's chado database")
        get_sapiens_id_job = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": "Homo", "species": "sapiens"})
        get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"]
        get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output)
        try:
            logging.debug("Deleting 'Homo sapiens' from the instance's chado database")
            get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
            sapiens_id = str(get_sapiens_id_final_output["organism_id"])  # needs to be a str to be recognized by the chado tool
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
                history_id=self.history_id,
                tool_inputs={"organism": sapiens_id})
        except (bioblend.ConnectionError, IndexError):
            logging.debug("Homo sapiens isn't in the instance's chado database")

        # Add the organism (species) to chado
        logging.info("Adding organism to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2",
            history_id=self.history_id,
            tool_inputs={"abbr": self.abbreviation,
                         "genus": self.genus,
                         "species": self.species,
                         "common": self.common})

        # Add the OGS analysis to chado
        logging.info("Adding OGS analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("OGS" + self.ogs_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Add the genome analysis to chado
        logging.info("Adding genome analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("genome v" + self.genome_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        self.get_organism_and_analyses_ids()
        logging.info("Finished initializing instance")
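    # The chado_organism_get_organisms tool used above returns its matches as a JSON
    # list; prepare_history() downloads that dataset and reads "organism_id" from the
    # first entry. Hypothetical example of the downloaded payload (fields trimmed):
    #
    #   [{"organism_id": 467, "genus": "Homo", "species": "sapiens",
    #     "abbreviation": "H.sapiens", "common_name": "human"}]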
    def run_workflow(self, workflow_name, workflow_parameters, datamap):
        """
        Run the "main" workflow in the galaxy instance
        - import data to library
        - load fasta and gff
        - sync with tripal
        - add jbrowse + organism
        - fill in the tripal views

        TODO: map tool name to step id

        :param workflow_name:
        :param workflow_parameters:
        :param datamap:
        :return:
        """
        logging.debug("running workflow: " + str(workflow_name))
        workflow_ga_file = os.path.join(self.main_dir, "Galaxy-Workflow-" + workflow_name + ".ga")
        if self.strain != "":
            custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga"
        else:
            custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga"
        custom_ga_file_path = os.path.abspath(custom_ga_file)

        with open(workflow_ga_file, 'r') as ga_in_file:
            # str(readlines()) yields the repr of a list of strings, so every backslash
            # in the .ga JSON is doubled and the payload is wrapped in ['...'] -- hence
            # the heavily escaped replacement patterns below and the final [2:-2] trim
            workflow = str(ga_in_file.readlines())
            # ugly fix for the jbrowse parameters
            workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
                                        str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
            workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
                                        str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
            workflow = workflow.replace("\\\\", "\\")  # restore the correct amount of backslashes in the workflow string before import
            # test
            workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
                                        "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            # TODO: Uncomment the next lines in production
            # workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
            #                             "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")

            workflow = workflow[2:-2]  # if the line below doesn't output correct json
            # workflow = workflow[:-2]  # if the line above doesn't output correct json

            workflow_dict = json.loads(workflow)

            self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
            self.workflow_name = workflow_name
            workflow_attributes = self.instance.workflows.get_workflows(name=self.workflow_name)
            workflow_id = workflow_attributes[0]["id"]
            show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id)
            logging.debug("Workflow ID: " + workflow_id)
            logging.debug("Inputs:")
            logging.debug(show_workflow["inputs"])
            self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                    history_id=self.history_id,
                                                    params=workflow_parameters,
                                                    inputs=datamap,
                                                    inputs_by="")
            self.instance.workflows.delete_workflow(workflow_id=workflow_id)  # TODO: keep for prod (add an "if test" condition)
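    # Minimal sketch of what run_workflow() expects from its caller; step numbers and
    # IDs here are hypothetical (see the __main__ block below for the real wiring):
    #
    #   params = {"0": {}, "4": {"organism": "2", "analysis_id": "3", "do_update": "true"}}
    #   datamap = {"0": {"src": "hda", "id": "f2db41e1fa331b3e"}}
    #   runner.run_workflow(workflow_name="main", workflow_parameters=params, datamap=datamap)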
    def get_organism_and_analyses_ids(self):
        """
        Retrieve the current organism ID and the OGS and genome chado analyses IDs
        (needed to run some tools, as Tripal/Chado doesn't accept organism/analyses
        names as valid inputs)

        :return:
        """
        # Get the ID for the current organism in chado
        org = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": self.genus, "species": self.species})
        org_job_out = org["outputs"][0]["id"]
        org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out)
        try:
            org_output = json.loads(org_json_output)[0]
            self.org_id = str(org_output["organism_id"])  # id needs to be a str to be recognized by chado tools
        except IndexError:
            logging.debug("no organism matching " + self.full_name + " exists in the instance's chado database")

        # Get the ID for the OGS analysis in chado
        ogs_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version})
        ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"]
        ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out)
        try:
            ogs_analysis_output = json.loads(ogs_analysis_json_output)[0]
            self.ogs_analysis_id = str(ogs_analysis_output["analysis_id"])
        except IndexError:
            logging.debug("no matching OGS analysis exists in the instance's chado database")

        # Get the ID for the genome analysis in chado
        genome_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version})
        genome_analysis_job_out = genome_analysis["outputs"][0]["id"]
        genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out)
        try:
            genome_analysis_output = json.loads(genome_analysis_json_output)[0]
            self.genome_analysis_id = str(genome_analysis_output["analysis_id"])
        except IndexError:
            logging.debug("no matching genome analysis exists in the instance's chado database")
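# Hypothetical sketch of one species entry in the input yml; the real schema is
# defined by utilities.parse_input() and speciesData.SpeciesData, and the keys below
# are only inferred from the attributes this script reads:
#
#   genus: "Genus"
#   species: "species"
#   strain: ""
#   common: "common name"
#   abbreviation: "G. species"
#   ogs_version: "1.0"
#   genome_version: "1.0"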
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA, following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")
    parser.add_argument("input", type=str, help="Input file (yml)")
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")
    parser.add_argument("--dir", type=str,
                        help="Path of the main directory containing the workflow files (defaults to the current directory)",
                        default=os.getcwd())
    parser.add_argument("--init-instance",
                        help="Initialize the galaxy instance (history and chado database)",
                        action="store_true")
    parser.add_argument("--load-data",
                        help="Load data into the galaxy instance's library",
                        action="store_true")
    parser.add_argument("--run-main",
                        help="Run the main workflow",
                        action="store_true")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.info("Start")

    sp_dict_list = utilities.parse_input(args.input)

    for sp_dict in sp_dict_list:
        run_workflow_for_current_organism = RunWorkflow(parameters_dictionary=sp_dict)
        run_workflow_for_current_organism.main_dir = os.path.abspath(args.dir)

        if args.init_instance:
            logging.info("Initializing the galaxy instance")
            run_workflow_for_current_organism.prepare_history()
            run_workflow_for_current_organism.get_instance_attributes()
            # metadata[genus_species_strain_sex]["initialized"] = True

        if args.load_data:
            logging.info("Loading data into galaxy")
            # run_workflow_for_current_organism.load_data()
            # metadata[genus_species_strain_sex]["data_loaded_in_instance"] = True

        if args.run_main:
            logging.info("Running main workflow")
            # Make sure the instance connection, history/library/dataset IDs and
            # chado IDs are available before invoking the workflow
            run_workflow_for_current_organism.connect_to_instance()
            run_workflow_for_current_organism.get_instance_attributes()
            run_workflow_for_current_organism.get_organism_and_analyses_ids()

            workflow_parameters = dict()
            load_fasta_dataset = "0"  # TODO: add a name for each parameter
            workflow_parameters[load_fasta_dataset] = {}
            workflow_parameters["1"] = {}
            workflow_parameters["2"] = {}
            workflow_parameters["3"] = {}
            workflow_parameters["4"] = {"organism": run_workflow_for_current_organism.org_id,
                                        "analysis_id": run_workflow_for_current_organism.genome_analysis_id,
                                        "do_update": "true"}
            workflow_parameters["5"] = {"organism": run_workflow_for_current_organism.org_id,
                                        "analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
            workflow_parameters["6"] = {"organism_id": run_workflow_for_current_organism.org_id}
            workflow_parameters["7"] = {"analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
            workflow_parameters["8"] = {"analysis_id": run_workflow_for_current_organism.genome_analysis_id}
            workflow_parameters["9"] = {"organism_id": run_workflow_for_current_organism.org_id}
            workflow_parameters["10"] = {}
            workflow_parameters["11"] = {}

            run_workflow_for_current_organism.datamap = dict()
            run_workflow_for_current_organism.datamap["0"] = {"src": "hda",
                                                              "id": run_workflow_for_current_organism.datasets["genome_file"]}
            run_workflow_for_current_organism.datamap["1"] = {"src": "hda",
                                                              "id": run_workflow_for_current_organism.datasets["gff_file"]}
            run_workflow_for_current_organism.datamap["2"] = {"src": "hda",
                                                              "id": run_workflow_for_current_organism.datasets["proteins_file"]}
            run_workflow_for_current_organism.datamap["3"] = {"src": "hda",
                                                              "id": run_workflow_for_current_organism.datasets["transcripts_file"]}

            run_workflow_for_current_organism.run_workflow(workflow_name="main",
                                                           workflow_parameters=workflow_parameters,
                                                           datamap=run_workflow_for_current_organism.datamap)
            # metadata[genus_species_strain_sex]["workflows_run"] = metadata[genus_species_strain_sex]["workflows_run"].append("main")
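# Example invocations, using the options defined in the argument parser above
# (paths and file names are placeholders):
#
#   $ python3 run_workflow.py example.yml --dir /path/to/workflows --init-instance -v
#   $ python3 run_workflow.py example.yml --dir /path/to/workflows --run-main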