From 7cdad582d08bfdca743ceda38a86e77b76419d06 Mon Sep 17 00:00:00 2001
From: Loraine Gueguen <loraine.gueguen@sb-roscoff.fr>
Date: Sun, 30 May 2021 15:50:04 +0200
Subject: [PATCH] Create utilities_bioblend.py. Refactor run_wf (WIP).

---
 phaeo_constants.py => constants_phaeo.py |   0
 gga_load_data.py                         |  37 +-
 run_workflow_phaeoexplorer.py            | 467 ++++++++++-------------
 utilities.py                             |  88 -----
 utilities_bioblend.py                    | 139 +++++++
 5 files changed, 345 insertions(+), 386 deletions(-)
 rename phaeo_constants.py => constants_phaeo.py (100%)
 create mode 100644 utilities_bioblend.py

diff --git a/phaeo_constants.py b/constants_phaeo.py
similarity index 100%
rename from phaeo_constants.py
rename to constants_phaeo.py
diff --git a/gga_load_data.py b/gga_load_data.py
index d0ef036..2c00165 100755
--- a/gga_load_data.py
+++ b/gga_load_data.py
@@ -10,10 +10,10 @@ import sys
 import time
 import json
 import yaml
-from bioblend import galaxy
 from bioblend.galaxy.objects import GalaxyInstance
 
 import utilities
+import utilities_bioblend
 import speciesData
 import constants
 
@@ -68,7 +68,7 @@ class LoadData(speciesData.SpeciesData):
         """
 
         logging.debug("Getting 'Homo sapiens' ID in chado database")
-        get_sapiens_id_json_output = utilities.run_tool_and_download_single_output_dataset(
+        get_sapiens_id_json_output = utilities_bioblend.run_tool_and_download_single_output_dataset(
             self.instance,
             tool_id=constants.GET_ORGANISMS_TOOL,  # If this version is not found, Galaxy will use the one that is found
             history_id=self.history_id,
@@ -78,7 +78,7 @@ class LoadData(speciesData.SpeciesData):
         try:
             get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
             sapiens_id = str(get_sapiens_id_final_output["organism_id"])  # needs to be str to be recognized by the chado tool
-            utilities.run_tool(
+            utilities_bioblend.run_tool(
                 self.instance,
                 tool_id=constants.DELETE_ORGANISMS_TOOL,
                 history_id=self.history_id,
@@ -278,29 +278,6 @@ class LoadData(speciesData.SpeciesData):
             logging.info("Did not find metadata in %s " % meta_file)
         return self.get_bam_label(dirname, bam_file)
 
-    def set_galaxy_instance(self):
-        """
-        Test the connection to the galaxy instance for the current organism
-        Exit if we cannot connect to the instance
-
-        """
-
-        logging.info("Connecting to the galaxy instance (%s)" % self.instance_url)
-        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
-                                              email=self.config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
-                                              password=self.config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]
-                                              )
-
-        try:
-            self.instance.histories.get_histories()
-        except bioblend.ConnectionError:
-            logging.critical("Cannot connect to galaxy instance (%s) " % self.instance_url)
-            sys.exit()
-        else:
-            logging.info("Successfully connected to galaxy instance (%s) " % self.instance_url)
-
-        return self.instance
-
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Load data into Galaxy library")
@@ -367,11 +344,15 @@ if __name__ == "__main__":
                                          load_data_for_current_species.genus_species)
 
         # Check the galaxy container state and proceed if the galaxy services are up and running
-        if utilities.check_galaxy_state(network_name=load_data_for_current_species.genus_species,
+        if utilities_bioblend.check_galaxy_state(network_name=load_data_for_current_species.genus_species,
                                         script_dir=load_data_for_current_species.script_dir):
 
             # Create the Galaxy instance
-            load_data_for_current_species.instance = load_data_for_current_species.set_galaxy_instance()
+            load_data_for_current_species.instance = utilities_bioblend.get_galaxy_instance(
+                instance_url=load_data_for_current_species.instance_url,
+                email=load_data_for_current_species.config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
+                password=load_data_for_current_species.config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]
+            )
 
             # Load the datasets into a galaxy library
             logging.info("Setting up library for {0} {1}".format(load_data_for_current_species.genus, load_data_for_current_species.species))
diff --git a/run_workflow_phaeoexplorer.py b/run_workflow_phaeoexplorer.py
index 9d9b12e..8656ab0 100755
--- a/run_workflow_phaeoexplorer.py
+++ b/run_workflow_phaeoexplorer.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
-import bioblend
 import bioblend.galaxy.objects
 import argparse
 import os
@@ -10,13 +9,13 @@ import sys
 import json
 import time
 
-from bioblend.galaxy.objects import GalaxyInstance
 from bioblend import galaxy
 
 import utilities
+import utilities_bioblend
 import speciesData
 import constants
-import phaeo_constants
+import constants_phaeo
 
 """
 run_workflow_phaeoexplorer.py
 
 Usage: $ python3 run_workflow_phaeoexplorer.py -i input_example.yml --config [config file] [OPTION
@@ -26,21 +25,15 @@ class StrainWorkflowParam:
-    def __init__(self, genus_species, strain_sex, genus, genus_uppercase, species, chado_species_name, sex,
-                 strain, full_name, species_folder_name, org_id,
-                 history_id, instance, instance_url, email, password,
-                 genome_analysis_id=None, ogs_analysis_id=None, blastp_analysis_id=None, interpro_analysis_id=None,
+    def __init__(self, genus_species, strain_sex, genus_uppercase, chado_species_name, full_name, species_folder_name,
+                 org_id, history_id, instance, genome_analysis_id=None, ogs_analysis_id=None, blastp_analysis_id=None, interpro_analysis_id=None,
                  genome_hda_id=None, gff_hda_id=None, transcripts_hda_id=None, proteins_hda_id=None, blastp_hda_id=None,
                  blastx_hda_id=None, interproscan_hda_id=None):
         self.genus_species = genus_species
         self.strain_sex = strain_sex
-        self.genus = genus
         self.genus_uppercase = genus_uppercase
-        self.species = species
         self.chado_species_name = chado_species_name
         self.full_name = full_name
         self.species_folder_name = species_folder_name
-        self.sex = sex
-        self.strain = strain
         self.org_id = org_id
         self.genome_analysis_id = genome_analysis_id
         self.ogs_analysis_id = ogs_analysis_id
@@ -48,9 +41,6 @@ class StrainWorkflowParam:
         self.interpro_analysis_id = interpro_analysis_id
         self.history_id = history_id
         self.instance = instance
-        self.instance_url = instance_url
-        self.email = email
-        self.password = password
         self.genome_hda_id = genome_hda_id
         self.gff_hda_id = gff_hda_id
         self.transcripts_hda_id = transcripts_hda_id
@@ -60,31 +50,31 @@ class StrainWorkflowParam:
         self.interproscan_hda_id = interproscan_hda_id
 
     def check_param_for_workflow_load_fasta_gff_jbrowse(self):
-        params = [self.genus_species, self.strain_sex, self.genus, self.genus_uppercase, self.species,
-                  self.sex, self.strain, self.chado_species_name, self.full_name,
+        params = [self.genus_species, self.strain_sex, self.genus_uppercase,
+                  self.chado_species_name, self.full_name,
                   self.species_folder_name, self.org_id,
-                  self.history_id, self.instance, self.instance_url, self.email, self.password,
+                  self.history_id, self.instance,
                   self.genome_analysis_id, self.ogs_analysis_id,
                   self.genome_hda_id, self.gff_hda_id, self.transcripts_hda_id, self.proteins_hda_id]
-        utilities.check_wf_param(self.full_name, params)
+        utilities_bioblend.check_wf_param(self.full_name, params)
 
     def check_param_for_workflow_blastp(self):
-        params = [self.genus_species, self.strain_sex, self.genus, self.genus_uppercase, self.species,
-                  self.sex, self.strain, self.chado_species_name, self.full_name,
+        params = [self.genus_species, self.strain_sex, self.genus_uppercase,
+                  self.chado_species_name, self.full_name,
                   self.species_folder_name, self.org_id,
-                  self.history_id, self.instance, self.instance_url, self.email, self.password,
+                  self.history_id, self.instance,
                   self.blastp_analysis_id, self.blastp_hda_id]
-        utilities.check_wf_param(self.full_name, params)
+        utilities_bioblend.check_wf_param(self.full_name, params)
 
     def check_param_for_workflow_interpro(self):
-        params = [self.genus_species, self.strain_sex, self.genus, self.genus_uppercase, self.species,
-                  self.sex, self.strain, self.chado_species_name, self.full_name,
+        params = [self.genus_species, self.strain_sex, self.genus_uppercase,
+                  self.chado_species_name, self.full_name,
                   self.species_folder_name, self.org_id,
-                  self.history_id, self.instance, self.instance_url, self.email, self.password,
+                  self.history_id, self.instance,
                   self.interpro_analysis_id, self.interproscan_hda_id]
-        utilities.check_wf_param(self.full_name, params)
+        utilities_bioblend.check_wf_param(self.full_name, params)
 
 
 class RunWorkflow(speciesData.SpeciesData):
@@ -129,119 +119,64 @@ class RunWorkflow(speciesData.SpeciesData):
         self.blastx_hda_id = None
         self.interproscan_hda_id = None
 
-    def set_history(self):
-        """
-        Create or set the working history to the current species one
-
-        :return:
-        """
-        try:
-            histories = self.instance.histories.get_histories(name=self.history_name)
-            self.history_id = histories[0]["id"]
-            logging.debug("History ID set for {0}: {1}".format(self.history_name, self.history_id))
-        except IndexError:
-            logging.info("Creating history for %s" % self.history_name)
-            history = self.instance.histories.create_history(name=self.history_name)
-            self.history_id = history["id"]
-            logging.debug("History ID set for {0}: {1}".format(self.history_name, self.history_id))
-
-        return self.history_id
-
-    def set_galaxy_instance(self):
-        """
-        Test the connection to the galaxy instance for the current organism
-        Exit if we cannot connect to the instance
-
-        """
-
-        logging.debug("Connecting to the galaxy instance (%s)" % self.instance_url)
-        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
-                                              email=self.config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
-                                              password=self.config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]
-                                              )
-
-        try:
-            self.instance.histories.get_histories()
-        except bioblend.ConnectionError:
-            logging.critical("Cannot connect to galaxy instance (%s) " % self.instance_url)
-            sys.exit()
-        else:
-            logging.debug("Successfully connected to galaxy instance (%s) " % self.instance_url)
-
-        return self.instance
-
     def install_changesets_revisions_for_individual_tools(self):
         """
         This function is used to verify that installed tools called outside workflows have the correct versions and changesets
         If it finds versions don't match, will install the correct version + changeset in the instance
         Doesn't do anything if versions match
-        
+
         :return:
         """
 
         logging.info("Validating installed individual tools versions and changesets")
 
         # Verify that the add_organism and add_analysis versions are correct in the instance
-
-        add_organism_tool = self.instance.tools.show_tool(phaeo_constants.ADD_ORGANISM_TOOL_ID)
-        add_analysis_tool = self.instance.tools.show_tool(phaeo_constants.ADD_ANALYSIS_TOOL_ID)
-        get_organisms_tool = self.instance.tools.show_tool(phaeo_constants.GET_ORGANISMS_TOOL_ID)
-        get_analyses_tool = self.instance.tools.show_tool(phaeo_constants.GET_ANALYSES_TOOL_ID)
-        analysis_sync_tool = self.instance.tools.show_tool(phaeo_constants.ANALYSIS_SYNC_TOOL_ID)
-        organism_sync_tool = self.instance.tools.show_tool(phaeo_constants.ORGANISM_SYNC_TOOL_ID)
-
         # changeset for 2.3.4+galaxy0 has to be manually found because there is no way to get the wanted changeset of a non installed tool via bioblend
         # except for workflows (.ga) that already contain the changeset revisions inside the steps ids
-        utilities.install_repository_revision(current_version=get_organisms_tool["version"],
-                                              toolshed_dict=get_organisms_tool["tool_shed_repository"],
-                                              version_to_install=phaeo_constants.GET_ORGANISMS_TOOL_VERSION,
-                                              changeset_revision=phaeo_constants.GET_ORGANISMS_TOOL_CHANGESET_REVISION,
-                                              instance=self.instance)
-
-        utilities.install_repository_revision(current_version=get_analyses_tool["version"],
-                                              toolshed_dict=get_analyses_tool["tool_shed_repository"],
-                                              version_to_install=phaeo_constants.GET_ANALYSES_TOOL_VERSION,
-                                              changeset_revision=phaeo_constants.GET_ANALYSES_TOOL_CHANGESET_REVISION,
-                                              instance=self.instance)
-
-        utilities.install_repository_revision(current_version=add_organism_tool["version"],
-                                              toolshed_dict=add_organism_tool["tool_shed_repository"],
-                                              version_to_install=phaeo_constants.ADD_ORGANISM_TOOL_VERSION,
-                                              changeset_revision=phaeo_constants.ADD_ORGANISM_TOOL_CHANGESET_REVISION,
-                                              instance=self.instance)
-
-        utilities.install_repository_revision(current_version=add_analysis_tool["version"],
-                                              toolshed_dict=add_analysis_tool["tool_shed_repository"],
-                                              version_to_install=phaeo_constants.ADD_ANALYSIS_TOOL_VERSION,
-                                              changeset_revision=phaeo_constants.ADD_ANALYSIS_TOOL_CHANGESET_REVISION,
-                                              instance=self.instance)
-
-        utilities.install_repository_revision(current_version=analysis_sync_tool["version"],
-                                              toolshed_dict=analysis_sync_tool["tool_shed_repository"],
-                                              version_to_install=phaeo_constants.ANALYSIS_SYNC_TOOL_VERSION,
-                                              changeset_revision=phaeo_constants.ANALYSIS_SYNC_TOOL_CHANGESET_REVISION,
-                                              instance=self.instance)
-
-        utilities.install_repository_revision(current_version=organism_sync_tool["version"],
-                                              toolshed_dict=organism_sync_tool["tool_shed_repository"],
-                                              version_to_install=phaeo_constants.ORGANISM_SYNC_TOOL_VERSION,
-                                              changeset_revision=phaeo_constants.ORGANISM_SYNC_TOOL_CHANGESET_REVISION,
-                                              instance=self.instance)
+        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID,
+                                                       version=constants_phaeo.GET_ORGANISMS_TOOL_VERSION,
+                                                       changeset_revision=constants_phaeo.GET_ORGANISMS_TOOL_CHANGESET_REVISION,
+                                                       instance=self.instance)
+
+        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
+                                                       version=constants_phaeo.GET_ANALYSES_TOOL_VERSION,
+                                                       changeset_revision=constants_phaeo.GET_ANALYSES_TOOL_CHANGESET_REVISION,
+                                                       instance=self.instance)
+
+        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID,
+                                                       version=constants_phaeo.ADD_ORGANISM_TOOL_VERSION,
+                                                       changeset_revision=constants_phaeo.ADD_ORGANISM_TOOL_CHANGESET_REVISION,
+                                                       instance=self.instance)
+
+        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
+                                                       version=constants_phaeo.ADD_ANALYSIS_TOOL_VERSION,
+                                                       changeset_revision=constants_phaeo.ADD_ANALYSIS_TOOL_CHANGESET_REVISION,
+                                                       instance=self.instance)
+
+        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
+                                                       version=constants_phaeo.ANALYSIS_SYNC_TOOL_VERSION,
+                                                       changeset_revision=constants_phaeo.ANALYSIS_SYNC_TOOL_CHANGESET_REVISION,
+                                                       instance=self.instance)
+
+        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
+                                                       version=constants_phaeo.ORGANISM_SYNC_TOOL_VERSION,
+                                                       changeset_revision=constants_phaeo.ORGANISM_SYNC_TOOL_CHANGESET_REVISION,
+                                                       instance=self.instance)
 
         logging.info("Success: individual tools versions and changesets validated")
 
     def add_analysis(self, name, programversion, sourcename):
 
-        add_analysis_tool_dataset = utilities.run_tool_and_download_single_output_dataset(
+        add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
             instance=self.instance,
-            tool_id=phaeo_constants.ADD_ANALYSIS_TOOL_ID,
+            tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
             history_id=self.history_id,
             tool_inputs={"name": name,
-                         "program": phaeo_constants.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
+                         "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
                          "programversion": programversion,
                          "sourcename": sourcename,
-                         "date_executed": phaeo_constants.ADD_ANALYSIS_TOOL_PARAM_DATE})
+                         "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE})
         analysis_dict = json.loads(add_analysis_tool_dataset)
         analysis_id = str(analysis_dict["analysis_id"])
@@ -250,17 +185,17 @@ class RunWorkflow(speciesData.SpeciesData):
     def sync_analysis(self, analysis_id):
 
         time.sleep(60)
-        utilities.run_tool(
+        utilities_bioblend.run_tool(
             instance=self.instance,
-            tool_id=phaeo_constants.ANALYSIS_SYNC_TOOL_ID,
+            tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
             history_id=self.history_id,
             tool_inputs={"analysis_id": analysis_id})
 
     def add_organism_and_sync(self):
 
-        get_organisms_tool_dataset = utilities.run_tool_and_download_single_output_dataset(
+        get_organisms_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
             instance=self.instance,
-            tool_id=phaeo_constants.GET_ORGANISMS_TOOL_ID,
+            tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID,
             history_id=self.history_id,
             tool_inputs={},
             time_sleep=10
         )
@@ -275,9 +210,9 @@ class RunWorkflow(speciesData.SpeciesData):
             org_id = str(org_dict["organism_id"])  # id needs to be a str to be recognized by chado tools
 
         if org_id is None:
-            add_organism_tool_dataset = utilities.run_tool_and_download_single_output_dataset(
+            add_organism_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
                 instance=self.instance,
-                tool_id=phaeo_constants.ADD_ORGANISM_TOOL_ID,
+                tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID,
                 history_id=self.history_id,
                 tool_inputs={"abbr": self.abbreviation,
                              "genus": self.genus_uppercase,
@@ -289,9 +224,9 @@ class RunWorkflow(speciesData.SpeciesData):
 
         # Synchronize newly added organism in Tripal
         logging.info("Synchronizing organism %s in Tripal" % self.full_name)
         time.sleep(60)
-        utilities.run_tool(
+        utilities_bioblend.run_tool(
             instance=self.instance,
-            tool_id=phaeo_constants.ORGANISM_SYNC_TOOL_ID,
+            tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
             history_id=self.history_id,
             tool_inputs={"organism_id": org_id})
 
@@ -299,9 +234,9 @@ class RunWorkflow(speciesData.SpeciesData):
 
     def get_analyses(self):
 
-        get_analyses_tool_dataset = utilities.run_tool_and_download_single_output_dataset(
+        get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
             instance=self.instance,
-            tool_id=phaeo_constants.GET_ANALYSES_TOOL_ID,
+            tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
             history_id=self.history_id,
             tool_inputs={},
             time_sleep=10
         )
@@ -590,10 +525,21 @@ class RunWorkflow(speciesData.SpeciesData):
         :return:
         """
 
-        # Instanciate the instance
-        gio = GalaxyInstance(url=self.instance_url,
-                             email=self.config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
-                             password=self.config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD])
+        genome_ldda_id = None
+        transcripts_ldda_id = None
+        proteins_ldda_id = None
+        gff_ldda_id = None
+        interpro_ldda_id = None
+        blastp_ldda_id = None
+        blastx_ldda_id = None
+
+        genome_hda_id = None
+        gff_hda_id = None
+        transcripts_hda_id = None
+        proteins_hda_id = None
+        blastp_hda_id = None
+        blastx_hda_id = None
+        interproscan_hda_id = None
 
         folder_dict_list = self.instance.libraries.get_folders(library_id=str(self.library_id))
@@ -634,7 +580,6 @@ class RunWorkflow(speciesData.SpeciesData):
                     blastx_ldda_id = ldda_id
 
         hda_list = self.instance.datasets.get_datasets(self.history_id)
-        genome_hda_id, gff_hda_id, transcripts_hda_id, proteins_hda_id, blastp_hda_id, interproscan_hda_id = None, None, None, None, None, None
         # Finding datasets in history (matching datasets names)
         for hda in hda_list:
             hda_name = hda["name"]
@@ -703,7 +648,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
     run_workflow_for_current_organism = RunWorkflow(parameters_dictionary=sp_dict)
 
     # Verifying the galaxy container is running
-    if not utilities.check_galaxy_state(network_name=run_workflow_for_current_organism.genus_species,
+    if not utilities_bioblend.check_galaxy_state(network_name=run_workflow_for_current_organism.genus_species,
                                         script_dir=run_workflow_for_current_organism.script_dir):
         logging.critical("The galaxy container for %s is not ready yet!" % run_workflow_for_current_organism.genus_species)
         sys.exit()
@@ -723,11 +668,17 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
                                              run_workflow_for_current_organism.config[constants.CONF_ALL_HTTP_PORT],
                                              run_workflow_for_current_organism.genus_species)
 
-    run_workflow_for_current_organism.set_galaxy_instance()
-    history_id = run_workflow_for_current_organism.set_history()
+    run_workflow_for_current_organism.instance = utilities_bioblend.get_galaxy_instance(
+        instance_url=run_workflow_for_current_organism.instance_url,
+        email=run_workflow_for_current_organism.config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
+        password=run_workflow_for_current_organism.config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
+    )
+    history_id = utilities_bioblend.get_history(
+        instance=run_workflow_for_current_organism.instance,
+        history_name=run_workflow_for_current_organism.history_name)
     run_workflow_for_current_organism.install_changesets_revisions_for_individual_tools()
 
-    if workflow_type == phaeo_constants.WF_LOAD_GFF_JB:
+    if workflow_type == constants_phaeo.WF_LOAD_GFF_JB:
 
         analyses_dict_list = run_workflow_for_current_organism.get_analyses()
@@ -751,13 +702,9 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
         sp_wf_param = StrainWorkflowParam(
             genus_species=run_workflow_for_current_organism.genus_species,
             strain_sex=run_workflow_for_current_organism.strain_sex,
-            genus=run_workflow_for_current_organism.genus,
             genus_uppercase = run_workflow_for_current_organism.genus_uppercase,
-            species=species,
             full_name=run_workflow_for_current_organism.full_name,
             species_folder_name=run_workflow_for_current_organism.species_folder_name,
-            sex=run_workflow_for_current_organism.sex,
-            strain=run_workflow_for_current_organism.strain,
             chado_species_name=run_workflow_for_current_organism.chado_species_name,
             org_id=org_id,
             genome_analysis_id=genome_analysis_id,
@@ -770,10 +717,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
             blastx_hda_id=run_workflow_for_current_organism.blastx_hda_id,
             interproscan_hda_id=run_workflow_for_current_organism.interproscan_hda_id,
             history_id=history_id,
-            instance=run_workflow_for_current_organism.instance,
-            instance_url=run_workflow_for_current_organism.instance_url,
-            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
-            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]
+            instance=run_workflow_for_current_organism.instance
         )
         sp_wf_param.check_param_for_workflow_load_fasta_gff_jbrowse()
@@ -789,16 +733,12 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
         sp_wf_param = StrainWorkflowParam(
             genus_species=run_workflow_for_current_organism.genus_species,
             strain_sex=run_workflow_for_current_organism.strain_sex,
-            genus=run_workflow_for_current_organism.genus,
             genus_uppercase = run_workflow_for_current_organism.genus_uppercase,
-            species=species,
             full_name=run_workflow_for_current_organism.full_name,
             species_folder_name=run_workflow_for_current_organism.species_folder_name,
-            sex=run_workflow_for_current_organism.sex,
-            strain=run_workflow_for_current_organism.strain,
             chado_species_name=run_workflow_for_current_organism.chado_species_name,
             org_id=org_id,
-            blastp_analysis_id=genome_analysis_id,
+            blastp_analysis_id=blastp_analysis_id,
             genome_hda_id=run_workflow_for_current_organism.genome_hda_id,
             gff_hda_id=run_workflow_for_current_organism.gff_hda_id,
             transcripts_hda_id=run_workflow_for_current_organism.transcripts_hda_id,
@@ -807,10 +747,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
             blastx_hda_id=run_workflow_for_current_organism.blastx_hda_id,
             interproscan_hda_id=run_workflow_for_current_organism.interproscan_hda_id,
             history_id=history_id,
-            instance=run_workflow_for_current_organism.instance,
-            instance_url=run_workflow_for_current_organism.instance_url,
-            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
-            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]
+            instance=run_workflow_for_current_organism.instance
         )
         sp_wf_param.check_param_for_workflow_blastp()
@@ -826,13 +763,9 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
         sp_wf_param = StrainWorkflowParam(
             genus_species=run_workflow_for_current_organism.genus_species,
             strain_sex=run_workflow_for_current_organism.strain_sex,
-            genus=run_workflow_for_current_organism.genus,
             genus_uppercase = run_workflow_for_current_organism.genus_uppercase,
-            species=species,
             full_name=run_workflow_for_current_organism.full_name,
             species_folder_name=run_workflow_for_current_organism.species_folder_name,
-            sex=run_workflow_for_current_organism.sex,
-            strain=run_workflow_for_current_organism.strain,
             chado_species_name=run_workflow_for_current_organism.chado_species_name,
             org_id=org_id,
             interpro_analysis_id=interpro_analysis_id,
@@ -844,10 +777,7 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type):
             blastx_hda_id=run_workflow_for_current_organism.blastx_hda_id,
             interproscan_hda_id=run_workflow_for_current_organism.interproscan_hda_id,
             history_id=history_id,
-            instance=run_workflow_for_current_organism.instance,
-            instance_url=run_workflow_for_current_organism.instance_url,
-            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
-            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]
+            instance=run_workflow_for_current_organism.instance
         )
         sp_wf_param.check_param_for_workflow_interpro()
@@ -875,15 +805,12 @@ def install_changesets_revisions_from_workflow(instance, workflow_path):
     # Look up every "step_id" looking for tools
     for step in workflow_dict["steps"].values():
         if step["tool_id"]:
-            # Get the descriptive dictionary of the installed tool (using the tool id in the workflow)
-            show_tool = instance.tools.show_tool(step["tool_id"])
             # Check if an installed version matches the workflow tool version
             # (If it's not installed, the show_tool version returned will be a default version with the suffix "XXXX+0")
-            utilities.install_repository_revision(current_version=show_tool["version"],
-                                                  toolshed_dict=show_tool["tool_shed_repository"],
-                                                  version_to_install=step["tool_version"],
-                                                  changeset_revision=step["tool_shed_repository"]["changeset_revision"],
-                                                  instance=instance)
+            utilities_bioblend.install_repository_revision(tool_id=step["tool_id"],
+                                                           version=step["tool_version"],
+                                                           changeset_revision=step["tool_shed_repository"]["changeset_revision"],
+                                                           instance=instance)
 
     logging.info("Tools versions and changeset_revisions from workflow validated")
@@ -940,35 +867,35 @@ if __name__ == "__main__":
     if not args.workflow:
         logging.critical("No workflow type specified, exiting")
         sys.exit()
-    elif args.workflow in phaeo_constants.WORKFLOW_VALID_TYPES:
+    elif args.workflow in constants_phaeo.WORKFLOW_VALID_TYPES:
         workflow_type = args.workflow
         logging.info("Workflow type set to '%s'" % workflow_type)
 
     script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
     all_sp_workflow_dict = {}
 
-    if workflow_type == phaeo_constants.WF_LOAD_GFF_JB:
+    if workflow_type == constants_phaeo.WF_LOAD_GFF_JB:
         for sp_dict in sp_dict_list:
 
             # Add and retrieve all analyses/organisms for the current input species and add their IDs to the input dictionary
-            sp_workflow_param = get_sp_workflow_param(
+            sp_wf_param = get_sp_workflow_param(
                 sp_dict,
                 main_dir=main_dir,
                 config=config,
-                workflow_type=phaeo_constants.WF_LOAD_GFF_JB)
+                workflow_type=constants_phaeo.WF_LOAD_GFF_JB)
 
-            current_sp_genus_species = sp_workflow_param.genus_species
-            current_sp_strain_sex = sp_workflow_param.strain_sex
+            current_sp_genus_species = sp_wf_param.genus_species
+            current_sp_strain_sex = sp_wf_param.strain_sex
 
             # Add the species dictionary to the complete dictionary
             # This dictionary contains every organism present in the input file
            # Its structure is the following:
            # {genus species: {strain1_sex1: {variables_key: variables_values}, strain1_sex2: {variables_key: variables_values}}}
             if not current_sp_genus_species in all_sp_workflow_dict.keys():
-                all_sp_workflow_dict[current_sp_genus_species] = {current_sp_strain_sex: sp_workflow_param}
+                all_sp_workflow_dict[current_sp_genus_species] = {current_sp_strain_sex: sp_wf_param}
             else:
                 if not current_sp_strain_sex in all_sp_workflow_dict[current_sp_genus_species].keys():
-                    all_sp_workflow_dict[current_sp_genus_species][current_sp_strain_sex] = sp_workflow_param
+                    all_sp_workflow_dict[current_sp_genus_species][current_sp_strain_sex] = sp_wf_param
                 else:
                     logging.error("Duplicate organism with 'genus_species' = '{0}' and 'strain_sex' = '{1}'".format(current_sp_genus_species, current_sp_strain_sex))
@@ -979,38 +906,38 @@ if __name__ == "__main__":
             if strains_count == 1:
                 logging.info("Input species %s: 1 strain detected in input dictionary" % species)
                 strain_sex = list(strains.keys())[0]
-                sp_workflow_param = strains[strain_sex]
+                sp_wf_param = strains[strain_sex]
 
                 # Set workflow path (1 organism)
-                workflow_path = os.path.join(os.path.abspath(script_dir), phaeo_constants.WORKFLOWS_PATH, phaeo_constants.WF_LOAD_GFF_JB_1_ORG_FILE)
+                workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_1_ORG_FILE)
 
                 # Check if the versions of tools specified in the workflow are installed in galaxy
-                install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=sp_workflow_param.instance)
+                install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=sp_wf_param.instance)
 
                 # Set the workflow parameters (individual tools runtime parameters in the workflow)
                 workflow_parameters = {}
 
                 # Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method)
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_FASTA] = {
-                    "organism": sp_workflow_param.org_id,
-                    "analysis_id": sp_workflow_param.genome_analysis_id,
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_FASTA] = {
+                    "organism": sp_wf_param.org_id,
+                    "analysis_id": sp_wf_param.genome_analysis_id,
                     "do_update": "true"}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_GFF] = {
-                    "organism": sp_workflow_param.org_id,
-                    "analysis_id": sp_workflow_param.ogs_analysis_id}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_FEATURE_SYNC] = {
-                    "organism_id": sp_workflow_param.org_id}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_POPULATE_VIEWS] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_INDEX] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_GFF] = {
+                    "organism": sp_wf_param.org_id,
+                    "analysis_id": sp_wf_param.ogs_analysis_id}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_FEATURE_SYNC] = {
+                    "organism_id": sp_wf_param.org_id}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_POPULATE_VIEWS] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_INDEX] = {}
 
                 # Set datamap (mapping of input files in the workflow)
                 datamap = {}
 
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {"src": "hda", "id": sp_workflow_param.genome_hda_id}
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {"src": "hda", "id": sp_workflow_param.gff_hda_id}
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {"src": "hda", "id": sp_workflow_param.proteins_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {"src": "hda", "id": sp_wf_param.genome_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {"src": "hda", "id": sp_wf_param.gff_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {"src": "hda", "id": sp_wf_param.proteins_hda_id}
 
                 with open(workflow_path, 'r') as ga_in_file:
@@ -1026,115 +953,115 @@ if __name__ == "__main__":
                     root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME])
                 else:
                     root_url = config[constants.CONF_JBROWSE_MENU_URL]
-                species_strain_sex = sp_workflow_param.chado_species_name.replace(" ", "-")
+                species_strain_sex = sp_wf_param.chado_species_name.replace(" ", "-")
                 jbrowse_menu_url = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format(
                     root_url=root_url,
-                    genus_sp=sp_workflow_param.genus_species,
-                    Genus=sp_workflow_param.genus_uppercase,
+                    genus_sp=sp_wf_param.genus_species,
+                    Genus=sp_wf_param.genus_uppercase,
                     species_strain_sex=species_strain_sex,
                     id="{id}")
 
                 # Replace values in the workflow dictionary
-                jbrowse_tool_state = workflow_dict["steps"][phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"]
+                jbrowse_tool_state = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"]
                 jbrowse_tool_state = jbrowse_tool_state.replace("__MENU_URL_ORG__", jbrowse_menu_url)
-                jb_to_container_tool_state = workflow_dict["steps"][phaeo_constants.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"]
+                jb_to_container_tool_state = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"]
                 jb_to_container_tool_state = jb_to_container_tool_state\
-                    .replace("__DISPLAY_NAME_ORG__", sp_workflow_param.full_name)\
-                    .replace("__UNIQUE_ID_ORG__", sp_workflow_param.species_folder_name)
+                    .replace("__DISPLAY_NAME_ORG__", sp_wf_param.full_name)\
+                    .replace("__UNIQUE_ID_ORG__", sp_wf_param.species_folder_name)
 
                 # Import the workflow in galaxy as a dict
-                sp_workflow_param.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
+                sp_wf_param.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
 
                 # Get its attributes
-                workflow_attributes = sp_workflow_param.instance.workflows.get_workflows(name=workflow_name)
+                workflow_attributes = sp_wf_param.instance.workflows.get_workflows(name=workflow_name)
                 # Then get its ID (required to invoke the workflow)
                 workflow_id = workflow_attributes[0]["id"]  # Index 0 is the most recently imported workflow (the one we want)
                 logging.debug("Workflow ID: %s" % workflow_id)
                 # Check if the workflow is found
                 try:
-                    show_workflow = sp_workflow_param.instance.workflows.show_workflow(workflow_id=workflow_id)
+                    show_workflow = sp_wf_param.instance.workflows.show_workflow(workflow_id=workflow_id)
                 except bioblend.ConnectionError:
                     logging.warning("Error finding workflow %s" % workflow_name)
 
                 # Finally, invoke the workflow along with its datamap, parameters and the history in which to invoke it
-                sp_workflow_param.instance.workflows.invoke_workflow(
+                sp_wf_param.instance.workflows.invoke_workflow(
                     workflow_id=workflow_id,
-                    history_id=sp_workflow_param.history_id,
+                    history_id=sp_wf_param.history_id,
                     params=workflow_parameters,
                     inputs=datamap,
                     allow_tool_state_corrections=True)
 
-                logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance ({1}) for the jobs state".format(workflow_name, sp_workflow_param.instance_url))
+                logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name))
 
             if strains_count == 2:
                 logging.info("Input species %s: 2 strains detected in input dictionary" % species)
                 strain_sex_org1 = strains_list[0]
                 strain_sex_org2 = strains_list[1]
-                sp_workflow_param_org1 = strains[strain_sex_org1]
-                sp_workflow_param_org2 = strains[strain_sex_org2]
+                sp_wf_param_org1 = strains[strain_sex_org1]
+                sp_wf_param_org2 = strains[strain_sex_org2]
 
                 # Set workflow path (2 organisms)
-                workflow_path = os.path.join(os.path.abspath(script_dir), phaeo_constants.WORKFLOWS_PATH, phaeo_constants.WF_LOAD_GFF_JB_2_ORG_FILE)
+                workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_2_ORG_FILE)
 
                 # Check if the versions of tools specified in the workflow are installed in galaxy
-                install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=sp_workflow_param_org1.instance)
+                install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=sp_wf_param_org1.instance)
 
                 # Set the workflow parameters (individual tools runtime parameters in the workflow)
                 workflow_parameters = {}
 
                 # Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method)
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {}
 
                 # Organism 1
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG1] = {
-                    "organism": sp_workflow_param_org1.org_id,
-                    "analysis_id": sp_workflow_param_org1.genome_analysis_id,
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG1] = {
+                    "organism": sp_wf_param_org1.org_id,
+                    "analysis_id": sp_wf_param_org1.genome_analysis_id,
                     "do_update": "true"}
                 # workflow_parameters[JBROWSE_ORG1] = {"jbrowse_menu_url": jbrowse_menu_url_org1}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG1] = {
-                    "organism": sp_workflow_param_org1.org_id,
-                    "analysis_id": sp_workflow_param_org1.ogs_analysis_id}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG1] = {
-                    "organism_id": sp_workflow_param_org1.org_id}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG1] = {
+                    "organism": sp_wf_param_org1.org_id,
+                    "analysis_id": sp_wf_param_org1.ogs_analysis_id}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG1] = {
+                    "organism_id": sp_wf_param_org1.org_id}
                 # workflow_parameters[JBROWSE_CONTAINER] = {"organisms": [{"name": org1_full_name, "unique_id": org1_species_folder_name, }, {"name": org2_full_name, "unique_id": org2_species_folder_name}]}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER] = {}
 
                 # Organism 2
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG2] = {
-                    "organism": sp_workflow_param_org2.org_id,
-                    "analysis_id": sp_workflow_param_org2.genome_analysis_id,
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG2] = {
+                    "organism": sp_wf_param_org2.org_id,
+                    "analysis_id": sp_wf_param_org2.genome_analysis_id,
                     "do_update": "true"}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG2] = {
-                    "organism": sp_workflow_param_org2.org_id,
-                    "analysis_id": sp_workflow_param_org2.ogs_analysis_id}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG2] = {
+                    "organism": sp_wf_param_org2.org_id,
+                    "analysis_id": sp_wf_param_org2.ogs_analysis_id}
                 # workflow_parameters[JRBOWSE_ORG2] = {"jbrowse_menu_url": jbrowse_menu_url_org2}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG2] = {
-                    "organism_id": sp_workflow_param_org2.org_id}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG2] = {
+                    "organism_id": sp_wf_param_org2.org_id}
 
                 # POPULATE + INDEX DATA
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_POPULATE_VIEWS] = {}
-                workflow_parameters[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_INDEX] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_POPULATE_VIEWS] = {}
+                workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_INDEX] = {}
 
                 # Set datamap (mapping of input files in the workflow)
                 datamap = {}
 
                 # Organism 1
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {"src": "hda", "id": sp_workflow_param_org1.genome_hda_id}
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {"src": "hda", "id": sp_workflow_param_org1.gff_hda_id}
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {"src": "hda", "id": sp_workflow_param_org1.proteins_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {"src": "hda", "id": sp_wf_param_org1.genome_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {"src": "hda", "id": sp_wf_param_org1.gff_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {"src": "hda", "id": sp_wf_param_org1.proteins_hda_id}
 
                 # Organism 2
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {"src": "hda", "id": sp_workflow_param_org2.genome_hda_id}
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {"src": "hda", "id": sp_workflow_param_org2.gff_hda_id}
-                datamap[phaeo_constants.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {"src": "hda", "id": sp_workflow_param_org2.proteins_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {"src": "hda", "id": sp_wf_param_org2.genome_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {"src": "hda", "id": sp_wf_param_org2.gff_hda_id}
+                datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {"src": "hda", "id": sp_wf_param_org2.proteins_hda_id}
 
                 with open(workflow_path, 'r') as ga_in_file:
@@ -1150,67 +1077,67 @@ if __name__ == "__main__":
                     root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME])
                 else:
                     root_url = config[constants.CONF_JBROWSE_MENU_URL]
-                species_strain_sex_org1 = sp_workflow_param_org1.chado_species_name.replace(" ", "-")
-                species_strain_sex_org2 = sp_workflow_param_org2.chado_species_name.replace(" ", "-")
+                species_strain_sex_org1 = sp_wf_param_org1.chado_species_name.replace(" ", "-")
+                species_strain_sex_org2 = sp_wf_param_org2.chado_species_name.replace(" ", "-")
                 jbrowse_menu_url_org1 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format(
                     root_url=root_url,
-                    genus_sp=sp_workflow_param_org1.genus_species,
-                    Genus=sp_workflow_param_org1.genus_uppercase,
+                    genus_sp=sp_wf_param_org1.genus_species,
+                    Genus=sp_wf_param_org1.genus_uppercase,
                     species_strain_sex=species_strain_sex_org1,
                     id="{id}")
                 jbrowse_menu_url_org2 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format(
                     root_url=root_url,
-                    genus_sp=sp_workflow_param_org2.genus_species,
-                    Genus=sp_workflow_param_org2.genus_uppercase,
+                    genus_sp=sp_wf_param_org2.genus_species,
+                    Genus=sp_wf_param_org2.genus_uppercase,
                     species_strain_sex=species_strain_sex_org2,
                     id="{id}")
 
                 # Replace values in the workflow dictionary
-                jbrowse_tool_state_org1 = workflow_dict["steps"][phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1]["tool_state"]
+                jbrowse_tool_state_org1 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1]["tool_state"]
                 jbrowse_tool_state_org1 = jbrowse_tool_state_org1.replace("__MENU_URL_ORG1__", jbrowse_menu_url_org1)
-                jbrowse_tool_state_org2 = workflow_dict["steps"][phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2]["tool_state"]
+                jbrowse_tool_state_org2 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2]["tool_state"]
                 jbrowse_tool_state_org2 = jbrowse_tool_state_org2.replace("__MENU_URL_ORG2__", jbrowse_menu_url_org2)
 
                 # The UNIQUE_ID is specific to a combination genus_species_strain_sex so every combination should have its unique workflow
                 # in galaxy --> define a naming method for these workflows
-                jb_to_container_tool_state = workflow_dict["steps"][phaeo_constants.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"]
+                jb_to_container_tool_state = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"]
                 jb_to_container_tool_state = jb_to_container_tool_state\
-                    .replace("__DISPLAY_NAME_ORG1__", sp_workflow_param_org1.full_name)\
-                    .replace("__UNIQUE_ID_ORG1__", sp_workflow_param_org1.species_folder_name)\
-                    .replace("__DISPLAY_NAME_ORG2__", sp_workflow_param_org2.full_name)\
-                    .replace("__UNIQUE_ID_ORG2__", sp_workflow_param_org2.species_folder_name)
+                    .replace("__DISPLAY_NAME_ORG1__", sp_wf_param_org1.full_name)\
+                    .replace("__UNIQUE_ID_ORG1__", sp_wf_param_org1.species_folder_name)\
+                    .replace("__DISPLAY_NAME_ORG2__", sp_wf_param_org2.full_name)\
+                    .replace("__UNIQUE_ID_ORG2__", sp_wf_param_org2.species_folder_name)
 
                 # Import the workflow in galaxy as a dict
-                sp_workflow_param_org1.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
+                sp_wf_param_org1.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
 
                 # Get its attributes
-                workflow_attributes = sp_workflow_param_org1.instance.workflows.get_workflows(name=workflow_name)
+                workflow_attributes = sp_wf_param_org1.instance.workflows.get_workflows(name=workflow_name)
                 # Then get its ID (required to invoke the workflow)
                 workflow_id = workflow_attributes[0]["id"]  # Index 0 is the most recently imported workflow (the one we want)
                 logging.debug("Workflow ID: %s" % workflow_id)
                 # Check if the workflow is found
                 try:
-                    show_workflow = sp_workflow_param_org1.instance.workflows.show_workflow(workflow_id=workflow_id)
+                    show_workflow = sp_wf_param_org1.instance.workflows.show_workflow(workflow_id=workflow_id)
                 except bioblend.ConnectionError:
                     logging.warning("Error finding workflow %s" % workflow_name)
 
                 # Finally, invoke the workflow along with its datamap, parameters and the history in which to invoke it
-                sp_workflow_param_org1.instance.workflows.invoke_workflow(
+                sp_wf_param_org1.instance.workflows.invoke_workflow(
                     workflow_id=workflow_id,
-                    history_id=sp_workflow_param_org1.history_id,
+                    history_id=sp_wf_param_org1.history_id,
                     params=workflow_parameters,
                     inputs=datamap,
                     allow_tool_state_corrections=True)
 
-                logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance ({1}) for the jobs state".format(workflow_name, sp_workflow_param_org1.instance_url))
+                logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name))
 
-    if workflow_type == phaeo_constants.WORKFLOW_BLAST:
+    if workflow_type == constants_phaeo.WORKFLOW_BLAST:
         for sp_dict in sp_dict_list:
 
             # Add and retrieve all analyses/organisms for the current input species and add their IDs to the input dictionary
-            sp_workflow_param = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workflow_type=phaeo_constants.WORKFLOW_BLAST)
+            sp_wf_param = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workflow_type=constants_phaeo.WORKFLOW_BLAST)
 
-            current_sp_genus_species = list(sp_workflow_param.keys())[0]
-            current_sp_genus_species_dict = list(sp_workflow_param.values())[0]
+            current_sp_genus_species = list(sp_wf_param.keys())[0]
+            current_sp_genus_species_dict = list(sp_wf_param.values())[0]
             current_sp_strain_sex = list(current_sp_genus_species_dict.keys())[0]
             current_sp_strain_sex_attributes_dict = list(current_sp_genus_species_dict.values())[0]
@@ -1477,10 +1404,10 @@ if __name__ == "__main__":
         for sp_dict in sp_dict_list:
 
             # Add and retrieve all analyses/organisms for the current input species and add their IDs to the input dictionary
-            sp_workflow_param = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workfow_type="blast")
+            sp_wf_param = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workflow_type="blast")
 
-            current_sp_genus_species = list(sp_workflow_param.keys())[0]
-            current_sp_genus_species_dict = list(sp_workflow_param.values())[0]
+            current_sp_genus_species = list(sp_wf_param.keys())[0]
+            current_sp_genus_species_dict = list(sp_wf_param.values())[0]
             current_sp_strain_sex = list(current_sp_genus_species_dict.keys())[0]
             current_sp_strain_sex_attributes_dict = list(current_sp_genus_species_dict.values())[0]
diff --git a/utilities.py b/utilities.py
index 14d8510..bc5100d 100755
--- a/utilities.py
+++ b/utilities.py
@@ -4,11 +4,7 @@
 import yaml
 import logging
 import sys
-import os
-import subprocess
-import bioblend
 import constants
-import time
 
 
 def load_yaml(yaml_file):
@@ -84,49 +80,6 @@ def no_empty_items(li):
             empty = False
     return empty
 
-def check_wf_param(full_name, params):
-
-    if not no_empty_items(params):
-        logging.critical(
-            "One empty workflow parameter found for organism {0}: {1})".format(full_name, params))
-        sys.exit()
-
-def check_galaxy_state(network_name, script_dir):
-    """
-    Read the logs of the galaxy container for the current species to check if the service is "ready"
-    """
-
-    # Run supervisorctl status in the galaxy container via serexec
-    # Change serexec permissions in repo
-    try:
-        os.chmod("%s/serexec" % script_dir, 0o0755)
-    except PermissionError:
-        logging.warning("serexec permissions incorrect in %s" % script_dir)
-    galaxy_logs = subprocess.run(["%s/serexec" % script_dir, "{0}_galaxy".format(network_name),
-                                  "supervisorctl", "status", "galaxy:"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    if "galaxy:galaxy_web RUNNING" in str(galaxy_logs.stdout) \
-            and "galaxy:handler0 RUNNING" in str(galaxy_logs.stdout) \
-            and "galaxy:handler1 RUNNING" in str(galaxy_logs.stdout):
-        return 1
-    else:
-        return 0
-
-
-def get_species_history_id(instance, full_name):
-    """
-    Set and return the current species history id in its galaxy instance
-
-    :param instance:
-    :param full_name:
-    :return:
-    """
-
-    histories = instance.histories.get_histories(name=str(full_name))
-    history_id = histories[0]["id"]
-    show_history = instance.histories.show_history(history_id=history_id)
-
-    return [history_id, show_history]
-
 def get_gspecies_string_from_sp_dict(sp_dict):
 
     genus = sp_dict[constants.ORG_PARAM_DESC][constants.ORG_PARAM_DESC_GENUS]
@@ -153,7 +106,6 @@ def get_unique_species_str_list(sp_dict_list):
 
     return unique_species_li
 
-
 def get_unique_species_dict_list(sp_dict_list):
     """
     Filter the species dictionary list to return only unique genus_species combinations
@@ -195,46 +147,6 @@ def get_sp_picture(sp_dict_list):
             sp_picture_dict[gspecies] = sp[constants.ORG_PARAM_DESC][constants.ORG_PARAM_DESC_PICTURE_PATH]
     return sp_picture_dict
 
-def run_tool(instance, tool_id, history_id, tool_inputs):
-
-    output_dict = None
-    try:
-        logging.debug("Running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
-        output_dict = instance.tools.run_tool(
-            tool_id=tool_id,
-            history_id=history_id,
-            tool_inputs=tool_inputs)
-    except bioblend.ConnectionError:
-        logging.error("Unexpected HTTP response (bioblend.ConnectionError) when running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
-
-    return output_dict
-
-def run_tool_and_download_single_output_dataset(instance, tool_id, history_id, tool_inputs, time_sleep = 0):
-
-    output_dict = run_tool(instance, tool_id, history_id, tool_inputs)
-    if not time_sleep is None:
-        time.sleep(time_sleep)
-    single_output_dataset_id = output_dict["outputs"][0]["id"]
-    dataset = instance.datasets.download_dataset(dataset_id=single_output_dataset_id)
-
-    return dataset
-
-def install_repository_revision(current_version, toolshed_dict, version_to_install, changeset_revision, instance):
-
-    if current_version != version_to_install:
-        name = toolshed_dict["name"]
-        owner = toolshed_dict["owner"]
-        toolshed = "https://" + toolshed_dict["tool_shed"]
-        logging.warning("Installing changeset revision {0} for {1}".format(changeset_revision, name))
-
-        instance.toolshed.install_repository_revision(tool_shed_url=toolshed,
-                                                      name=name,
-                                                      owner=owner,
-                                                      changeset_revision=changeset_revision,
-                                                      install_tool_dependencies=True,
-                                                      install_repository_dependencies=False,
-                                                      install_resolver_dependencies=True)
-
 def create_org_param_dict_from_constants():
     """
     Create a dictionary of variables containing the keys needed to render the organisms.yml.j2 (NOT the values)
diff --git a/utilities_bioblend.py b/utilities_bioblend.py
new file mode 100644
index 0000000..99a47ad
--- /dev/null
+++ b/utilities_bioblend.py
@@ -0,0 +1,139 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+import subprocess
+import time
+import bioblend
+from bioblend import galaxy
+
+import utilities
+
+def get_galaxy_instance(instance_url, email, password):
+    """
+    Test the connection to the galaxy instance for the current organism
+    Exit if we cannot connect to the instance
+
+    """
+
+    logging.info("Connecting to the galaxy instance (%s)" % instance_url)
+    instance = galaxy.GalaxyInstance(url=instance_url,
+                                     email=email,
+                                     password=password)
+
+    try:
+        instance.histories.get_histories()
+    except bioblend.ConnectionError:
+        logging.critical("Cannot connect to galaxy instance (%s) " % instance_url)
+        sys.exit()
+    else:
+        logging.info("Successfully connected to galaxy instance (%s) " % instance_url)
+
+    return instance
+
+def get_history(instance, history_name):
+    """
+    Create or set the working history to the current species one
+
+    :return:
+    """
+    try:
+        histories = instance.histories.get_histories(name=history_name)
+        history_id = histories[0]["id"]
+        logging.debug("History ID set for {0}: {1}".format(history_name, history_id))
+    except IndexError:
+        logging.info("Creating history for %s" % history_name)
+        history = instance.histories.create_history(name=history_name)
+        history_id = history["id"]
+        logging.debug("History ID set for {0}: {1}".format(history_name, history_id))
+
+    return history_id
+
+def check_wf_param(full_name, params):
+
+    if not utilities.no_empty_items(params):
+        logging.critical(
+            "One empty workflow parameter found for organism {0}: {1}".format(full_name, params))
+        sys.exit()
+
+def check_galaxy_state(network_name, script_dir):
+    """
+    Read the logs of the galaxy container for the current species to check if the service is "ready"
+    """
+
+    # Run supervisorctl status in the galaxy container via serexec
+    # Change serexec permissions in repo
+    try:
+        os.chmod("%s/serexec" % script_dir, 0o0755)
+    except PermissionError:
+        logging.warning("serexec permissions incorrect in %s" % script_dir)
+    galaxy_logs = subprocess.run(["%s/serexec" % script_dir, "{0}_galaxy".format(network_name),
+                                  "supervisorctl", "status", "galaxy:"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    if "galaxy:galaxy_web RUNNING" in str(galaxy_logs.stdout) \
+            and "galaxy:handler0 RUNNING" in str(galaxy_logs.stdout) \
+            and "galaxy:handler1 RUNNING" in str(galaxy_logs.stdout):
+        return 1
+    else:
+        return 0
+
+def get_species_history_id(instance, full_name):
+    """
+    Set and return the current species history id in its galaxy instance
+
+    :param instance:
+    :param full_name:
+    :return:
+    """
+
+    histories = instance.histories.get_histories(name=str(full_name))
+    history_id = histories[0]["id"]
+    show_history = instance.histories.show_history(history_id=history_id)
+
+    return [history_id, show_history]
+
+
+def run_tool(instance, tool_id, history_id, tool_inputs):
+
+    output_dict = None
+    try:
+        logging.debug("Running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
+        output_dict = instance.tools.run_tool(
+            tool_id=tool_id,
+            history_id=history_id,
+            tool_inputs=tool_inputs)
+    except bioblend.ConnectionError:
+        logging.error("Unexpected HTTP response (bioblend.ConnectionError) when running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
+
+    return output_dict
+
+def run_tool_and_download_single_output_dataset(instance, tool_id, history_id, tool_inputs, time_sleep=0):
+
+    output_dict = run_tool(instance, tool_id, history_id, tool_inputs)
+    if time_sleep is not None:
+        time.sleep(time_sleep)
+    single_output_dataset_id = output_dict["outputs"][0]["id"]
+    dataset = instance.datasets.download_dataset(dataset_id=single_output_dataset_id)
+
+    return dataset
+
+def install_repository_revision(instance, tool_id, version, changeset_revision):
+
+    tool_dict = instance.tools.show_tool(tool_id)
+    current_version = tool_dict["version"]
+    toolshed_dict = tool_dict["tool_shed_repository"]
+
+    if current_version != version:
+        name = toolshed_dict["name"]
+        owner = toolshed_dict["owner"]
+        toolshed = "https://" + toolshed_dict["tool_shed"]
+        logging.warning("Installing changeset revision {0} for {1}".format(changeset_revision, name))
+
+        instance.toolshed.install_repository_revision(tool_shed_url=toolshed,
+                                                      name=name,
+                                                      owner=owner,
+                                                      changeset_revision=changeset_revision,
+                                                      install_tool_dependencies=True,
+                                                      install_repository_dependencies=False,
+                                                      install_resolver_dependencies=True)
-- 
GitLab
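Reviewer note (outside the patch): the sketch below illustrates how the helpers gathered into utilities_bioblend.py are meant to chain together from a caller script, replacing the per-class set_galaxy_instance()/set_history() copies removed above. It is an illustrative sketch only: the config path, instance URL, history name and organism_id below are placeholder values, not taken from the repository.

    #!/usr/bin/env python3
    # Hypothetical caller showing the intended flow of the refactored helpers.
    import utilities
    import utilities_bioblend
    import constants
    import constants_phaeo

    config = utilities.load_yaml("config.yml")  # placeholder path

    # Authenticate once; get_galaxy_instance() exits if the instance is unreachable
    instance = utilities_bioblend.get_galaxy_instance(
        instance_url="http://localhost/sp/genus_species/galaxy/",  # placeholder URL
        email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
        password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD])

    # Reuse (or create) the per-species working history
    history_id = utilities_bioblend.get_history(
        instance=instance,
        history_name="genus_species_strain_sex")  # placeholder name

    # Pin an individual tool to the expected version/changeset, then run it
    utilities_bioblend.install_repository_revision(
        instance=instance,
        tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
        version=constants_phaeo.ORGANISM_SYNC_TOOL_VERSION,
        changeset_revision=constants_phaeo.ORGANISM_SYNC_TOOL_CHANGESET_REVISION)
    utilities_bioblend.run_tool(
        instance=instance,
        tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
        history_id=history_id,
        tool_inputs={"organism_id": "1"})  # placeholder Chado organism_id

Passing the authenticated GalaxyInstance around explicitly, instead of re-deriving it from instance_url/email/password in each class, is what lets StrainWorkflowParam drop those three fields in this patch.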