diff --git a/run_workflow_phaeoexplorer.py b/gga_run_workflow_phaeo_blast_interpro.py old mode 100755 new mode 100644 similarity index 99% rename from run_workflow_phaeoexplorer.py rename to gga_run_workflow_phaeo_blast_interpro.py index 8656ab079b03d48123c341262cafc58f9efcffdd..ddbc5838adb6ee3ffa6bb73aca0e40ad7dc502e0 --- a/run_workflow_phaeoexplorer.py +++ b/gga_run_workflow_phaeo_blast_interpro.py @@ -16,14 +16,9 @@ import utilities_bioblend import speciesData import constants import constants_phaeo +import runWorkflowPhaeo -""" -gga_init.py - -Usage: $ python3 gga_init.py -i input_example.yml --config [config file] [OPTIONS] -""" - -class StrainWorkflowParam: +class OrgWorkflowParamJbrowse(runWorkflowPhaeo.OrgWorkflowParam): def __init__(self, genus_species, strain_sex, genus_uppercase, chado_species_name, full_name, species_folder_name, org_id, history_id, instance, genome_analysis_id=None, ogs_analysis_id=None, blastp_analysis_id=None, interpro_analysis_id=None, @@ -41,13 +36,15 @@ class StrainWorkflowParam: self.interpro_analysis_id = interpro_analysis_id self.history_id = history_id self.instance = instance - self.genome_hda_id = genome_hda_id, - self.gff_hda_id = gff_hda_id, - self.transcripts_hda_id = transcripts_hda_id, - self.proteins_hda_id = proteins_hda_id, - self.blastp_hda_id = blastp_hda_id, - self.blastx_hda_id = blastx_hda_id, - self.interproscan_hda_id = interproscan_hda_id, + self.genome_hda_id = genome_hda_id + self.gff_hda_id = gff_hda_id + self.transcripts_hda_id = transcripts_hda_id + self.proteins_hda_id = proteins_hda_id + self.blastp_hda_id = blastp_hda_id + self.blastx_hda_id = blastx_hda_id + self.interproscan_hda_id = interproscan_hda_id + super().__init__(genus_species, strain_sex, genus_uppercase, chado_species_name, full_name, species_folder_name, + org_id, history_id, instance) def check_param_for_workflow_load_fasta_gff_jbrowse(self): params = [self.genus_species, self.strain_sex, self.genus_uppercase, @@ -699,7 +696,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type): run_workflow_for_current_organism.import_datasets_into_history() # Create the StrainWorkflowParam object holding all attributes needed for the workflow - sp_wf_param = StrainWorkflowParam( + sp_wf_param = OrgWorkflowParamJbrowse( genus_species=run_workflow_for_current_organism.genus_species, strain_sex=run_workflow_for_current_organism.strain_sex, genus_uppercase = run_workflow_for_current_organism.genus_uppercase, @@ -730,7 +727,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type): run_workflow_for_current_organism.import_datasets_into_history() # Create the StrainWorkflowParam object holding all attributes needed for the workflow - sp_wf_param = StrainWorkflowParam( + sp_wf_param = OrgWorkflowParamJbrowse( genus_species=run_workflow_for_current_organism.genus_species, strain_sex=run_workflow_for_current_organism.strain_sex, genus_uppercase = run_workflow_for_current_organism.genus_uppercase, @@ -760,7 +757,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type): run_workflow_for_current_organism.import_datasets_into_history() # Create the StrainWorkflowParam object holding all attributes needed for the workflow - sp_wf_param = StrainWorkflowParam( + sp_wf_param = OrgWorkflowParamJbrowse( genus_species=run_workflow_for_current_organism.genus_species, strain_sex=run_workflow_for_current_organism.strain_sex, genus_uppercase = run_workflow_for_current_organism.genus_uppercase, @@ -786,7 +783,7 @@ def get_sp_workflow_param(sp_dict, 
main_dir, config, workflow_type): def install_changesets_revisions_from_workflow(instance, workflow_path): """ - Read a .ga file to extract the information about the different tools called. + Read a .ga file to extract the information about the different tools called. Check if every tool is installed via a "show_tool". If a tool is not installed (versions don't match), send a warning to the logger and install the required changeset (matching the tool version) Doesn't do anything if versions match diff --git a/gga_run_workflow_phaeo_jbrowse.py b/gga_run_workflow_phaeo_jbrowse.py new file mode 100644 index 0000000000000000000000000000000000000000..500f2a73fca86b3f40d56597ad88902d4a95c3bf --- /dev/null +++ b/gga_run_workflow_phaeo_jbrowse.py @@ -0,0 +1,607 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import bioblend.galaxy.objects +import argparse +import os +import logging +import sys +import json +import time + +import utilities +import utilities_bioblend +import constants +import constants_phaeo +import runWorkflowPhaeo + +class OrgWorkflowParamJbrowse(runWorkflowPhaeo.OrgWorkflowParam): + + def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name, + org_id, history_id, instance, genome_analysis_id=None, ogs_analysis_id=None, + genome_hda_id=None, gff_hda_id=None, transcripts_hda_id=None, proteins_hda_id=None): + self.genome_analysis_id = genome_analysis_id + self.ogs_analysis_id = ogs_analysis_id + self.genome_hda_id = genome_hda_id + self.gff_hda_id = gff_hda_id + self.transcripts_hda_id = transcripts_hda_id + self.proteins_hda_id = proteins_hda_id + super().__init__(genus_uppercase, chado_species_name, full_name, species_folder_name, + org_id, history_id, instance) + + def check_param(self): + params = [self.genus_uppercase, + self.chado_species_name, + self.full_name, + self.species_folder_name, + self.org_id, + self.history_id, + self.instance, + self.genome_analysis_id, + self.ogs_analysis_id, + self.genome_hda_id, + self.gff_hda_id, + self.transcripts_hda_id, + self.proteins_hda_id] + utilities_bioblend.check_wf_param(self.full_name, params) + +class RunWorkflowJbrowse(runWorkflowPhaeo.RunWorkflow): + """ + Run a workflow into the galaxy instance's history of a given species + + + This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow, + provided the user creates their own workflow in a .ga format, and change the set_parameters function + to have the correct parameters for their workflow + + """ + + def __init__(self, parameters_dictionary): + + super().__init__(parameters_dictionary) + + self.chado_species_name = " ".join(utilities.filter_empty_not_empty_items( + [self.species, self.strain, self.sex])["not_empty"]) + + self.abbreviation = self.genus_uppercase[0] + ". 
" + self.chado_species_name + + self.common = self.name + if not self.common_name is None and self.common_name != "": + self.common = self.common_name + + self.genome_analysis_name = "genome v{0} of {1}".format(self.genome_version, self.full_name) + self.genome_analysis_programversion = "genome v{0}".format(self.genome_version) + self.genome_analysis_sourcename = self.full_name + + self.ogs_analysis_name = "OGS{0} of {1}".format(self.ogs_version, self.full_name) + self.ogs_analysis_programversion = "OGS{0}".format(self.ogs_version) + self.ogs_analysis_sourcename = self.full_name + + self.genome_hda_id = None + self.gff_hda_id = None + self.transcripts_hda_id = None + self.proteins_hda_id = None + + def install_changesets_revisions_for_individual_tools(self): + """ + This function is used to verify that installed tools called outside workflows have the correct versions and changesets + If it finds versions don't match, will install the correct version + changeset in the instance + Doesn't do anything if versions match + + :return: + """ + + logging.info("Validating installed individual tools versions and changesets") + + # Verify that the add_organism and add_analysis versions are correct in the instance + # changeset for 2.3.4+galaxy0 has to be manually found because there is no way to get the wanted changeset of a non installed tool via bioblend + # except for workflows (.ga) that already contain the changeset revisions inside the steps ids + + utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID, + version=constants_phaeo.GET_ORGANISMS_TOOL_VERSION, + changeset_revision=constants_phaeo.GET_ORGANISMS_TOOL_CHANGESET_REVISION, + instance=self.instance) + + utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID, + version=constants_phaeo.GET_ANALYSES_TOOL_VERSION, + changeset_revision=constants_phaeo.GET_ANALYSES_TOOL_CHANGESET_REVISION, + instance=self.instance) + + utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID, + version=constants_phaeo.ADD_ORGANISM_TOOL_VERSION, + changeset_revision=constants_phaeo.ADD_ORGANISM_TOOL_CHANGESET_REVISION, + instance=self.instance) + + utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID, + version=constants_phaeo.ADD_ANALYSIS_TOOL_VERSION, + changeset_revision=constants_phaeo.ADD_ANALYSIS_TOOL_CHANGESET_REVISION, + instance=self.instance) + + utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID, + version=constants_phaeo.ANALYSIS_SYNC_TOOL_VERSION, + changeset_revision=constants_phaeo.ANALYSIS_SYNC_TOOL_CHANGESET_REVISION, + instance=self.instance) + + utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID, + version=constants_phaeo.ORGANISM_SYNC_TOOL_VERSION, + changeset_revision=constants_phaeo.ORGANISM_SYNC_TOOL_CHANGESET_REVISION, + instance=self.instance) + + logging.info("Success: individual tools versions and changesets validated") + + def add_organism_and_sync(self): + + get_organisms_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( + instance=self.instance, + tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID, + history_id=self.history_id, + tool_inputs={}, + time_sleep=10 + ) + organisms_dict_list = json.loads(get_organisms_tool_dataset) # Turn the dataset into a list for parsing + + org_id = None + + # Look up list of outputs (dictionaries) + for org_dict in organisms_dict_list: + if 
org_dict["genus"] == self.genus_uppercase and org_dict["species"] == self.chado_species_name: + org_id = str(org_dict["organism_id"]) # id needs to be a str to be recognized by chado tools + + if org_id is None: + add_organism_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( + instance=self.instance, + tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID, + history_id=self.history_id, + tool_inputs={"abbr": self.abbreviation, + "genus": self.genus_uppercase, + "species": self.chado_species_name, + "common": self.common}) + organism_dict = json.loads(add_organism_tool_dataset) + org_id = str(organism_dict["organism_id"]) # id needs to be a str to be recognized by chado tools + + # Synchronize newly added organism in Tripal + logging.info("Synchronizing organism %s in Tripal" % self.full_name) + time.sleep(60) + utilities_bioblend.run_tool( + instance=self.instance, + tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID, + history_id=self.history_id, + tool_inputs={"organism_id": org_id}) + + return org_id + + def import_datasets_into_history(self): + """ + Find datasets in a library, get their ID and import them into the current history if they are not already + """ + + genome_ldda_id = None + transcripts_ldda_id = None + proteins_ldda_id = None + gff_ldda_id = None + + genome_hda_id = None + gff_hda_id = None + transcripts_hda_id = None + proteins_hda_id = None + + folder_dict_list = self.instance.libraries.get_folders(library_id=str(self.library_id)) + + folders_id_dict = {} + + # Loop over the folders in the library and map folders names to their IDs + for folder_dict in folder_dict_list: + folders_id_dict[folder_dict["name"]] = folder_dict["id"] + + # Iterating over the folders to find datasets and map datasets to their IDs + for folder_name, folder_id in folders_id_dict.items(): + if folder_name == "/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version): + sub_folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True) + for value in sub_folder_content.values(): + for e in value: + if type(e) == dict: + if e["name"].endswith(self.genome_filename): + genome_ldda_id = e["ldda_id"] + + if folder_name == "/annotation/{0}/OGS{1}".format(self.species_folder_name, self.ogs_version): + sub_folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True) + for value in sub_folder_content.values(): + for e in value: + if type(e) == dict: + ldda_name = e["name"] + ldda_id = e["ldda_id"] + if ldda_name.endswith(self.transcripts_filename): + transcripts_ldda_id = ldda_id + elif ldda_name.endswith(self.proteins_filename): + proteins_ldda_id = ldda_id + elif ldda_name.endswith(self.gff_filename): + gff_ldda_id = ldda_id + + hda_list = self.instance.datasets.get_datasets(self.history_id) + # Finding datasets in history (matching datasets names) + for hda in hda_list: + hda_name = hda["name"] + hda_id = hda["id"] + if hda_name == self.genome_filename: + genome_hda_id = hda_id + if hda_name == self.gff_filename: + gff_hda_id = hda_id + if hda_name == self.transcripts_filename: + transcripts_hda_id = hda_id + if hda_name == self.proteins_filename : + proteins_hda_id = hda_id + + # Import each dataset into history if it is not imported + logging.debug("Uploading datasets into history %s" % self.history_id) + + if genome_hda_id is None: + genome_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=genome_ldda_id) + genome_hda_id = genome_dataset_upload["id"] + if 
gff_hda_id is None: + gff_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=gff_ldda_id) + gff_hda_id = gff_dataset_upload["id"] + if proteins_hda_id is None: + proteins_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=proteins_ldda_id) + proteins_hda_id = proteins_dataset_upload["id"] + if transcripts_hda_id is None: + transcripts_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=transcripts_ldda_id) + transcripts_hda_id = transcripts_dataset_upload["id"] + + self.genome_hda_id = genome_hda_id + self.gff_hda_id = gff_hda_id + self.transcripts_hda_id = transcripts_hda_id + self.proteins_hda_id = proteins_hda_id + +def prepare_history_and_get_wf_param(sp_dict_list, main_dir, config): + + all_org_wf_param_dict = {} + for sp_dict in sp_dict_list: + + run_workflow_for_current_organism = RunWorkflowJbrowse(parameters_dictionary=sp_dict) + + # Verifying the galaxy container is running + if not utilities_bioblend.check_galaxy_state(network_name=run_workflow_for_current_organism.genus_species, + script_dir=run_workflow_for_current_organism.script_dir): + logging.critical( + "The galaxy container for %s is not ready yet!" % run_workflow_for_current_organism.genus_species) + sys.exit() + + else: + + # Setting some of the instance attributes + run_workflow_for_current_organism.main_dir = main_dir + + run_workflow_for_current_organism.set_galaxy_instance(config) + run_workflow_for_current_organism.set_history() + run_workflow_for_current_organism.install_changesets_revisions_for_individual_tools() + run_workflow_for_current_organism.import_datasets_into_history() + + analyses_dict_list = run_workflow_for_current_organism.get_analyses() + + org_id = run_workflow_for_current_organism.add_organism_and_sync() + genome_analysis_id = run_workflow_for_current_organism.add_analysis_and_sync( + analyses_dict_list=analyses_dict_list, + analysis_name=run_workflow_for_current_organism.genome_analysis_name, + analysis_programversion=run_workflow_for_current_organism.genome_analysis_programversion, + analysis_sourcename=run_workflow_for_current_organism.genome_analysis_sourcename + ) + ogs_analysis_id = run_workflow_for_current_organism.add_analysis_and_sync( + analyses_dict_list=analyses_dict_list, + analysis_name=run_workflow_for_current_organism.ogs_analysis_name, + analysis_programversion=run_workflow_for_current_organism.ogs_analysis_programversion, + analysis_sourcename=run_workflow_for_current_organism.ogs_analysis_sourcename + ) + + # Create the StrainWorkflowParam object holding all attributes needed for the workflow + org_wf_param = OrgWorkflowParamJbrowse( + genus_uppercase=run_workflow_for_current_organism.genus_uppercase, + full_name=run_workflow_for_current_organism.full_name, + species_folder_name=run_workflow_for_current_organism.species_folder_name, + chado_species_name=run_workflow_for_current_organism.chado_species_name, + org_id=org_id, + genome_analysis_id=genome_analysis_id, + ogs_analysis_id=ogs_analysis_id, + genome_hda_id=run_workflow_for_current_organism.genome_hda_id, + gff_hda_id=run_workflow_for_current_organism.gff_hda_id, + transcripts_hda_id=run_workflow_for_current_organism.transcripts_hda_id, + proteins_hda_id=run_workflow_for_current_organism.proteins_hda_id, + history_id=run_workflow_for_current_organism.history_id, + instance=run_workflow_for_current_organism.instance + ) + 
org_wf_param.check_param() + + # Add the species dictionary to the complete dictionary + # This dictionary contains every organism present in the input file + # Its structure is the following: + # {genus species: {strain1_sex1: {variables_key: variables_values}, strain1_sex2: {variables_key: variables_values}}} + if not run_workflow_for_current_organism.genus_species in all_org_wf_param_dict.keys(): + all_org_wf_param_dict[run_workflow_for_current_organism.genus_species] = { + run_workflow_for_current_organism.strain_sex: org_wf_param} + else: + if not run_workflow_for_current_organism.strain_sex in all_org_wf_param_dict[ + run_workflow_for_current_organism.genus_species].keys(): + all_org_wf_param_dict[run_workflow_for_current_organism.genus_species][ + run_workflow_for_current_organism.strain_sex] = org_wf_param + else: + logging.error("Duplicate organism with 'genus_species' = '{0}' and 'strain_sex' = '{1}'".format( + run_workflow_for_current_organism.genus_species, run_workflow_for_current_organism.strain_sex)) + + return all_org_wf_param_dict + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run Galaxy workflows, specific to Phaeoexplorer data") + + parser.add_argument("input", + type=str, + help="Input file (yml)") + + parser.add_argument("-v", "--verbose", + help="Increase output verbosity", + action="store_true") + + parser.add_argument("--config", + type=str, + help="Config path, defaults to the 'config' file inside the script repository") + + parser.add_argument("--main-directory", + type=str, + help="Where the stack containers will be located, defaults to working directory") + + parser.add_argument("--workflow", "-w", + type=str, + help="Workflow to run. Available options: load_fasta_gff_jbrowse, blast, interpro") + + args = parser.parse_args() + + bioblend_logger = logging.getLogger("bioblend") + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + bioblend_logger.setLevel(logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + bioblend_logger.setLevel(logging.INFO) + + # Parsing the config file if provided, using the default config otherwise + if args.config: + config_file = os.path.abspath(args.config) + else: + config_file = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), constants.DEFAULT_CONFIG) + + main_dir = None + if not args.main_directory: + main_dir = os.getcwd() + else: + main_dir = os.path.abspath(args.main_directory) + + config = utilities.parse_config(config_file) + sp_dict_list = utilities.parse_input(args.input) + script_dir = os.path.dirname(os.path.realpath(sys.argv[0])) + + all_org_wf_param_dict = prepare_history_and_get_wf_param( + sp_dict_list=sp_dict_list, + main_dir=main_dir, + config=config) + + for genus_species, strains in all_org_wf_param_dict.items(): + strains_list = list(strains.keys()) + strains_count = len(strains_list) + + if strains_count == 1: + logging.info("Input species %s: 1 strain detected in input dictionary" % genus_species) + strain_sex = list(strains.keys())[0] + org_wf_param = strains[strain_sex] + + # Set workflow path (1 organism) + workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_1_ORG_FILE) + + # Check if the versions of tools specified in the workflow are installed in galaxy + utilities_bioblend.install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=org_wf_param.instance) + + # Set the workflow parameters (individual tools runtime parameters in the workflow) +
workflow_parameters = {} + # Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method) + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_FASTA] = { + "organism": org_wf_param.org_id, + "analysis_id": org_wf_param.genome_analysis_id, + "do_update": "true"} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_GFF] = { + "organism": org_wf_param.org_id, + "analysis_id": org_wf_param.ogs_analysis_id} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_FEATURE_SYNC] = { + "organism_id": org_wf_param.org_id} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_POPULATE_VIEWS] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_INDEX] = {} + + # Set datamap (mapping of input files in the workflow) + datamap = {} + datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {"src": "hda", "id": org_wf_param.genome_hda_id} + datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {"src": "hda", "id": org_wf_param.gff_hda_id} + datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {"src": "hda", "id": org_wf_param.proteins_hda_id} + + with open(workflow_path, 'r') as ga_in_file: + + # Store the decoded json dictionary + workflow_dict = json.load(ga_in_file) + workflow_name = workflow_dict["name"] + + # For the Jbrowse tool, we unfortunately have to manually edit the parameters instead of setting them + # as runtime values, using runtime parameters makes the tool throw an internal critical error ("replace not found" error) + # Scratchgmod test: need "http" (or "https"), the hostname (+ port) + if constants.CONF_JBROWSE_MENU_URL not in config.keys(): + # default + root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME]) + else: + root_url = config[constants.CONF_JBROWSE_MENU_URL] + species_strain_sex = org_wf_param.chado_species_name.replace(" ", "-") + jbrowse_menu_url = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format( + root_url=root_url, + genus_sp=genus_species, + Genus=org_wf_param.genus_uppercase, + species_strain_sex=species_strain_sex, + id="{id}") + # Replace values in the workflow dictionary + workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"] = \ + workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"]\ + .replace("__MENU_URL_ORG__", jbrowse_menu_url) + workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"] = \ + workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"]\ + .replace("__DISPLAY_NAME_ORG__", org_wf_param.full_name)\ + .replace("__UNIQUE_ID_ORG__", org_wf_param.species_folder_name) + + # Import the workflow in galaxy as a dict + org_wf_param.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict) + + # Get its attributes + workflow_dict_list = org_wf_param.instance.workflows.get_workflows(name=workflow_name) + # Then get its ID (required to invoke the workflow) + workflow_id = workflow_dict_list[0]["id"] # Index 0 is the most recently imported workflow (the one we want) + logging.debug("Workflow ID: %s" % workflow_id) + # 
Check if the workflow is found + try: + show_workflow = org_wf_param.instance.workflows.show_workflow(workflow_id=workflow_id) + except bioblend.ConnectionError: + logging.warning("Error finding workflow %s" % workflow_name) + + # Finally, invoke the workflow along with its datamap, parameters and the history in which to invoke it + org_wf_param.instance.workflows.invoke_workflow( + workflow_id=workflow_id, + history_id=org_wf_param.history_id, + params=workflow_parameters, + inputs=datamap, + allow_tool_state_corrections=True) + + logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name)) + + if strains_count == 2: + + logging.info("Input species %s: 2 strains detected in input dictionary" % genus_species) + strain_sex_org1 = strains_list[0] + strain_sex_org2 = strains_list[1] + sp_wf_param_org1 = strains[strain_sex_org1] + sp_wf_param_org2 = strains[strain_sex_org2] + + # Set workflow path (2 organisms) + workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_2_ORG_FILE) + + # Check if the versions of tools specified in the workflow are installed in galaxy + utilities_bioblend.install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=sp_wf_param_org1.instance) + + # Set the workflow parameters (individual tools runtime parameters in the workflow) + workflow_parameters = {} + # Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method) + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {} + # Organism 1 + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG1] = { + "organism": sp_wf_param_org1.org_id, + "analysis_id": sp_wf_param_org1.genome_analysis_id, + "do_update": "true"} + # workflow_parameters[JBROWSE_ORG1] = {"jbrowse_menu_url": jbrowse_menu_url_org1} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG1] = { + "organism": sp_wf_param_org1.org_id, + "analysis_id": sp_wf_param_org1.ogs_analysis_id} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG1] = { + "organism_id": sp_wf_param_org1.org_id} + # workflow_parameters[JBROWSE_CONTAINER] = {"organisms": [{"name": org1_full_name, "unique_id": org1_species_folder_name, }, {"name": org2_full_name, "unique_id": org2_species_folder_name}]} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER] = {} + # Organism 2 + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG2] = { + "organism": sp_wf_param_org2.org_id, + "analysis_id": sp_wf_param_org2.genome_analysis_id, + "do_update": "true"} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG2] = { + "organism": sp_wf_param_org2.org_id, + "analysis_id": sp_wf_param_org2.ogs_analysis_id} + # workflow_parameters[JRBOWSE_ORG2] = {"jbrowse_menu_url": jbrowse_menu_url_org2} +
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG2] = { + "organism_id": sp_wf_param_org2.org_id} + # POPULATE + INDEX DATA + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_POPULATE_VIEWS] = {} + workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_INDEX] = {} + + # Set datamap (mapping of input files in the workflow) + datamap = {} + # Organism 1 + datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {"src": "hda", "id": sp_wf_param_org1.genome_hda_id} + datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {"src": "hda", "id": sp_wf_param_org1.gff_hda_id} + datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {"src": "hda", "id": sp_wf_param_org1.proteins_hda_id} + # Organism 2 + datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {"src": "hda", "id": sp_wf_param_org2.genome_hda_id} + datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {"src": "hda", "id": sp_wf_param_org2.gff_hda_id} + datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {"src": "hda", "id": sp_wf_param_org2.proteins_hda_id} + + with open(workflow_path, 'r') as ga_in_file: + + # Store the decoded json dictionary + workflow_dict = json.load(ga_in_file) + workflow_name = workflow_dict["name"] + + # For the Jbrowse tool, we unfortunately have to manually edit the parameters instead of setting them + # as runtime values, using runtime parameters makes the tool throw an internal critical error ("replace not found" error) + # Scratchgmod test: need "http" (or "https"), the hostname (+ port) + if constants.CONF_JBROWSE_MENU_URL not in config.keys(): + # default + root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME]) + else: + root_url = config[constants.CONF_JBROWSE_MENU_URL] + species_strain_sex_org1 = sp_wf_param_org1.chado_species_name.replace(" ", "-") + species_strain_sex_org2 = sp_wf_param_org2.chado_species_name.replace(" ", "-") + jbrowse_menu_url_org1 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format( + root_url=root_url, + genus_sp=genus_species, + Genus=sp_wf_param_org1.genus_uppercase, + species_strain_sex=species_strain_sex_org1, + id="{id}") + jbrowse_menu_url_org2 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format( + root_url=root_url, + genus_sp=genus_species, + Genus=sp_wf_param_org2.genus_uppercase, + species_strain_sex=species_strain_sex_org2, + id="{id}") + # Replace values in the workflow dictionary + jbrowse_tool_state_org1 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1]["tool_state"] + jbrowse_tool_state_org1 = jbrowse_tool_state_org1.replace("__MENU_URL_ORG1__", jbrowse_menu_url_org1) + jbrowse_tool_state_org2 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2]["tool_state"] + jbrowse_tool_state_org2 = jbrowse_tool_state_org2.replace("__MENU_URL_ORG2__", jbrowse_menu_url_org2) + # The UNIQUE_ID is specific to a combination genus_species_strain_sex so every combination should have its unique workflow + # in galaxy --> define a naming method for these workflows + workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"] = \ + workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"]\ + .replace("__DISPLAY_NAME_ORG1__", sp_wf_param_org1.full_name)\ + .replace("__UNIQUE_ID_ORG1__", 
sp_wf_param_org1.species_folder_name)\ + .replace("__DISPLAY_NAME_ORG2__", sp_wf_param_org2.full_name)\ + .replace("__UNIQUE_ID_ORG2__", sp_wf_param_org2.species_folder_name) + + # Import the workflow in galaxy as a dict + sp_wf_param_org1.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict) + + # Get its attributes + workflow_dict_list = sp_wf_param_org1.instance.workflows.get_workflows(name=workflow_name) + # Then get its ID (required to invoke the workflow) + workflow_id = workflow_dict_list[0]["id"] # Index 0 is the most recently imported workflow (the one we want) + logging.debug("Workflow ID: %s" % workflow_id) + # Check if the workflow is found + try: + show_workflow = sp_wf_param_org1.instance.workflows.show_workflow(workflow_id=workflow_id) + except bioblend.ConnectionError: + logging.warning("Error finding workflow %s" % workflow_name) + + # Finally, invoke the workflow along with its datamap, parameters and the history in which to invoke it + sp_wf_param_org1.instance.workflows.invoke_workflow( + workflow_id=workflow_id, + history_id=sp_wf_param_org1.history_id, + params=workflow_parameters, + inputs=datamap, + allow_tool_state_corrections=True) + + logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name)) diff --git a/runWorkflowPhaeo.py b/runWorkflowPhaeo.py new file mode 100644 index 0000000000000000000000000000000000000000..1aede4a03d60106c67aede5e582ff3285582f498 --- /dev/null +++ b/runWorkflowPhaeo.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +import json +import time + +import utilities_bioblend +import speciesData +import constants +import constants_phaeo + +class OrgWorkflowParam: + + def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name, + org_id, history_id, instance): + self.genus_uppercase = genus_uppercase + self.chado_species_name = chado_species_name + self.full_name = full_name + self.species_folder_name = species_folder_name + self.org_id = org_id + self.history_id = history_id + self.instance = instance + +class RunWorkflow(speciesData.SpeciesData): + """ + Run a workflow into the galaxy instance's history of a given species + + + This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow, + provided the user creates their own workflow in a .ga format, and changes the set_parameters function + to have the correct parameters for their workflow + + """ + + def __init__(self, parameters_dictionary): + + super().__init__(parameters_dictionary) + self.history_name = str(self.genus_species) + + def set_galaxy_instance(self, config): + + # Set the instance url attribute --> TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1 + instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format( + config[constants.CONF_ALL_HTTP_PORT], + self.genus_species) + + self.instance = utilities_bioblend.get_galaxy_instance( + instance_url=instance_url, + email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], + password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD], + ) + + def set_history(self): + self.history_id = utilities_bioblend.get_history( + instance=self.instance, + history_name=self.history_name) + + def get_analyses(self): + + get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( + instance=self.instance, + tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID, + history_id=self.history_id, +
tool_inputs={}, + time_sleep=10 + ) + analyses_dict_list = json.loads(get_analyses_tool_dataset) + return analyses_dict_list + + def add_analysis(self, name, programversion, sourcename): + + add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( + instance=self.instance, + tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID, + history_id=self.history_id, + tool_inputs={"name": name, + "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM, + "programversion": programversion, + "sourcename": sourcename, + "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE}) + analysis_dict = json.loads(add_analysis_tool_dataset) + analysis_id = str(analysis_dict["analysis_id"]) + + return analysis_id + + def sync_analysis(self, analysis_id): + + time.sleep(60) + utilities_bioblend.run_tool( + instance=self.instance, + tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID, + history_id=self.history_id, + tool_inputs={"analysis_id": analysis_id}) + + def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename): + """ + Add one analysis to Chado database + Required for Chado Load Tripal Synchronize workflow (which should be ran as the first workflow) + Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism) + """ + + analysis_id = None + + # Look up list of outputs (dictionaries) + for analyses_dict in analyses_dict_list: + if analyses_dict["name"] == analysis_name: + analysis_id = str(analyses_dict["analysis_id"]) + + if analysis_id is None: + analysis_id = self.add_analysis( + name=analysis_name, + programversion=analysis_programversion, + sourcename=analysis_sourcename + ) + + # Synchronize analysis in Tripal + logging.info("Synchronizing analysis %s in Tripal" % analysis_name) + self.sync_analysis(analysis_id=analysis_id) + + return analysis_id + + def get_invocation_report(self, workflow_name): + """ + Debugging method for workflows + + Simply logs and returns a report of the previous workflow invocation (execution of a workflow in + the instance via the API) + + :param workflow_name: + :return: + """ + + workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name) + workflow_id = workflow_attributes[1]["id"] # Most recently imported workflow (index 1 in the list) + invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id) + invocation_id = invocations[1]["id"] # Most recent invocation + invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id) + + logging.debug(invocation_report) + + return invocation_report diff --git a/utilities_bioblend.py b/utilities_bioblend.py index 99a47addceb2897697c0fe502c598e3eeb0ff4e0..dc19d0e5b5643bc143b5c869da519d4ea509e299 100644 --- a/utilities_bioblend.py +++ b/utilities_bioblend.py @@ -6,6 +6,7 @@ import sys import os import subprocess import time +import json import bioblend from bioblend import galaxy @@ -137,3 +138,33 @@ def install_repository_revision(instance, tool_id, version, changeset_revision): install_tool_dependencies=True, install_repository_dependencies=False, install_resolver_dependencies=True) + +def install_changesets_revisions_from_workflow(instance, workflow_path): + """ + Read a .ga file to extract the information about the different tools called. + Check if every tool is installed via a "show_tool". 
+ If a tool is not installed (versions don't match), send a warning to the logger and install the required changeset (matching the tool version) + Doesn't do anything if versions match + + :return: + """ + + logging.info("Validating that installed tools versions and changesets match workflow versions") + + # Load the workflow file (.ga) in a buffer + with open(workflow_path, 'r') as ga_in_file: + + # Then store the decoded json dictionary + workflow_dict = json.load(ga_in_file) + + # Look up every "step_id" looking for tools + for step in workflow_dict["steps"].values(): + if step["tool_id"]: + # Check if an installed version matches the workflow tool version + # (If it's not installed, the show_tool version returned will be a default version with the suffix "XXXX+0") + install_repository_revision(tool_id=step["tool_id"], + version=step["tool_version"], + changeset_revision=step["tool_shed_repository"]["changeset_revision"], + instance=instance) + + logging.info("Tools versions and changeset_revisions from workflow validated")
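
For reference, both branches of the JBrowse runner above repeat the same bioblend sequence: load the .ga file, patch the JBrowse placeholders directly in the step's tool_state, import the workflow, then invoke it with runtime parameters and a datamap. The following is a minimal sketch of that sequence, not part of the patch; the URL, API key, workflow path, step ids and the dataset/organism/analysis ids are hypothetical placeholders for the values the scripts actually take from the config file, constants_phaeo and the OrgWorkflowParamJbrowse objects.

# Minimal sketch (illustrative only) of the import/patch/invoke sequence used above.
# All ids, paths and credentials below are hypothetical placeholders.
import json
from bioblend.galaxy import GalaxyInstance

gi = GalaxyInstance(url="http://localhost:80/sp/genus_species/galaxy/", key="<admin_api_key>")

with open("<workflow_path>.ga", "r") as ga_in_file:  # the script builds this path from constants_phaeo
    workflow_dict = json.load(ga_in_file)

# The JBrowse menu URL cannot be passed as a runtime parameter, so the placeholder is patched in tool_state
jbrowse_step = "<jbrowse_step_id>"  # constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE in the script
workflow_dict["steps"][jbrowse_step]["tool_state"] = \
    workflow_dict["steps"][jbrowse_step]["tool_state"].replace(
        "__MENU_URL_ORG__",
        "https://<hostname>/sp/<genus_species>/feature/<Genus>/<species-strain-sex>/mRNA/{id}")

gi.workflows.import_workflow_dict(workflow_dict=workflow_dict)
workflow_id = gi.workflows.get_workflows(name=workflow_dict["name"])[0]["id"]  # most recent import

# Runtime parameters are keyed by step id; the datamap maps workflow inputs to history datasets (hda ids)
workflow_parameters = {"<load_fasta_step_id>": {"organism": "<org_id>",
                                                "analysis_id": "<genome_analysis_id>",
                                                "do_update": "true"}}
datamap = {"<genome_input_step_id>": {"src": "hda", "id": "<genome_hda_id>"}}

gi.workflows.invoke_workflow(
    workflow_id=workflow_id,
    history_id="<history_id>",
    params=workflow_parameters,
    inputs=datamap,
    allow_tool_state_corrections=True)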