-
Loraine Gueguen authored6c86b6f9
gga_run_workflow_phaeo_jbrowse.py 33.37 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bioblend.galaxy.objects
import argparse
import os
import logging
import sys
import json
import time
import utilities
import utilities_bioblend
import constants
import constants_phaeo
import runWorkflowPhaeo
class OrgWorkflowParamJbrowse(runWorkflowPhaeo.OrgWorkflowParam):
def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name,
org_id, history_id, instance, genome_analysis_id=None, ogs_analysis_id=None,
genome_hda_id=None, gff_hda_id=None, transcripts_hda_id=None, proteins_hda_id=None):
self.genome_analysis_id = genome_analysis_id
self.ogs_analysis_id = ogs_analysis_id
self.genome_hda_id = genome_hda_id
self.gff_hda_id = gff_hda_id
self.transcripts_hda_id = transcripts_hda_id
self.proteins_hda_id = proteins_hda_id
super().__init__(genus_uppercase, chado_species_name, full_name, species_folder_name,
org_id, history_id, instance)
def check_param(self):
params = [self.genus_uppercase,
self.chado_species_name,
self.full_name,
self.species_folder_name,
self.org_id,
self.history_id,
self.instance,
self.genome_analysis_id,
self.ogs_analysis_id,
self.genome_hda_id,
self.gff_hda_id,
self.transcripts_hda_id,
self.proteins_hda_id]
utilities_bioblend.check_wf_param(self.full_name, params)
class RunWorkflowJbrowse(runWorkflowPhaeo.RunWorkflow):
"""
Run a workflow into the galaxy instance's history of a given species
This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
provided the user creates their own workflow in a .ga format, and change the set_parameters function
to have the correct parameters for their workflow
"""
def __init__(self, parameters_dictionary):
super().__init__(parameters_dictionary)
self.chado_species_name = " ".join(utilities.filter_empty_not_empty_items(
[self.species, self.strain, self.sex])["not_empty"])
self.abbreviation = self.genus_uppercase[0] + ". " + self.chado_species_name
self.common = self.name
if not self.common_name is None and self.common_name != "":
self.common = self.common_name
self.genome_analysis_name = "genome v{0} of {1}".format(self.genome_version, self.full_name)
self.genome_analysis_programversion = "genome v{0}".format(self.genome_version)
self.genome_analysis_sourcename = self.full_name
self.ogs_analysis_name = "OGS{0} of {1}".format(self.ogs_version, self.full_name)
self.ogs_analysis_programversion = "OGS{0}".format(self.ogs_version)
self.ogs_analysis_sourcename = self.full_name
self.genome_hda_id = None
self.gff_hda_id = None
self.transcripts_hda_id = None
self.proteins_hda_id = None
def install_individual_tools(self):
"""
This function is used to verify that installed tools called outside workflows have the correct versions and changesets
If it finds versions don't match, will install the correct version + changeset in the instance
Doesn't do anything if versions match
:return:
"""
logging.info("Validating installed individual tools versions and changesets")
# Verify that the add_organism and add_analysis versions are correct in the instance
# changeset for 2.3.4+galaxy0 has to be manually found because there is no way to get the wanted changeset of a non installed tool via bioblend
# except for workflows (.ga) that already contain the changeset revisions inside the steps ids
utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID,
version=constants_phaeo.GET_ORGANISMS_TOOL_VERSION,
changeset_revision=constants_phaeo.GET_ORGANISMS_TOOL_CHANGESET_REVISION,
instance=self.instance)
utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
version=constants_phaeo.GET_ANALYSES_TOOL_VERSION,
changeset_revision=constants_phaeo.GET_ANALYSES_TOOL_CHANGESET_REVISION,
instance=self.instance)
utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID,
version=constants_phaeo.ADD_ORGANISM_TOOL_VERSION,
changeset_revision=constants_phaeo.ADD_ORGANISM_TOOL_CHANGESET_REVISION,
instance=self.instance)
utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
version=constants_phaeo.ADD_ANALYSIS_TOOL_VERSION,
changeset_revision=constants_phaeo.ADD_ANALYSIS_TOOL_CHANGESET_REVISION,
instance=self.instance)
utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
version=constants_phaeo.ANALYSIS_SYNC_TOOL_VERSION,
changeset_revision=constants_phaeo.ANALYSIS_SYNC_TOOL_CHANGESET_REVISION,
instance=self.instance)
utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
version=constants_phaeo.ORGANISM_SYNC_TOOL_VERSION,
changeset_revision=constants_phaeo.ORGANISM_SYNC_TOOL_CHANGESET_REVISION,
instance=self.instance)
logging.info("Success: individual tools versions and changesets validated")
def add_organism_and_sync(self):
get_organisms_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
instance=self.instance,
tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID,
history_id=self.history_id,
tool_inputs={},
time_sleep=10
)
organisms_dict_list = json.loads(get_organisms_tool_dataset) # Turn the dataset into a list for parsing
org_id = None
# Look up list of outputs (dictionaries)
for org_dict in organisms_dict_list:
if org_dict["genus"] == self.genus_uppercase and org_dict["species"] == self.chado_species_name:
org_id = str(org_dict["organism_id"]) # id needs to be a str to be recognized by chado tools
if org_id is None:
add_organism_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
instance=self.instance,
tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID,
history_id=self.history_id,
tool_inputs={"abbr": self.abbreviation,
"genus": self.genus_uppercase,
"species": self.chado_species_name,
"common": self.common})
organism_dict = json.loads(add_organism_tool_dataset)
org_id = str(organism_dict["organism_id"]) # id needs to be a str to be recognized by chado tools
# Synchronize newly added organism in Tripal
logging.info("Synchronizing organism %s in Tripal" % self.full_name)
time.sleep(60)
utilities_bioblend.run_tool(
instance=self.instance,
tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
history_id=self.history_id,
tool_inputs={"organism_id": org_id})
return org_id
def import_datasets_into_history(self):
"""
Find datasets in a library, get their ID and import them into the current history if they are not already
"""
genome_ldda_id = None
transcripts_ldda_id = None
proteins_ldda_id = None
gff_ldda_id = None
genome_hda_id = None
gff_hda_id = None
transcripts_hda_id = None
proteins_hda_id = None
folder_dict_list = self.instance.libraries.get_folders(library_id=str(self.library_id))
folders_id_dict = {}
# Loop over the folders in the library and map folders names to their IDs
for folder_dict in folder_dict_list:
folders_id_dict[folder_dict["name"]] = folder_dict["id"]
# Iterating over the folders to find datasets and map datasets to their IDs
for folder_name, folder_id in folders_id_dict.items():
if folder_name == "/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version):
sub_folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True)
for value in sub_folder_content.values():
for e in value:
if type(e) == dict:
if e["name"].endswith(self.genome_filename):
genome_ldda_id = e["ldda_id"]
if folder_name == "/annotation/{0}/OGS{1}".format(self.species_folder_name, self.ogs_version):
sub_folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True)
for value in sub_folder_content.values():
for e in value:
if type(e) == dict:
ldda_name = e["name"]
ldda_id = e["ldda_id"]
if ldda_name.endswith(self.transcripts_filename):
transcripts_ldda_id = ldda_id
elif ldda_name.endswith(self.proteins_filename):
proteins_ldda_id = ldda_id
elif ldda_name.endswith(self.gff_filename):
gff_ldda_id = ldda_id
hda_list = self.instance.datasets.get_datasets(self.history_id)
# Finding datasets in history (matching datasets names)
for hda in hda_list:
hda_name = hda["name"]
hda_id = hda["id"]
if hda_name == self.genome_filename:
genome_hda_id = hda_id
if hda_name == self.gff_filename:
gff_hda_id = hda_id
if hda_name == self.transcripts_filename:
transcripts_hda_id = hda_id
if hda_name == self.proteins_filename :
proteins_hda_id = hda_id
# Import each dataset into history if it is not imported
logging.debug("Uploading datasets into history %s" % self.history_id)
if genome_hda_id is None:
genome_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=genome_ldda_id)
genome_hda_id = genome_dataset_upload["id"]
if gff_hda_id is None:
gff_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=gff_ldda_id)
gff_hda_id = gff_dataset_upload["id"]
if proteins_hda_id is None:
proteins_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=proteins_ldda_id)
proteins_hda_id = proteins_dataset_upload["id"]
if transcripts_hda_id is None:
transcripts_dataset_upload = self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=transcripts_ldda_id)
transcripts_hda_id = transcripts_dataset_upload["id"]
self.genome_hda_id = genome_hda_id
self.gff_hda_id = gff_hda_id
self.transcripts_hda_id = transcripts_hda_id
self.proteins_hda_id = proteins_hda_id
def prepare_history_and_get_wf_param(sp_dict_list, main_dir, config):
all_org_wf_param_dict = {}
for sp_dict in sp_dict_list:
run_workflow_for_current_organism = RunWorkflowJbrowse(parameters_dictionary=sp_dict)
# Verifying the galaxy container is running
if not utilities_bioblend.check_galaxy_state(network_name=run_workflow_for_current_organism.genus_species,
script_dir=run_workflow_for_current_organism.script_dir):
logging.critical(
"The galaxy container for %s is not ready yet!" % run_workflow_for_current_organism.genus_species)
sys.exit()
else:
# Setting some of the instance attributes
run_workflow_for_current_organism.main_dir = main_dir
run_workflow_for_current_organism.set_galaxy_instance(config)
run_workflow_for_current_organism.set_history()
run_workflow_for_current_organism.install_individual_tools()
run_workflow_for_current_organism.import_datasets_into_history()
analyses_dict_list = run_workflow_for_current_organism.get_analyses()
org_id = run_workflow_for_current_organism.add_organism_and_sync()
genome_analysis_id = run_workflow_for_current_organism.add_analysis_and_sync(
analyses_dict_list=analyses_dict_list,
analysis_name=run_workflow_for_current_organism.genome_analysis_name,
analysis_programversion=run_workflow_for_current_organism.genome_analysis_programversion,
analysis_sourcename=run_workflow_for_current_organism.genome_analysis_sourcename
)
ogs_analysis_id = run_workflow_for_current_organism.add_analysis_and_sync(
analyses_dict_list=analyses_dict_list,
analysis_name=run_workflow_for_current_organism.ogs_analysis_name,
analysis_programversion=run_workflow_for_current_organism.ogs_analysis_programversion,
analysis_sourcename=run_workflow_for_current_organism.ogs_analysis_sourcename
)
# Create the StrainWorkflowParam object holding all attributes needed for the workflow
org_wf_param = OrgWorkflowParamJbrowse(
genus_uppercase=run_workflow_for_current_organism.genus_uppercase,
full_name=run_workflow_for_current_organism.full_name,
species_folder_name=run_workflow_for_current_organism.species_folder_name,
chado_species_name=run_workflow_for_current_organism.chado_species_name,
org_id=org_id,
genome_analysis_id=genome_analysis_id,
ogs_analysis_id=ogs_analysis_id,
genome_hda_id=run_workflow_for_current_organism.genome_hda_id,
gff_hda_id=run_workflow_for_current_organism.gff_hda_id,
transcripts_hda_id=run_workflow_for_current_organism.transcripts_hda_id,
proteins_hda_id=run_workflow_for_current_organism.proteins_hda_id,
history_id=run_workflow_for_current_organism.history_id,
instance=run_workflow_for_current_organism.instance
)
org_wf_param.check_param()
# Add the species dictionary to the complete dictionary
# This dictionary contains every organism present in the input file
# Its structure is the following:
# {genus species: {strain1_sex1: {variables_key: variables_values}, strain1_sex2: {variables_key: variables_values}}}
if not run_workflow_for_current_organism.genus_species in all_org_wf_param_dict.keys():
all_org_wf_param_dict[run_workflow_for_current_organism.genus_species] = {
run_workflow_for_current_organism.strain_sex: org_wf_param}
else:
if not run_workflow_for_current_organism.strain_sex in all_org_wf_param_dict[
run_workflow_for_current_organism.genus_species].keys():
all_org_wf_param_dict[run_workflow_for_current_organism.genus_species][
run_workflow_for_current_organism.strain_sex] = org_wf_param
else:
logging.error("Duplicate organism with 'genus_species' = '{0}' and 'strain_sex' = '{1}'".format(
run_workflow_for_current_organism.genus_species, run_workflow_for_current_organism.strain_sex))
return all_org_wf_param_dict
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run Galaxy workflows, specific to Phaeoexplorer data")
parser.add_argument("input",
type=str,
help="Input file (yml)")
parser.add_argument("-v", "--verbose",
help="Increase output verbosity",
action="store_true")
parser.add_argument("--config",
type=str,
help="Config path, default to the 'config' file inside the script repository")
parser.add_argument("--main-directory",
type=str,
help="Where the stack containers will be located, defaults to working directory")
args = parser.parse_args()
bioblend_logger = logging.getLogger("bioblend")
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
bioblend_logger.setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
bioblend_logger.setLevel(logging.INFO)
# Parsing the config file if provided, using the default config otherwise
if args.config:
config_file = os.path.abspath(args.config)
else:
config_file = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), constants.DEFAULT_CONFIG)
main_dir = None
if not args.main_directory:
main_dir = os.getcwd()
else:
main_dir = os.path.abspath(args.main_directory)
config = utilities.parse_config(config_file)
sp_dict_list = utilities.parse_input(args.input)
script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
all_org_wf_param_dict = prepare_history_and_get_wf_param(
sp_dict_list=sp_dict_list,
main_dir=main_dir,
config=config)
for genus_species, strains in all_org_wf_param_dict.items():
strains_list = list(strains.keys())
strains_count = len(strains_list)
if strains_count == 1:
logging.info("Input species %s: 1 strain detected in input dictionary" % genus_species)
strain_sex = list(strains.keys())[0]
org_wf_param = strains[strain_sex]
# Set workflow path (1 organism)
workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_1_ORG_FILE)
# Check if the versions of tools specified in the workflow are installed in galaxy
utilities_bioblend.install_workflow_tools(workflow_path=workflow_path, instance=org_wf_param.instance)
# Set the workflow parameters (individual tools runtime parameters in the workflow)
workflow_parameters = {}
# Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method)
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_FASTA] = {
"organism": org_wf_param.org_id,
"analysis_id": org_wf_param.genome_analysis_id,
"do_update": "true"}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_LOAD_GFF] = {
"organism": org_wf_param.org_id,
"analysis_id": org_wf_param.ogs_analysis_id}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_FEATURE_SYNC] = {
"organism_id": org_wf_param.org_id}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_POPULATE_VIEWS] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_INDEX] = {}
# Set datamap (mapping of input files in the workflow)
datamap = {}
datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GENOME] = {"src": "hda", "id": org_wf_param.genome_hda_id}
datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_GFF] = {"src": "hda", "id": org_wf_param.gff_hda_id}
datamap[constants_phaeo.WF_LOAD_GFF_JB_1_ORG_INPUT_PROTEINS] = {"src": "hda", "id": org_wf_param.proteins_hda_id}
with open(workflow_path, 'r') as ga_in_file:
# Store the decoded json dictionary
workflow_dict = json.load(ga_in_file)
workflow_name = workflow_dict["name"]
# For the Jbrowse tool, we unfortunately have to manually edit the parameters instead of setting them
# as runtime values, using runtime parameters makes the tool throw an internal critical error ("replace not found" error)
# Scratchgmod test: need "http" (or "https"), the hostname (+ port)
if constants.CONF_JBROWSE_MENU_URL not in config.keys():
# default
root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME])
else:
root_url = config[constants.CONF_JBROWSE_MENU_URL]
species_strain_sex = org_wf_param.chado_species_name.replace(" ", "-")
jbrowse_menu_url = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format(
root_url=root_url,
genus_sp=genus_species,
Genus=org_wf_param.genus_uppercase,
species_strain_sex=species_strain_sex,
id="{id}")
# Replace values in the workflow dictionary
workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"] = \
workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JBROWSE]["tool_state"]\
.replace("__MENU_URL_ORG__", jbrowse_menu_url)
workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"] = \
workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_1_ORG_STEP_JB_TO_CONTAINER]["tool_state"]\
.replace("__DISPLAY_NAME_ORG__", org_wf_param.full_name)\
.replace("__UNIQUE_ID_ORG__", org_wf_param.species_folder_name)
# Import the workflow in galaxy as a dict
org_wf_param.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
# Get its attributes
workflow_dict_list = org_wf_param.instance.workflows.get_workflows(name=workflow_name)
# Then get its ID (required to invoke the workflow)
workflow_id = workflow_dict_list[0]["id"] # Index 0 is the most recently imported workflow (the one we want)
logging.debug("Workflow ID: %s" % workflow_id)
# Check if the workflow is found
try:
show_workflow = org_wf_param.instance.workflows.show_workflow(workflow_id=workflow_id)
except bioblend.ConnectionError:
logging.warning("Error finding workflow %s" % workflow_name)
# Finally, invoke the workflow along with its datamap, parameters and the history in which to invoke it
org_wf_param.instance.workflows.invoke_workflow(
workflow_id=workflow_id,
history_id=org_wf_param.history_id,
params=workflow_parameters,
inputs=datamap,
allow_tool_state_corrections=True)
logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name))
if strains_count == 2:
logging.info("Input organism %s: 2 species detected in input dictionary" % genus_species)
strain_sex_org1 = strains_list[0]
strain_sex_org2 = strains_list[1]
sp_wf_param_org1 = strains[strain_sex_org1]
sp_wf_param_org2 = strains[strain_sex_org2]
# Set workflow path (2 organisms)
workflow_path = os.path.join(os.path.abspath(script_dir), constants_phaeo.WORKFLOWS_PATH, constants_phaeo.WF_LOAD_GFF_JB_2_ORG_FILE)
# Check if the versions of tools specified in the workflow are installed in galaxy
utilities_bioblend.install_workflow_tools(workflow_path=workflow_path, instance=sp_wf_param_org1.instance)
# Set the workflow parameters (individual tools runtime parameters in the workflow)
workflow_parameters = {}
# Input files have no parameters (they are set via assigning the hda IDs in the datamap parameter of the bioblend method)
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {}
# Organism 1
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG1] = {
"organism": sp_wf_param_org1.org_id,
"analysis_id": sp_wf_param_org1.genome_analysis_id,
"do_update": "true"}
# workflow_parameters[JBROWSE_ORG1] = {"jbrowse_menu_url": jbrowse_menu_url_org1}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG1] = {
"organism": sp_wf_param_org1.org_id,
"analysis_id": sp_wf_param_org1.ogs_analysis_id}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG1] = {
"organism_id": sp_wf_param_org1.org_id}
# workflow_parameters[JBROWSE_CONTAINER] = {"organisms": [{"name": org1_full_name, "unique_id": org1_species_folder_name, }, {"name": org2_full_name, "unique_id": org2_species_folder_name}]}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER] = {}
# Organism 2
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_FASTA_ORG2] = {
"organism": sp_wf_param_org2.org_id,
"analysis_id": sp_wf_param_org2.genome_analysis_id,
"do_update": "true"}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_LOAD_GFF_ORG2] = {
"organism": sp_wf_param_org2.org_id,
"analysis_id": sp_wf_param_org2.ogs_analysis_id}
# workflow_parameters[JRBOWSE_ORG2] = {"jbrowse_menu_url": jbrowse_menu_url_org2}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_FEATURE_SYNC_ORG2] = {
"organism_id": sp_wf_param_org2.org_id}
# POPULATE + INDEX DATA
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_POPULATE_VIEWS] = {}
workflow_parameters[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_INDEX] = {}
# Set datamap (mapping of input files in the workflow)
datamap = {}
# Organism 1
datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG1] = {"src": "hda", "id": sp_wf_param_org1.genome_hda_id}
datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG1] = {"src": "hda", "id": sp_wf_param_org1.gff_hda_id}
datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG1] = {"src": "hda", "id": sp_wf_param_org1.proteins_hda_id}
# Organism 2
datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GENOME_ORG2] = {"src": "hda", "id": sp_wf_param_org2.genome_hda_id}
datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_GFF_ORG2] = {"src": "hda", "id": sp_wf_param_org2.gff_hda_id}
datamap[constants_phaeo.WF_LOAD_GFF_JB_2_ORG_INPUT_PROTEINS_ORG2] = {"src": "hda", "id": sp_wf_param_org2.proteins_hda_id}
with open(workflow_path, 'r') as ga_in_file:
# Store the decoded json dictionary
workflow_dict = json.load(ga_in_file)
workflow_name = workflow_dict["name"]
# For the Jbrowse tool, we unfortunately have to manually edit the parameters instead of setting them
# as runtime values, using runtime parameters makes the tool throw an internal critical error ("replace not found" error)
# Scratchgmod test: need "http" (or "https"), the hostname (+ port)
if constants.CONF_JBROWSE_MENU_URL not in config.keys():
# default
root_url = "https://{0}".format(config[constants.CONF_ALL_HOSTNAME])
else:
root_url = config[constants.CONF_JBROWSE_MENU_URL]
species_strain_sex_org1 = sp_wf_param_org1.chado_species_name.replace(" ", "-")
species_strain_sex_org2 = sp_wf_param_org2.chado_species_name.replace(" ", "-")
jbrowse_menu_url_org1 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format(
root_url=root_url,
genus_sp=genus_species,
Genus=sp_wf_param_org1.genus_uppercase,
species_strain_sex=species_strain_sex_org1,
id="{id}")
jbrowse_menu_url_org2 = "{root_url}/sp/{genus_sp}/feature/{Genus}/{species_strain_sex}/mRNA/{id}".format(
root_url=root_url,
genus_sp=genus_species,
Genus=sp_wf_param_org2.genus_uppercase,
species_strain_sex=species_strain_sex_org2,
id="{id}")
# Replace values in the workflow dictionary
jbrowse_tool_state_org1 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JBROWSE_ORG1]["tool_state"]
jbrowse_tool_state_org1 = jbrowse_tool_state_org1.replace("__MENU_URL_ORG1__", jbrowse_menu_url_org1)
jbrowse_tool_state_org2 = workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JRBOWSE_ORG2]["tool_state"]
jbrowse_tool_state_org2 = jbrowse_tool_state_org2.replace("__MENU_URL_ORG2__", jbrowse_menu_url_org2)
# The UNIQUE_ID is specific to a combination genus_species_strain_sex so every combination should have its unique workflow
# in galaxy --> define a naming method for these workflows
workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"] = \
workflow_dict["steps"][constants_phaeo.WF_LOAD_GFF_JB_2_ORG_STEP_JB_TO_CONTAINER]["tool_state"]\
.replace("__DISPLAY_NAME_ORG1__", sp_wf_param_org1.full_name)\
.replace("__UNIQUE_ID_ORG1__", sp_wf_param_org1.species_folder_name)\
.replace("__DISPLAY_NAME_ORG2__", sp_wf_param_org2.full_name)\
.replace("__UNIQUE_ID_ORG2__", sp_wf_param_org2.species_folder_name)
# Import the workflow in galaxy as a dict
sp_wf_param_org1.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
# Get its attributes
workflow_dict_list = sp_wf_param_org1.instance.workflows.get_workflows(name=workflow_name)
# Then get its ID (required to invoke the workflow)
workflow_id = workflow_dict_list[0]["id"] # Index 0 is the most recently imported workflow (the one we want)
logging.debug("Workflow ID: %s" % workflow_id)
# Check if the workflow is found
try:
show_workflow = sp_wf_param_org1.instance.workflows.show_workflow(workflow_id=workflow_id)
except bioblend.ConnectionError:
logging.warning("Error finding workflow %s" % workflow_name)
# Finally, invoke the workflow alogn with its datamap, parameters and the history in which to invoke it
sp_wf_param_org1.instance.workflows.invoke_workflow(
workflow_id=workflow_id,
history_id=sp_wf_param_org1.history_id,
params=workflow_parameters,
inputs=datamap,
allow_tool_state_corrections=True)
logging.info("Successfully imported and invoked workflow {0}, check the galaxy instance for the jobs state".format(workflow_name))