#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import bioblend
import bioblend.galaxy.objects
import argparse
import os
import logging
import sys
import json
import utilities
import speciesData
from bioblend.galaxy.objects import GalaxyInstance
from bioblend import galaxy

"""
run_workflow.py

Usage: $ python3 run_workflow.py example.yml --workflow workflow.ga [OPTIONS]
"""


class RunWorkflow(speciesData.SpeciesData):
    """
    Run a workflow in the galaxy instance's history of a given species

    This script is made to work with a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
    provided the user creates their own workflow in .ga format and changes the set_parameters function to use
    the correct parameters for their workflow
    (TODO: use a mapping file for parameters and the .ga file)
    """

    # def get_species_history_id(self):
    #     """
    #     Set and return the current species history id in its galaxy instance
    #
    #     :return:
    #     """
    #
    #     histories = self.instance.histories.get_histories(name=str(self.full_name))
    #     self.history_id = histories[0]["id"]
    #     self.instance.histories.show_history(history_id=self.history_id)
    #
    #     return self.history_id

    def set_get_history(self):
        """
        Create or set the working history to the current species one
        TODO: move to utilities

        :return:
        """
        try:
            histories = self.instance.histories.get_histories(name=str(self.full_name))
            self.history_id = histories[0]["id"]
            logging.info("History for {0}: {1}".format(self.full_name, self.history_id))
        except IndexError:
            logging.info("Creating history for %s" % self.full_name)
            self.instance.histories.create_history(name=str(self.full_name))
            histories = self.instance.histories.get_histories(name=str(self.full_name))
            self.history_id = histories[0]["id"]
            logging.info("History for {0}: {1}".format(self.full_name, self.history_id))

        return self.history_id

    def get_instance_attributes(self):
        """
        Retrieve instance attributes:
        - working history ID
        - library ID (there should only be one library!)
        - dataset IDs

        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)

        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)

        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        # Access folders via their absolute path
        genome_folder = self.instance.libraries.get_folders(
            library_id=self.library_id,
            name="/genome/" + str(self.species_folder_name) + "/v" + str(self.genome_version))
        annotation_folder = self.instance.libraries.get_folders(
            library_id=self.library_id,
            name="/annotation/" + str(self.species_folder_name) + "/OGS" + str(self.ogs_version))

        # Get their IDs
        genome_folder_id = genome_folder[0]["id"]
        annotation_folder_id = annotation_folder[0]["id"]

        # Get the content of the folders
        genome_folder_content = self.instance.folders.show_folder(folder_id=genome_folder_id, contents=True)
        annotation_folder_content = self.instance.folders.show_folder(folder_id=annotation_folder_id, contents=True)

        # Find genome folder datasets
        genome_fasta_ldda_id = genome_folder_content["folder_contents"][0]["ldda_id"]

        # Several dicts in the annotation folder content (one dict = one file)
        for k, v in annotation_folder_content.items():
            if k == "folder_contents":
                for d in v:
                    if "proteins" in d["name"]:
                        annotation_proteins_ldda_id = d["ldda_id"]
                    if "transcripts" in d["name"]:
                        annotation_transcripts_ldda_id = d["ldda_id"]
                    if ".gff" in d["name"]:
                        annotation_gff_ldda_id = d["ldda_id"]

        self.datasets["genome_file"] = genome_fasta_ldda_id
        self.datasets["gff_file"] = annotation_gff_ldda_id
        self.datasets["proteins_file"] = annotation_proteins_ldda_id
        self.datasets["transcripts_file"] = annotation_transcripts_ldda_id

        return {"history_id": self.history_id, "library_id": self.library_id, "datasets": self.datasets}

    # def import_datasets_to_history(self):
    #     """
    #     Load the datasets into the current species history
    #     """
    #
    #     logging.info("Uploading datasets into history %s" % self.history_id)
    #     self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"])
    #     self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"])
    #     self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"])
    #     self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"])

    def connect_to_instance(self):
        """
        Test the connection to the galaxy instance for the current organism
        Exit if we cannot connect to the instance
        """
        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                              email=self.config["custom_galaxy_default_admin_email"],
                                              password=self.config["custom_galaxy_default_admin_password"])

        logging.info("Connecting to the galaxy instance...")
        try:
            self.instance.histories.get_histories()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit()
        else:
            logging.info("Successfully connected to galaxy instance @ " + self.instance_url)

    def prepare_history(self):
        """
        Galaxy instance startup in preparation for importing datasets and running a workflow
        - Add organism and analyses into the chado database
        - Get any other existing organisms' IDs before updating the galaxy instance --> separate

        Calling this function is mandatory to have a working galaxy instance history

        :return:
        """
        self.connect_to_instance()
        histories = self.instance.histories.get_histories(name=str(self.full_name))

        # Add organism (species) to chado
        logging.info("Adding organism to the instance's chado database")
        if self.common == "" or self.common is None:
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.3",
                history_id=self.history_id,
                tool_inputs={"abbr": self.abbreviation,
                             "genus": self.genus_uppercase,
                             "species": self.chado_species_name,
                             "common": self.abbreviation})
        else:
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.3",
                history_id=self.history_id,
                tool_inputs={"abbr": self.abbreviation,
                             "genus": self.genus_uppercase,
                             "species": self.chado_species_name,
                             "common": self.common})

        # Add OGS analysis to chado
        logging.info("Adding OGS analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.3",
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " OGS" + self.ogs_version,
                         "program": "Performed by Genoscope",
                         "programversion": str(self.sex + " OGS" + self.ogs_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Add genome analysis to chado
        logging.info("Adding genome analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " genome v" + self.genome_version,
                         "program": "Performed by Genoscope",
                         "programversion": str(self.sex + " genome v" + self.genome_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Also get the organism and analyses IDs
        self.get_organism_and_analyses_ids()

        logging.info("Finished initializing instance")

    def run_workflow(self, workflow_path, workflow_parameters, datamap):
        """
        Run the "main" workflow in the galaxy instance
        - import data to library
        - load fasta and gff
        - sync with tripal
        - add jbrowse + organism
        - fill in the tripal views

        TODO: map tool name to step id

        :param workflow_path:
        :param workflow_parameters:
        :param datamap:
        :return:
        """
        logging.info("Importing workflow: " + str(workflow_path))

        workflow_name = "demo"  # for workflow demo
        workflow_ga_file = workflow_path

        # Name the workflow in galaxy
        if self.strain != "":
            custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        elif self.sex != "":
            custom_ga_file = "_".join([self.genus, self.species, self.sex]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        else:
            custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)

        # Solving format issues in the .ga (encoding errors when importing the file via bioblend)
        with open(workflow_ga_file, 'r') as ga_in_file:
            # workflow = str(ga_in_file.readlines())

            # # Ugly fix for the jbrowse parameters (formatting) --> TODO: OBSOLETE (everything set at runtime)
            # workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
            #                             str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
            # workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
            #                             str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
            # workflow = workflow.replace("\\\\", "\\")  # to restore the correct amount of backslashes in the workflow string before import

            # # OBSOLETE
            # workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
            #                             "http://" + self.config["custom_host"] + ":8888/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")

            # # The json dict might sometimes turn out to be invalid for unknown reasons and the json module will fail to decode it (galaxy export error)
            # workflow = workflow[2:-2]  # if the line under doesn't output a correct json
            # # workflow = workflow[:-2]  # if the line above doesn't output a correct json

            # Store the decoded json dictionary
            workflow_dict = json.load(ga_in_file)

        self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
        self.workflow_name = workflow_name
        workflow_attributes = self.instance.workflows.get_workflows(name=self.workflow_name)
        workflow_id = workflow_attributes[0]["id"]
        show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id)
        logging.debug("Workflow ID: " + workflow_id)

        self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                history_id=self.history_id,
                                                params=workflow_parameters,
                                                inputs=datamap,
                                                inputs_by="")
        # self.instance.workflows.delete_workflow(workflow_id=workflow_id)  # TODO: keep for prod? (add a "if test" condition)

    def import_datasets_into_history(self):
        """
        Find datasets in a library, get their IDs and import them into the current history if they are not already there

        :return:
        """
        # Instantiate the instance
        gio = GalaxyInstance(url=self.instance_url,
                             email=self.config["custom_galaxy_default_admin_email"],
                             password=self.config["custom_galaxy_default_admin_password"])
        prj_lib = gio.libraries.get_previews(name="Project Data")
        self.library_id = prj_lib[0].id

        instance_source_data_folders = self.instance.libraries.get_folders(library_id=str(self.library_id))

        folders_ids = {}
        current_folder_name = ""
        # Loop over the folders in the library and map folder names to their IDs
        for i in instance_source_data_folders:
            for k, v in i.items():
                if k == "name":
                    folders_ids[v] = 0
                    current_folder_name = v
                if k == "id":
                    folders_ids[current_folder_name] = v

        # Iterating over the folders to find datasets and map datasets to their IDs
        logging.info("Datasets IDs: ")
        for k, v in folders_ids.items():
            if k == "/genome":
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                final_sub_folder_content = self.instance.folders.show_folder(folder_id=sub_folder_content["folder_contents"][0]["id"], contents=True)
                for k2, v2 in final_sub_folder_content.items():
                    for e in v2:
                        if isinstance(e, dict):
                            if e["name"].endswith(".fa"):
                                self.datasets["genome_file"] = e["ldda_id"]
                                logging.info("\t" + e["name"] + ": " + e["ldda_id"])
            if k == "/annotation":
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                final_sub_folder_content = self.instance.folders.show_folder(folder_id=sub_folder_content["folder_contents"][0]["id"], contents=True)
                for k2, v2 in final_sub_folder_content.items():
                    for e in v2:
                        if isinstance(e, dict):
                            # TODO: manage versions? (differentiate between the correct folders using self.config)
                            if "transcripts" in e["name"]:
                                self.datasets["transcripts_file"] = e["ldda_id"]
                                logging.info("\t" + e["name"] + ": " + e["ldda_id"])
                            elif "proteins" in e["name"]:
                                self.datasets["proteins_file"] = e["ldda_id"]
                                logging.info("\t" + e["name"] + ": " + e["ldda_id"])
                            elif "gff" in e["name"]:
                                self.datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t" + e["name"] + ": " + e["ldda_id"])

        logging.info("Uploading datasets into history %s" % self.history_id)
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"])

        return {"history_id": self.history_id, "library_id": self.library_id, "datasets": self.datasets}

    def get_organism_and_analyses_ids(self):
        """
        Retrieve current organism ID and OGS and genome chado analyses IDs
        (needed to run some tools as Tripal/Chado doesn't accept organism/analyses names as valid inputs)

        :return:
        """
        # Get the ID for the current organism in chado
        org = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.3",
            history_id=self.history_id,
            tool_inputs={"abbr": self.abbreviation,
                         "genus": self.genus_uppercase,
                         "species": self.chado_species_name,
                         "common": self.common})
        org_job_out = org["outputs"][0]["id"]
        org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out)
        try:
            org_output = json.loads(org_json_output)[0]
            self.org_id = str(org_output["organism_id"])  # id needs to be a str to be recognized by chado tools
        except IndexError:
            logging.debug("No organism matching " + self.full_name + " exists in the instance's chado database")

        # Get the ID for the OGS analysis in chado
        ogs_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.3",
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " OGS" + self.ogs_version})
        ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"]
        ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out)
        try:
            ogs_analysis_output = json.loads(ogs_analysis_json_output)[0]
            self.ogs_analysis_id = str(ogs_analysis_output["analysis_id"])
        except IndexError:
            logging.debug("No matching OGS analysis exists in the instance's chado database")

        # Get the ID for the genome analysis in chado
        genome_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.3",
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " genome v" + self.genome_version})
        genome_analysis_job_out = genome_analysis["outputs"][0]["id"]
        genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out)
        try:
            genome_analysis_output = json.loads(genome_analysis_json_output)[0]
            self.genome_analysis_id = str(genome_analysis_output["analysis_id"])
        except IndexError:
            logging.debug("No matching genome analysis exists in the instance's chado database")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")
    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")
    parser.add_argument("--config",
                        type=str,
                        help="Config path, defaults to the 'config' file inside the script repository")
    parser.add_argument("--main-directory",
                        type=str,
                        help="Where the stack containers will be located, defaults to working directory")
    parser.add_argument("--workflow", "-w",
                        type=str,
                        help="Workflow to run")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # Parsing the config file if provided, using the default config otherwise
    if not args.config:
        args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config")
    else:
        args.config = os.path.abspath(args.config)

    if not args.main_directory:
        args.main_directory = os.getcwd()
    else:
        args.main_directory = os.path.abspath(args.main_directory)

    sp_dict_list = utilities.parse_input(args.input)

    for sp_dict in sp_dict_list:

        # Creating an instance of the RunWorkflow object for the current organism
        run_workflow_for_current_organism = RunWorkflow(parameters_dictionary=sp_dict)

        # Checking if the user specified a workflow to run
        if not args.workflow:
            logging.critical("No workflow specified, exiting")
            sys.exit()
        else:
            workflow = os.path.abspath(args.workflow)

        # Starting
        logging.info("run_workflow.py called for %s" % run_workflow_for_current_organism.full_name)

        # Setting some of the instance attributes
        run_workflow_for_current_organism.main_dir = args.main_directory
        run_workflow_for_current_organism.species_dir = os.path.join(run_workflow_for_current_organism.main_dir,
                                                                     run_workflow_for_current_organism.genus_species + "/")

        # Parse the config yaml file
        run_workflow_for_current_organism.config = utilities.parse_config(args.config)

        # Set the instance url attribute
        for env_variable, value in run_workflow_for_current_organism.config.items():
            if env_variable == "custom_host":  # TODO:
                run_workflow_for_current_organism.instance_url = "http://{0}:8888/sp/{1}_{2}/galaxy/".format(
                    value,
                    run_workflow_for_current_organism.genus_lowercase,
                    run_workflow_for_current_organism.species)
                break
        else:
            run_workflow_for_current_organism.instance_url = "http://localhost:8888/sp/{0}_{1}/galaxy/".format(
                run_workflow_for_current_organism.genus_lowercase,
                run_workflow_for_current_organism.species)

        run_workflow_for_current_organism.connect_to_instance()
        run_workflow_for_current_organism.set_get_history()
        # run_workflow_for_current_organism.get_species_history_id()

        # Prepare the instance+history for the current organism (add organism and analyses in Chado) --> add argument?
        # (although there is no risk as chado refuses to duplicate an analysis/organism)
        run_workflow_for_current_organism.prepare_history()

        # Get the attributes of the instance and project data files
        run_workflow_for_current_organism.get_instance_attributes()

        # Import datasets into history (needs to be done in gga_load_data??)
        run_workflow_for_current_organism.import_datasets_into_history()

        workflow_parameters = dict()

        # Explicit workflow parameter names
        # TODO: use an external mapping file instead? (a possible shape is sketched below)
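        # A minimal sketch of what such a mapping file could look like (hypothetical file name and
        # keys, not part of the current repository), assuming a flat YAML file mapping tool names
        # to workflow step IDs, loaded once per workflow:
        #
        #   # workflow_steps_mapping.yml
        #   load_fasta_in_history: "0"
        #   load_fasta_in_chado: "1"
        #
        #   # in this script (e.g. with utilities.parse_config() or yaml.safe_load()):
        #   step_map = utilities.parse_config("workflow_steps_mapping.yml")
        #   PARAM_LOAD_FASTA_IN_HISTORY = step_map["load_fasta_in_history"]
        #   PARAM_LOAD_FASTA_IN_CHADO = step_map["load_fasta_in_chado"]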
        # DEMO WORKFLOW
        PARAM_LOAD_FASTA_IN_HISTORY = "0"
        PARAM_LOAD_FASTA_IN_CHADO = "1"
        # PARAM_SYNC_ORGANISM_INTO_TRIPAL = "2"
        # PARAM_SYNC_GENOME_ANALYSIS_INTO_TRIPAL = "3"
        # PARAM_SYNC_FEATURES_INTO_TRIPAL = "4"

        # Mapping parameters
        workflow_parameters[PARAM_LOAD_FASTA_IN_HISTORY] = {}
        workflow_parameters[PARAM_LOAD_FASTA_IN_CHADO] = {"organism": run_workflow_for_current_organism.org_id,
                                                          "analysis_id": run_workflow_for_current_organism.genome_analysis_id,
                                                          "do_update": "true"}
        # workflow_parameters[PARAM_SYNC_ORGANISM_INTO_TRIPAL] = {"organism_id": run_workflow_for_current_organism.org_id}
        # workflow_parameters[PARAM_SYNC_GENOME_ANALYSIS_INTO_TRIPAL] = {"analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
        # workflow_parameters[PARAM_SYNC_FEATURES_INTO_TRIPAL] = {"organism_id": run_workflow_for_current_organism.org_id}

        run_workflow_for_current_organism.datamap = dict()
        run_workflow_for_current_organism.datamap[PARAM_LOAD_FASTA_IN_HISTORY] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["genome_file"]}

        """COMMENTED FOR THE DEMO"""
        # # Base workflow (loading data in chado and first sync into tripal)
        # PARAM_LOAD_FILE1_INTO_HISTORY, PARAM_LOAD_FILE2_INTO_HISTORY, PARAM_LOAD_FILE3_INTO_HISTORY, PARAM_LOAD_FILE4_INTO_HISTORY = "0", "1", "2", "3"
        # PARAM_LOAD_FASTA_IN_CHADO = "4"
        # PARAM_LOAD_GFF_IN_CHADO = "5"
        # PARAM_SYNC_ORGANISM_INTO_TRIPAL = "6"
        # PARAM_SYNC_GENOME_ANALYSIS_INTO_TRIPAL = "7"
        # PARAM_SYNC_OGS_ANALYSIS_INTO_TRIPAL = "8"
        # PARAM_SYNC_FEATURES_INTO_TRIPAL = "9"

        # workflow_parameters[PARAM_LOAD_FILE1_INTO_HISTORY] = {}
        # workflow_parameters[PARAM_LOAD_FILE2_INTO_HISTORY] = {}
        # workflow_parameters[PARAM_LOAD_FILE3_INTO_HISTORY] = {}
        # workflow_parameters[PARAM_LOAD_FILE4_INTO_HISTORY] = {}
        # workflow_parameters[PARAM_LOAD_FASTA_IN_CHADO] = {"organism": run_workflow_for_current_organism.org_id,
        #                                                   "analysis_id": run_workflow_for_current_organism.genome_analysis_id,
        #                                                   "do_update": "true"}
        # workflow_parameters[PARAM_LOAD_GFF_IN_CHADO] = {"organism": run_workflow_for_current_organism.org_id,
        #                                                 "analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
        # workflow_parameters[PARAM_SYNC_ORGANISM_INTO_TRIPAL] = {"organism_id": run_workflow_for_current_organism.org_id}
        # workflow_parameters[PARAM_SYNC_GENOME_ANALYSIS_INTO_TRIPAL] = {"analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
        # workflow_parameters[PARAM_SYNC_OGS_ANALYSIS_INTO_TRIPAL] = {"analysis_id": run_workflow_for_current_organism.genome_analysis_id}
        # workflow_parameters[PARAM_SYNC_FEATURES_INTO_TRIPAL] = {"organism_id": run_workflow_for_current_organism.org_id}

        # # Loading files into history works a bit differently than the other steps as it's not a GMOD tool but a standard Galaxy tool
        # # It requires this additional "datamap" (conveniently named "datamap" here), requiring the source type of the file and its corresponding ID (unique)
        # run_workflow_for_current_organism.datamap = dict()
        # run_workflow_for_current_organism.datamap[PARAM_LOAD_FILE1_INTO_HISTORY] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["genome_file"]}
        # run_workflow_for_current_organism.datamap[PARAM_LOAD_FILE2_INTO_HISTORY] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["gff_file"]}
        # run_workflow_for_current_organism.datamap[PARAM_LOAD_FILE3_INTO_HISTORY] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["proteins_file"]}
        # run_workflow_for_current_organism.datamap[PARAM_LOAD_FILE4_INTO_HISTORY] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["transcripts_file"]}
        # Run the workflow with the parameters set above
        run_workflow_for_current_organism.run_workflow(workflow_path=workflow,
                                                       workflow_parameters=workflow_parameters,
                                                       datamap=run_workflow_for_current_organism.datamap)

        # WIP: metadata
        # metadata[genus_species_strain_sex]["workflows_run"] = metadata[genus_species_strain_sex]["workflows_run"].append("fooS")
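        # A minimal sketch of how the WIP metadata tracking above could be written (the "metadata"
        # dict and its keys are hypothetical, taken from the WIP comment, not from working code).
        # Note that list.append() returns None, so the assignment in the WIP line would overwrite
        # the list with None; appending in place avoids that, e.g.:
        #
        #   metadata.setdefault(genus_species_strain_sex, {}).setdefault("workflows_run", []).append(workflow)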