-
Arthur Le Bars authored
jinja templating, more workflow changes, trying fixes for the jbrowse gmod tool using API (in progress)
dc2e8f3a
run_workflow_phaeoexplorer.py 29.82 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bioblend
import bioblend.galaxy.objects
import argparse
import os
import logging
import sys
import json
import time
import utilities
import speciesData
from bioblend.galaxy.objects import GalaxyInstance
from bioblend import galaxy
"""
run_workflow_phaeoexplorer.py
Usage: $ python3 run_workflow_phaeoexplorer.py input_example.yml --config [config file] --workflow [workflow .ga file] [OPTIONS]
"""
class RunWorkflow(speciesData.SpeciesData):
    """
    Run a workflow into the galaxy instance's history of a given species

    This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
    provided the user creates their own workflow in a .ga format and adapts the workflow parameters built by the
    calling script (TODO: use a mapping file for parameters and the .ga file)

    The methods below read attributes such as full_name, abbreviation, genus_uppercase, config, datasets, ...
    which are not set in this class (presumably inherited from speciesData.SpeciesData or set by the caller).
    """

    # Chado "get" tools that are invoked several times
    _ORGANISM_GET_TOOL_ID = "toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.3"
    _ANALYSIS_GET_TOOL_ID = "toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.3"

    def set_get_history(self):
        """
        Set the working history to the one named after the current species, creating it first if it doesn't exist

        TODO move to utilities

        :return: ID of the working history (also stored in self.history_id)
        """
        history_name = str(self.full_name)
        histories = self.instance.histories.get_histories(name=history_name)
        if not histories:
            logging.info("Creating history for %s" % self.full_name)
            self.instance.histories.create_history(name=history_name)
            histories = self.instance.histories.get_histories(name=history_name)
        self.history_id = histories[0]["id"]
        logging.info("History for {0}: {1}".format(self.full_name, self.history_id))
        return self.history_id

    def get_instance_attributes(self):
        """
        Retrieve instance attributes:
        - working history ID
        - libraries ID (there should only be one library!)
        - datasets IDs

        :return: dict with the keys "history_id", "library_id" and "datasets"
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)

        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)

        # Access folders via their absolute path
        genome_folder = self.instance.libraries.get_folders(
            library_id=self.library_id,
            name="/genome/" + str(self.species_folder_name) + "/v" + str(self.genome_version))
        annotation_folder = self.instance.libraries.get_folders(
            library_id=self.library_id,
            name="/annotation/" + str(self.species_folder_name) + "/OGS" + str(self.ogs_version))

        # Get the content of the folders
        genome_folder_content = self.instance.folders.show_folder(folder_id=genome_folder[0]["id"], contents=True)
        annotation_folder_content = self.instance.folders.show_folder(folder_id=annotation_folder[0]["id"], contents=True)

        # The first item of the genome folder is taken as the genome fasta
        genome_fasta_ldda_id = genome_folder_content["folder_contents"][0]["ldda_id"]

        # Several dicts in the annotation folder content (one dict = one file)
        # The three tests are independent on purpose: a single file name may match more than one of them
        annotation_gff_ldda_id, annotation_proteins_ldda_id, annotation_transcripts_ldda_id = None, None, None
        for annotation_file in annotation_folder_content.get("folder_contents", []):
            if "proteins" in annotation_file["name"]:
                annotation_proteins_ldda_id = annotation_file["ldda_id"]
            if "transcripts" in annotation_file["name"]:
                annotation_transcripts_ldda_id = annotation_file["ldda_id"]
            if ".gff" in annotation_file["name"]:
                annotation_gff_ldda_id = annotation_file["ldda_id"]

        self.datasets["genome_file"] = genome_fasta_ldda_id
        self.datasets["gff_file"] = annotation_gff_ldda_id
        self.datasets["proteins_file"] = annotation_proteins_ldda_id
        self.datasets["transcripts_file"] = annotation_transcripts_ldda_id

        return {"history_id": self.history_id, "library_id": self.library_id, "datasets": self.datasets}

    def connect_to_instance(self):
        """
        Test the connection to the galaxy instance for the current organism

        Exit (with a non-zero status) if we cannot connect to the instance
        """
        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                              email=self.config["galaxy_default_admin_email"],
                                              password=self.config["galaxy_default_admin_password"]
                                              )
        logging.info("Connecting to the galaxy instance %s" % self.instance_url)
        try:
            # Any authenticated request will do to check the instance is reachable
            self.instance.histories.get_histories()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance %s" % self.instance_url)
            sys.exit(1)
        else:
            logging.info("Successfully connected to galaxy instance %s" % self.instance_url)

    def prepare_history(self):
        """
        Galaxy instance startup in preparation for importing datasets and running a workflow
        - Add organism and analyses into the chado database
        - Get any other existing organisms IDs before updating the galaxy instance --> separate

        Calling this function is mandatory to have a working galaxy instance history
        self.history_id must already be set (see set_get_history)

        :return:
        """
        self.connect_to_instance()

        # Add organism (species) to chado, the abbreviation is used when no common name was provided
        logging.info("Adding organism to the instance's chado database")
        if self.common == "" or self.common is None:
            common_name = self.abbreviation
        else:
            common_name = self.common
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.3",
            history_id=self.history_id,
            tool_inputs={"abbr": self.abbreviation,
                         "genus": self.genus_uppercase,
                         "species": self.chado_species_name,
                         "common": common_name})

        # Add OGS analysis to chado
        logging.info("Adding OGS analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.3",
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " OGS" + self.ogs_version,
                         "program": "Performed by Genoscope",
                         "programversion": str(self.sex + " OGS" + self.ogs_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Add genome analysis to chado
        # NOTE(review): this call uses version 2.3.2 of the tool while the OGS analysis above uses 2.3.3,
        # and "programversion" has no space before "genome" (unlike " OGS" above) - confirm both are intended
        logging.info("Adding genome analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " genome v" + self.genome_version,
                         "program": "Performed by Genoscope",
                         "programversion": str(self.sex + "genome v" + self.genome_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Also get the organism and analyses IDs
        self.get_organism_and_analyses_ids()
        logging.info("Finished initializing instance")

    def run_workflow(self, workflow_path, workflow_parameters, workflow_name, datamap):
        """
        Import a workflow into galaxy and invoke it in the current history

        Requires the .ga file to be loaded as a dictionary (optionally could be uploaded as a raw file)

        :param workflow_path: path to the workflow .ga (json) file
        :param workflow_parameters: parameters of the workflow steps (passed as "params" to the invocation)
        :param workflow_name: name of the workflow, used for logging and as a fallback to find the workflow
        :param datamap: input datasets of the workflow (passed as "inputs" to the invocation)
        :return:
        """
        logging.info("Importing workflow: " + str(workflow_path))
        with open(workflow_path, 'r') as ga_in_file:
            # Store the decoded json dictionary
            workflow_dict = json.load(ga_in_file)

        imported_workflow = self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)

        # Invoke the workflow that was just imported rather than whichever workflow happens to have the same name
        workflow_id = imported_workflow.get("id") if isinstance(imported_workflow, dict) else None
        if workflow_id is None:
            # Fallback: find the workflow using its name
            matching_workflows = self.instance.workflows.get_workflows(name=workflow_name)
            if not matching_workflows:
                raise IndexError("No workflow named '%s' found in galaxy instance %s" % (workflow_name, self.instance_url))
            workflow_id = matching_workflows[0]["id"]
        logging.debug("Workflow ID: %s" % workflow_id)

        self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                history_id=self.history_id,
                                                params=workflow_parameters,
                                                inputs=datamap,
                                                inputs_by="",
                                                allow_tool_state_corrections=True)
        logging.info("Successfully imported and invoked workflow {0}, check your galaxy instance ({1}) for the jobs state".format(workflow_name, self.instance_url))

    def _first_subfolder_datasets(self, folder_id):
        """
        Return the items (dicts) found in the first sub-folder of a library folder

        TODO: manage versions? (differentiate between the correct folders using self.config)
        """
        folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True)
        first_sub_folder_id = folder_content["folder_contents"][0]["id"]
        sub_folder_content = self.instance.folders.show_folder(folder_id=first_sub_folder_id, contents=True)
        return [item for item in sub_folder_content.get("folder_contents", []) if isinstance(item, dict)]

    def import_datasets_into_history(self):
        """
        Find datasets in the "Project Data" library, get their ID and import them into the current history

        NOTE(review): the datasets are uploaded on every call, nothing here checks whether they already are
        in the history

        :return: dict with the keys "history_id", "library_id" and "datasets"
        """
        project_libraries = self.instance.libraries.get_libraries(name="Project Data")
        self.library_id = project_libraries[0]["id"]

        # Map folders names to their IDs
        library_folders = self.instance.libraries.get_folders(library_id=str(self.library_id))
        folders_ids = {folder["name"]: folder["id"] for folder in library_folders}

        # Look into the genome and annotation folders to map datasets to their IDs
        logging.info("Datasets IDs: ")
        if "/genome" in folders_ids:
            for item in self._first_subfolder_datasets(folders_ids["/genome"]):
                if item["name"].endswith(".fa"):
                    self.datasets["genome_file"] = item["ldda_id"]
                    logging.info("\t" + item["name"] + ": " + item["ldda_id"])

        if "/annotation" in folders_ids:
            for item in self._first_subfolder_datasets(folders_ids["/annotation"]):
                # First match wins: a file is registered under a single dataset type
                if "transcripts" in item["name"]:
                    dataset_key = "transcripts_file"
                elif "proteins" in item["name"]:
                    dataset_key = "proteins_file"
                elif "gff" in item["name"]:
                    dataset_key = "gff_file"
                else:
                    continue
                self.datasets[dataset_key] = item["ldda_id"]
                logging.info("\t" + item["name"] + ": " + item["ldda_id"])

        logging.info("Uploading datasets into history %s" % self.history_id)
        for dataset_key in ("genome_file", "gff_file", "transcripts_file", "proteins_file"):
            self.instance.histories.upload_dataset_from_library(history_id=self.history_id,
                                                                lib_dataset_id=self.datasets[dataset_key])

        return {"history_id": self.history_id, "library_id": self.library_id, "datasets": self.datasets}

    def _download_json_output(self, tool_run):
        """
        Download the first output dataset of a tool run and return its decoded json content
        """
        output_dataset_id = tool_run["outputs"][0]["id"]
        raw_output = self.instance.datasets.download_dataset(dataset_id=output_dataset_id)
        return json.loads(raw_output)

    def get_organism_and_analyses_ids(self):
        """
        Retrieve current organism ID and OGS and genome chado analyses IDs (needed to run some tools as Tripal/Chado
        doesn't accept organism/analyses names as valid inputs)

        Sets self.org_id, self.ogs_analysis_id and self.genome_analysis_id (as strings). When a lookup returns
        no record, the matching attribute is left untouched and a debug message is logged.

        WARNING: It is mandatory to call this function before invoking a workflow

        :return:
        """
        # Get the ID for the current organism in chado
        organism_query = {"abbr": self.abbreviation,
                          "genus": self.genus_uppercase,
                          "species": self.chado_species_name,
                          "common": self.common}
        self.instance.tools.run_tool(tool_id=self._ORGANISM_GET_TOOL_ID,
                                     history_id=self.history_id,
                                     tool_inputs=dict(organism_query))
        time.sleep(3)
        # Run tool again (sometimes the tool doesn't return anything despite the organism already being in the db)
        org = self.instance.tools.run_tool(tool_id=self._ORGANISM_GET_TOOL_ID,
                                           history_id=self.history_id,
                                           tool_inputs=dict(organism_query))
        time.sleep(10)
        org_records = self._download_json_output(org)
        try:
            self.org_id = str(org_records[0]["organism_id"])  # id needs to be a str to be recognized by chado tools
        except IndexError:
            logging.debug("No organism matching " + self.full_name + " exists in the instance's chado database")

        # Get the ID for the OGS analysis in chado
        ogs_analysis = self.instance.tools.run_tool(
            tool_id=self._ANALYSIS_GET_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " OGS" + self.ogs_version})
        ogs_analysis_records = self._download_json_output(ogs_analysis)
        try:
            self.ogs_analysis_id = str(ogs_analysis_records[0]["analysis_id"])
        except IndexError:
            logging.debug("no matching OGS analysis exists in the instance's chado database")

        # Get the ID for the genome analysis in chado
        genome_analysis = self.instance.tools.run_tool(
            tool_id=self._ANALYSIS_GET_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"name": self.full_name_lowercase + " genome v" + self.genome_version})
        genome_analysis_records = self._download_json_output(genome_analysis)
        try:
            self.genome_analysis_id = str(genome_analysis_records[0]["analysis_id"])
        except IndexError:
            logging.debug("no matching genome analysis exists in the instance's chado database")
# Script entry point: for each species of the input file, connect to its galaxy instance and run the
# workflow given with --workflow (Chado load/Tripal synchronize or Jbrowse creation)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")

    parser.add_argument("--config",
                        type=str,
                        help="Config path, default to the 'config' file inside the script repository")

    parser.add_argument("--main-directory",
                        type=str,
                        help="Where the stack containers will be located, defaults to working directory")

    parser.add_argument("--workflow", "-w",
                        type=str,
                        help="Workflow to run")

    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
    logging.getLogger("urllib3").setLevel(logging.INFO)
    logging.getLogger("bioblend").setLevel(logging.INFO)

    # Checking if user specified a workflow to run (done once, before doing anything for any species)
    if not args.workflow:
        logging.critical("No workflow specified, exiting")
        sys.exit(1)
    workflow = os.path.abspath(args.workflow)

    # Parsing the config file if provided, using the default config otherwise
    if not args.config:
        args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config")
    else:
        args.config = os.path.abspath(args.config)

    if not args.main_directory:
        args.main_directory = os.getcwd()
    else:
        args.main_directory = os.path.abspath(args.main_directory)

    sp_dict_list = utilities.parse_input(args.input)

    for sp_dict in sp_dict_list:

        # Creating an instance of the RunWorkflow object for the current organism
        org_workflow = RunWorkflow(parameters_dictionary=sp_dict)

        # Verifying the galaxy container is running
        if not utilities.check_galaxy_state(genus_lowercase=org_workflow.genus_lowercase,
                                            species=org_workflow.species,
                                            script_dir=org_workflow.script_dir):
            logging.critical("The galaxy container for %s is not ready yet!" % org_workflow.full_name)
            sys.exit(1)

        # Starting
        logging.info("run_workflow.py called for %s" % org_workflow.full_name)

        # Setting some of the instance attributes
        org_workflow.main_dir = args.main_directory
        org_workflow.species_dir = os.path.join(org_workflow.main_dir, org_workflow.genus_species + "/")

        # Parse the config yaml file
        org_workflow.config = utilities.parse_config(args.config)

        # Set the instance url attribute (localhost is used when the config has no "hostname")
        if "hostname" in org_workflow.config:
            org_workflow.instance_url = "http://{0}:8888/sp/{1}/galaxy/".format(
                org_workflow.config["hostname"], org_workflow.genus_species)
        else:
            org_workflow.instance_url = "http://localhost:8888/sp/{0}_{1}/galaxy/".format(
                org_workflow.genus_lowercase,
                org_workflow.species)

        # TODO: Create distinct methods to call different pre-set workflows using CL arguments/config options (i.e load-chado, jbrowse, functional-annotation, orthology, ...)

        # If input workflow is Chado_load_Tripal_synchronize.ga
        if "Chado_load_Tripal_synchronize" in str(workflow):

            logging.info("Executing workflow 'Chado_load_Tripal_synchronize'")

            org_workflow.connect_to_instance()
            org_workflow.set_get_history()

            # Prepare the instance+history for the current organism (add organism and analyses in Chado) TODO: add argument "setup"
            # (although it should pose no problem as the "Chado add" refuses to duplicate an analysis/organism anyway)
            org_workflow.prepare_history()

            # Get the attributes of the instance and project data files
            org_workflow.get_instance_attributes()
            org_workflow.get_organism_and_analyses_ids()

            # Import datasets into history
            # TODO: it seems it is not required anymore since using "ldda" option for datasets in the workflow datamap doesn't need files from history
            org_workflow.import_datasets_into_history()

            # Explicit workflow parameter names (the values are the indexes of the workflow steps)
            GENOME_FASTA_FILE = "0"
            GFF_FILE = "1"
            PROTEINS_FASTA_FILE = "2"
            TRANSCRIPTS_FASTA_FILE = "3"
            LOAD_FASTA_IN_CHADO = "4"
            LOAD_GFF_IN_CHADO = "5"
            SYNC_ORGANISM_INTO_TRIPAL = "6"
            SYNC_GENOME_ANALYSIS_INTO_TRIPAL = "7"
            SYNC_OGS_ANALYSIS_INTO_TRIPAL = "8"
            SYNC_FEATURES_INTO_TRIPAL = "9"

            # Change "do_update": "true" to "do_update": "false" in the LOAD_FASTA_IN_CHADO parameters to prevent appending/updates to the fasta file in chado
            # WARNING: It is safer to never update it and just change the genome/ogs versions in the config
            workflow_parameters = {
                GENOME_FASTA_FILE: {},
                GFF_FILE: {},
                PROTEINS_FASTA_FILE: {},
                TRANSCRIPTS_FASTA_FILE: {},
                LOAD_FASTA_IN_CHADO: {"organism": org_workflow.org_id,
                                      "analysis_id": org_workflow.genome_analysis_id,
                                      "do_update": "true"},
                LOAD_GFF_IN_CHADO: {"organism": org_workflow.org_id,
                                    "analysis_id": org_workflow.ogs_analysis_id},
                SYNC_ORGANISM_INTO_TRIPAL: {"organism_id": org_workflow.org_id},
                # Each analysis sync step gets the ID of the analysis it is named after
                SYNC_GENOME_ANALYSIS_INTO_TRIPAL: {"analysis_id": org_workflow.genome_analysis_id},
                SYNC_OGS_ANALYSIS_INTO_TRIPAL: {"analysis_id": org_workflow.ogs_analysis_id},
                SYNC_FEATURES_INTO_TRIPAL: {"organism_id": org_workflow.org_id},
            }

            # Datamap for input datasets - dataset source (type): ldda (LibraryDatasetDatasetAssociation)
            # One entry for each of the four input steps declared above
            org_workflow.datamap = {
                GENOME_FASTA_FILE: {"src": "ldda", "id": org_workflow.datasets["genome_file"]},
                GFF_FILE: {"src": "ldda", "id": org_workflow.datasets["gff_file"]},
                PROTEINS_FASTA_FILE: {"src": "ldda", "id": org_workflow.datasets["proteins_file"]},
                TRANSCRIPTS_FASTA_FILE: {"src": "ldda", "id": org_workflow.datasets["transcripts_file"]},
            }

            # Run the Chado load Tripal sync workflow with the parameters set above
            org_workflow.run_workflow(workflow_path=workflow,
                                      workflow_parameters=workflow_parameters,
                                      datamap=org_workflow.datamap,
                                      workflow_name="Chado load Tripal synchronize")

        # Jbrowse creation workflow
        elif "Jbrowse" in str(workflow):

            logging.info("Executing workflow 'Jbrowse'")

            org_workflow.connect_to_instance()
            org_workflow.set_get_history()
            org_workflow.get_instance_attributes()
            org_workflow.get_organism_and_analyses_ids()
            org_workflow.import_datasets_into_history()

            # Explicit workflow parameter names (the values are the indexes of the workflow steps)
            GENOME_FASTA_FILE = "0"
            GFF_FILE = "1"
            ADD_JBROWSE = "2"
            ADD_ORGANISM_TO_JBROWSE = "3"

            # Jbrowse custom feature url
            # NOTE(review): id="id" makes the url end with a literal "id" - confirm this is what Jbrowse expects
            jbrowse_menu_url = "http://{hostname}:{port}/sp/{genus_sp}/feature/{Genus}/{species}/{id}".format(
                hostname=org_workflow.config["hostname"],
                port=org_workflow.config["http_port"],
                genus_sp=org_workflow.genus_species,
                Genus=org_workflow.genus_uppercase,
                species=org_workflow.species,
                id="id")

            workflow_parameters = {
                GENOME_FASTA_FILE: {},
                GFF_FILE: {},
                ADD_JBROWSE: {"jb_menu": {"menu_url": jbrowse_menu_url}},
                # Organism to add to the Jbrowse "container" (consists of a name and an id, not tied to the galaxy instance or chado/tripal names and ids)
                ADD_ORGANISM_TO_JBROWSE: {"name": [{"name": org_workflow.full_name,
                                                    "unique_id": org_workflow.abbreviation}]},
            }

            org_workflow.datamap = {
                GENOME_FASTA_FILE: {"src": "ldda", "id": org_workflow.datasets["genome_file"]},
                GFF_FILE: {"src": "ldda", "id": org_workflow.datasets["gff_file"]},
            }

            # Run the jbrowse creation workflow
            # NOTE(review): the workflow name below is the Chado workflow's one (copy/paste?) - confirm the
            # name of the Jbrowse workflow and update it
            org_workflow.run_workflow(workflow_path=workflow,
                                      workflow_parameters=workflow_parameters,
                                      datamap=org_workflow.datamap,
                                      workflow_name="Chado load Tripal synchronize")

        else:
            # Neither of the supported workflows: say so instead of silently doing nothing
            logging.warning("Unsupported workflow %s, nothing was run for %s" % (workflow, org_workflow.full_name))