#!/usr/bin/env python3 # -*- coding: utf-8 -*- import bioblend.galaxy.objects import argparse import os import logging import sys import json import time import utilities import utilities_bioblend import constants import constants_phaeo import runWorkflowPhaeo class OrgWorkflowParamJbrowse(runWorkflowPhaeo.OrgWorkflowParam): def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name, org_id, history_id, instance, genome_analysis_id=None, ogs_analysis_id=None, genome_hda_id=None, gff_hda_id=None, transcripts_hda_id=None, proteins_hda_id=None): self.genome_analysis_id = genome_analysis_id self.ogs_analysis_id = ogs_analysis_id self.genome_hda_id = genome_hda_id self.gff_hda_id = gff_hda_id self.transcripts_hda_id = transcripts_hda_id self.proteins_hda_id = proteins_hda_id super().__init__(genus_uppercase, chado_species_name, full_name, species_folder_name, org_id, history_id, instance) def check_param(self): params = [self.genus_uppercase, self.chado_species_name, self.full_name, self.species_folder_name, self.org_id, self.history_id, self.instance, self.genome_analysis_id, self.ogs_analysis_id, self.genome_hda_id, self.gff_hda_id, self.transcripts_hda_id, self.proteins_hda_id] utilities_bioblend.check_wf_param(self.full_name, params) class RunWorkflowJbrowse(runWorkflowPhaeo.RunWorkflow): """ Run a workflow into the galaxy instance's history of a given species This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow, provided the user creates their own workflow in a .ga format, and change the set_parameters function to have the correct parameters for their workflow """ def __init__(self, parameters_dictionary): super().__init__(parameters_dictionary) self.chado_species_name = " ".join(utilities.filter_empty_not_empty_items( [self.species, self.strain, self.sex])["not_empty"]) self.abbreviation = self.genus_uppercase[0] + ". " + self.chado_species_name self.common = self.name if not self.common_name is None and self.common_name != "": self.common = self.common_name self.genome_analysis_name = "genome v{0} of {1}".format(self.genome_version, self.full_name) self.genome_analysis_programversion = "genome v{0}".format(self.genome_version) self.genome_analysis_sourcename = self.full_name self.ogs_analysis_name = "OGS{0} of {1}".format(self.ogs_version, self.full_name) self.ogs_analysis_programversion = "OGS{0}".format(self.ogs_version) self.ogs_analysis_sourcename = self.full_name self.genome_hda_id = None self.gff_hda_id = None self.transcripts_hda_id = None self.proteins_hda_id = None def install_individual_tools(self): """ This function is used to verify that installed tools called outside workflows have the correct versions and changesets If it finds versions don't match, will install the correct version + changeset in the instance Doesn't do anything if versions match :return: """ logging.info("Validating installed individual tools versions and changesets") # Verify that the add_organism and add_analysis versions are correct in the instance # changeset for 2.3.4+galaxy0 has to be manually found because there is no way to get the wanted changeset of a non installed tool via bioblend # except for workflows (.ga) that already contain the changeset revisions inside the steps ids utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID, version=constants_phaeo.GET_ORGANISMS_TOOL_VERSION, changeset_revision=constants_phaeo.GET_ORGANISMS_TOOL_CHANGESET_REVISION, instance=self.instance) utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID, version=constants_phaeo.GET_ANALYSES_TOOL_VERSION, changeset_revision=constants_phaeo.GET_ANALYSES_TOOL_CHANGESET_REVISION, instance=self.instance) utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID, version=constants_phaeo.ADD_ORGANISM_TOOL_VERSION, changeset_revision=constants_phaeo.ADD_ORGANISM_TOOL_CHANGESET_REVISION, instance=self.instance) utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID, version=constants_phaeo.ADD_ANALYSIS_TOOL_VERSION, changeset_revision=constants_phaeo.ADD_ANALYSIS_TOOL_CHANGESET_REVISION, instance=self.instance) utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID, version=constants_phaeo.ANALYSIS_SYNC_TOOL_VERSION, changeset_revision=constants_phaeo.ANALYSIS_SYNC_TOOL_CHANGESET_REVISION, instance=self.instance) utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID, version=constants_phaeo.ORGANISM_SYNC_TOOL_VERSION, changeset_revision=constants_phaeo.ORGANISM_SYNC_TOOL_CHANGESET_REVISION, instance=self.instance) logging.info("Success: individual tools versions and changesets validated") def add_organism_and_sync(self): get_organisms_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( instance=self.instance, tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID, history_id=self.history_id, tool_inputs={}, time_sleep=10 ) organisms_dict_list = json.loads(get_organisms_tool_dataset) # Turn the dataset into a list for parsing org_id = None # Look up list of outputs (dictionaries) for org_dict in organisms_dict_list: if org_dict["genus"] == self.genus_uppercase and org_dict["species"] == self.chado_species_name: org_id = str(org_dict["organism_id"]) # id needs to be a str to be recognized by chado tools if org_id is None: add_organism_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( instance=self.instance, tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID, history_id=self.history_id, tool_inputs={"abbr": self.abbreviation, "genus": self.genus_uppercase, "species": self.chado_species_name, "common": self.common}) organism_dict = json.loads(add_organism_tool_dataset) org_id = str(organism_dict["organism_id"]) # id needs to be a str to be recognized by chado tools # Synchronize newly added organism in Tripal logging.info("Synchronizing organism %s in Tripal" % self.full_name) time.sleep(60) utilities_bioblend.run_tool( instance=self.instance, tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID, history_id=self.history_id, tool_inputs={"organism_id": org_id}) return org_id def import_datasets_into_history(self): """ Find datasets in a library, get their ID and import them into the current history if they are not already """ genome_ldda_id = None transcripts_ldda_id = None proteins_ldda_id = None gff_ldda_id = None genome_hda_id = None gff_hda_id = None transcripts_hda_id = None proteins_hda_id = None folder_dict_list = self.instance.libraries.get_folders(library_id=str(self.library_id)) folders_id_dict = {} # Loop over the folders in the library and map folders names to their IDs for folder_dict in folder_dict_list: folders_id_dict[folder_dict["name"]] = folder_dict["id"] # Iterating over the folders to find datasets and map datasets to their IDs for folder_name, folder_id in folders_id_dict.items(): if folder_name == "/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version): sub_folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True) for value in sub_folder_content.values(): for e in value: if type(e) == dict: if e["name"].endswith(self.genome_filename): genome_ldda_id = e["ldda_id"] if folder_name == "/annotation/{0}/OGS{1}".format(self.species_folder_name, self.ogs_version): sub_folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True) for value in sub_folder_content.values(): for e in value: if type(e) == dict: ldda_name = e["name"] ldda_id = e["ldda_id"] if ldda_name.endswith(self.transcripts_filename): transcripts_ldda_id = ldda_id elif ldda_name.endswith(self.proteins_filename): proteins_ldda_id = ldda_id elif ldda_name.endswith(self.gff_filename): gff_ldda_id = ldda_id hda_list = self.instance.datasets.get_datasets(self.history_id) # Finding datasets in history (matching datasets names) for hda in hda_list: hda_name = hda["name"] hda_id = hda["id"] if hda_name == self.genome_filename: genome_hda_id = hda_id if hda_name == self.gff_filename: gff_hda_id = hda_id if hda_name == self.transcripts_filename: transcripts_hda_id = hda_id if hda_name == self.proteins_filename : proteins_hda_id = hda_id # Import each dataset into history if it is not imported logging.debug("Uploading datasets into history %s" % self.history_id) if genome_hda_id is None: genome_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=genome_ldda_id) genome_hda_id = genome_dataset_upload["id"] if gff_hda_id is None: gff_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=gff_ldda_id) gff_hda_id = gff_dataset_upload["id"] if proteins_hda_id is None: proteins_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=proteins_ldda_id) proteins_hda_id = proteins_dataset_upload["id"] if transcripts_hda_id is None: transcripts_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=transcripts_ldda_id) transcripts_hda_id = transcripts_dataset_upload["id"] self.genome_hda_id = genome_hda_id self.gff_hda_id = gff_hda_id self.transcripts_hda_id = transcripts_hda_id self.proteins_hda_id = proteins_hda_id def prepare_history_and_get_wf_param(sp_dict_list, main_dir, config): all_org_wf_param_dict = {} for sp_dict in sp_dict_list: run_workflow_for_current_organism = RunWorkflowJbrowse(parameters_dictionary=sp_dict) # Verifying the galaxy container is running if not utilities_bioblend.check_galaxy_state(network_name=run_workflow_for_current_organism.genus_species, script_dir=run_workflow_for_current_organism.script_dir): logging.critical( "The galaxy container for %s is not ready yet!" % run_workflow_for_current_organism.genus_species) sys.exit() else: # Setting some of the instance attributes run_workflow_for_current_organism.main_dir = main_dir run_workflow_for_current_organism.set_galaxy_instance(config) run_workflow_for_current_organism.set_history() run_workflow_for_current_organism.install_individual_tools() run_workflow_for_current_organism.import_datasets_into_history() analyses_dict_list = run_workflow_for_current_organism.get_analyses() org_id = run_workflow_for_current_organism.add_organism_and_sync() genome_analysis_id = run_workflow_for_current_organism.add_analysis_and_sync( analyses_dict_list=analyses_dict_list, analysis_name=run_workflow_for_current_organism.genome_analysis_name, analysis_programversion=run_workflow_for_current_organism.genome_analysis_programversion, analysis_sourcename=run_workflow_for_current_organism.genome_analysis_sourcename ) ogs_analysis_id = run_workflow_for_current_organism.add_analysis_and_sync( analyses_dict_list=analyses_dict_list, analysis_name=run_workflow_for_current_organism.ogs_analysis_name, analysis_programversion=run_workflow_for_current_organism.ogs_analysis_programversion, analysis_sourcename=run_workflow_for_current_organism.ogs_analysis_sourcename ) # Create the StrainWorkflowParam object holding all attributes needed for the workflow org_wf_param = OrgWorkflowParamJbrowse( genus_uppercase=run_workflow_for_current_organism.genus_uppercase, full_name=run_workflow_for_current_organism.full_name, species_folder_name=run_workflow_for_current_organism.species_folder_name, chado_species_name=run_workflow_for_current_organism.chado_species_name, org_id=org_id, genome_analysis_id=genome_analysis_id, ogs_analysis_id=ogs_analysis_id, genome_hda_id=run_workflow_for_current_organism.genome_hda_id, gff_hda_id=run_workflow_for_current_organism.gff_hda_id, transcripts_hda_id=run_workflow_for_current_organism.transcripts_hda_id, proteins_hda_id=run_workflow_for_current_organism.proteins_hda_id, history_id=run_workflow_for_current_organism.history_id, instance=run_workflow_for_current_organism.instance ) org_wf_param.check_param() # Add the species dictionary to the complete dictionary # This dictionary contains every organism present in the input file # Its structure is the following: # {genus species: {strain1_sex1: {variables_key: variables_values}, strain1_sex2: {variables_key: variables_values}}} if not run_workflow_for_current_organism.genus_species in all_org_wf_param_dict.keys(): all_org_wf_param_dict[run_workflow_for_current_organism.genus_species] = { run_workflow_for_current_organism.strain_sex: org_wf_param} else: if not run_workflow_for_current_organism.strain_sex in all_org_wf_param_dict[ run_workflow_for_current_organism.genus_species].keys(): all_org_wf_param_dict[run_workflow_for_current_organism.genus_species][ run_workflow_for_current_organism.strain_sex] = org_wf_param else: logging.error("Duplicate organism with 'genus_species' = '{0}' and 'strain_sex' = '{1}'".format( run_workflow_for_current_organism.genus_species, run_workflow_for_current_organism.strain_sex)) return all_org_wf_param_dict if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run Galaxy workflows, specific to Phaeoexplorer data") parser.add_argument("input", type=str, help="Input file (yml)") parser.add_argument("-v", "--verbose", help="Increase output verbosity", action="store_true") parser.add_argument("--config", type=str, help="Config path, default to the 'config' file inside the script repository") parser.add_argument("--main-directory", type=str, help="Where the stack containers will be located, defaults to working directory") args = parser.parse_args() bioblend_logger = logging.getLogger("bioblend") if args.verbose: logging.basicConfig(level=logging.DEBUG) bioblend_logger.setLevel(logging.DEBUG) else: logging.basicConfig(level=logging.INFO) bioblend_logger.setLevel(logging.INFO) # Parsing the config file if provided, using the default config otherwise if args.config: config_file = os.path.abspath(args.config) else: config_file = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), constants.DEFAULT_CONFIG) main_dir = None if not args.main_directory: main_dir = os.getcwd() else: main_dir = os.path.abspath(args.main_directory) config = utilities.parse_config(config_file) sp_dict_list = utilities.parse_input(args.input) script_dir = os.path.dirname(os.path.realpath(sys.argv[0])) all_org_wf_param_dict = prepare_history_and_get_wf_param( sp_dict_list=sp_dict_list, main_dir=main_dir, config=config) for genus_species, strains in all_org_wf_param_dict.items(): strains_list = list(strains.keys()) strains_count = len(strains_list) if strains_count == 1: logging.info("Input species %s: 1 strain detected in input dictionary" % genus_species) strain_sex = list(strains.keys())[0] org_wf_param = strains[strain_sex] # Set workflow path (1 organism) workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_1_ORG_FILE) # Check if the versions of tools specified in the workflow are installed in galaxy utilities_bioblend.install_workflow_tools(workflow_path=workflow_path, instance=org_wf_param.instance) # Set the workflow parameters (individual tools runtime parameters in the workflow) workflow_parameters = {} # Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method) workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_FASTA] = { "organism": org_wf_param.org_id, "analysis_id": org_wf_param.genome_analysis_id, "do_update": "true"} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_GFF] = { "organism": org_wf_param.org_id, "analysis_id": org_wf_param.ogs_analysis_id} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_FEATURE_SYNC] = { "organism_id": org_wf_param.org_id} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_POPULATE_VIEWS] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_INDEX] = {} # Set datamap (mapping of input files in the workflow) datamap = {} datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {"src": "hda", "id": org_wf_param.genome_hda_id} datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {"src": "hda", "id": org_wf_param.gff_hda_id} datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {"src": "hda", "id": org_wf_param.proteins_hda_id} with open(workflow_path, 'r') as ga_in_file: # Store the decoded json dictionary workflow_dict = json.load(ga_in_file) workflow_name = workflow_dict["name"] # For the Jbrowse tool, we unfortunately have to manually edit the parameters instead of setting them # as runtime values, using runtime parameters makes the tool throw an internal critical error ("replace not found" error) # Scratchgmod test: need "http" (or "https"), the hostname (+ port) if constants.CONF_JBROWSE_MENU_URL not in config.keys(): # default root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME]) else: root_url = config[constants.CONF_JBROWSE_MENU_URL] species_strain_sex = org_wf_param.chado_species_name.replace(" ", "-") jbrowse_menu_url = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format( root_url=root_url, genus_sp=genus_species, Genus=org_wf_param.genus_uppercase, species_strain_sex=species_strain_sex, id="{id}") # Replace values in the workflow dictionary workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"] = \ workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"]\ .replace("__MENU_URL_ORG__", jbrowse_menu_url) workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"] = \ workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"]\ .replace("__DISPLAY_NAME_ORG__", org_wf_param.full_name)\ .replace("__UNIQUE_ID_ORG__", org_wf_param.species_folder_name) # Import the workflow in galaxy as a dict org_wf_param.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict) # Get its attributes workflow_dict_list = org_wf_param.instance.workflows.get_workflows(name=workflow_name) # Then get its ID (required to invoke the workflow) workflow_id = workflow_dict_list[0]["id"] # Index 0 is the most recently imported workflow (the one we want) logging.debug("Workflow ID: %s" % workflow_id) # Check if the workflow is found try: show_workflow = org_wf_param.instance.workflows.show_workflow(workflow_id=workflow_id) except bioblend.ConnectionError: logging.warning("Error finding workflow %s" % workflow_name) # Finally, invoke the workflow along with its datamap, parameters and the history in which to invoke it org_wf_param.instance.workflows.invoke_workflow( workflow_id=workflow_id, history_id=org_wf_param.history_id, params=workflow_parameters, inputs=datamap, allow_tool_state_corrections=True) logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name)) if strains_count == 2: logging.info("Input organism %s: 2 species detected in input dictionary" % genus_species) strain_sex_org1 = strains_list[0] strain_sex_org2 = strains_list[1] sp_wf_param_org1 = strains[strain_sex_org1] sp_wf_param_org2 = strains[strain_sex_org2] # Set workflow path (2 organisms) workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_2_ORG_FILE) # Check if the versions of tools specified in the workflow are installed in galaxy utilities_bioblend.install_workflow_tools(workflow_path=workflow_path, instance=sp_wf_param_org1.instance) # Set the workflow parameters (individual tools runtime parameters in the workflow) workflow_parameters = {} # Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method) workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {} # Organism 1 workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG1] = { "organism": sp_wf_param_org1.org_id, "analysis_id": sp_wf_param_org1.genome_analysis_id, "do_update": "true"} # workflow_parameters[JBROWSE_ORG1] = {"jbrowse_menu_url": jbrowse_menu_url_org1} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG1] = { "organism": sp_wf_param_org1.org_id, "analysis_id": sp_wf_param_org1.ogs_analysis_id} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG1] = { "organism_id": sp_wf_param_org1.org_id} # workflow_parameters[JBROWSE_CONTAINER] = {"organisms": [{"name": org1_full_name, "unique_id": org1_species_folder_name, }, {"name": org2_full_name, "unique_id": org2_species_folder_name}]} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER] = {} # Organism 2 workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG2] = { "organism": sp_wf_param_org2.org_id, "analysis_id": sp_wf_param_org2.genome_analysis_id, "do_update": "true"} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG2] = { "organism": sp_wf_param_org2.org_id, "analysis_id": sp_wf_param_org2.ogs_analysis_id} # workflow_parameters[JRBOWSE_ORG2] = {"jbrowse_menu_url": jbrowse_menu_url_org2} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG2] = { "organism_id": sp_wf_param_org2.org_id} # POPULATE + INDEX DATA workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_POPULATE_VIEWS] = {} workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_INDEX] = {} # Set datamap (mapping of input files in the workflow) datamap = {} # Organism 1 datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {"src": "hda", "id": sp_wf_param_org1.genome_hda_id} datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {"src": "hda", "id": sp_wf_param_org1.gff_hda_id} datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {"src": "hda", "id": sp_wf_param_org1.proteins_hda_id} # Organism 2 datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {"src": "hda", "id": sp_wf_param_org2.genome_hda_id} datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {"src": "hda", "id": sp_wf_param_org2.gff_hda_id} datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {"src": "hda", "id": sp_wf_param_org2.proteins_hda_id} with open(workflow_path, 'r') as ga_in_file: # Store the decoded json dictionary workflow_dict = json.load(ga_in_file) workflow_name = workflow_dict["name"] # For the Jbrowse tool, we unfortunately have to manually edit the parameters instead of setting them # as runtime values, using runtime parameters makes the tool throw an internal critical error ("replace not found" error) # Scratchgmod test: need "http" (or "https"), the hostname (+ port) if constants.CONF_JBROWSE_MENU_URL not in config.keys(): # default root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME]) else: root_url = config[constants.CONF_JBROWSE_MENU_URL] species_strain_sex_org1 = sp_wf_param_org1.chado_species_name.replace(" ", "-") species_strain_sex_org2 = sp_wf_param_org2.chado_species_name.replace(" ", "-") jbrowse_menu_url_org1 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format( root_url=root_url, genus_sp=genus_species, Genus=sp_wf_param_org1.genus_uppercase, species_strain_sex=species_strain_sex_org1, id="{id}") jbrowse_menu_url_org2 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format( root_url=root_url, genus_sp=genus_species, Genus=sp_wf_param_org2.genus_uppercase, species_strain_sex=species_strain_sex_org2, id="{id}") # Replace values in the workflow dictionary jbrowse_tool_state_org1 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1]["tool_state"] jbrowse_tool_state_org1 = jbrowse_tool_state_org1.replace("__MENU_URL_ORG1__", jbrowse_menu_url_org1) jbrowse_tool_state_org2 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2]["tool_state"] jbrowse_tool_state_org2 = jbrowse_tool_state_org2.replace("__MENU_URL_ORG2__", jbrowse_menu_url_org2) # The UNIQUE_ID is specific to a combination genus_species_strain_sex so every combination should have its unique workflow # in galaxy --> define a naming method for these workflows workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"] = \ workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"]\ .replace("__DISPLAY_NAME_ORG1__", sp_wf_param_org1.full_name)\ .replace("__UNIQUE_ID_ORG1__", sp_wf_param_org1.species_folder_name)\ .replace("__DISPLAY_NAME_ORG2__", sp_wf_param_org2.full_name)\ .replace("__UNIQUE_ID_ORG2__", sp_wf_param_org2.species_folder_name) # Import the workflow in galaxy as a dict sp_wf_param_org1.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict) # Get its attributes workflow_dict_list = sp_wf_param_org1.instance.workflows.get_workflows(name=workflow_name) # Then get its ID (required to invoke the workflow) workflow_id = workflow_dict_list[0]["id"] # Index 0 is the most recently imported workflow (the one we want) logging.debug("Workflow ID: %s" % workflow_id) # Check if the workflow is found try: show_workflow = sp_wf_param_org1.instance.workflows.show_workflow(workflow_id=workflow_id) except bioblend.ConnectionError: logging.warning("Error finding workflow %s" % workflow_name) # Finally, invoke the workflow alogn with its datamap, parameters and the history in which to invoke it sp_wf_param_org1.instance.workflows.invoke_workflow( workflow_id=workflow_id, history_id=sp_wf_param_org1.history_id, params=workflow_parameters, inputs=datamap, allow_tool_state_corrections=True) logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name))