#!/usr/bin/python
# -*- coding: utf-8 -*-
import bioblend
import bioblend.galaxy.objects
import argparse
import os
import logging
import sys
import json
import utilities
import speciesData
from gga_autoload.gga_load_data import metadata_generator
from bioblend import galaxy
"""
gga_init.py
Usage: $ python3 gga_init.py -i example.yml [OPTIONS]
"""
class RunWorkflow(speciesData.SpeciesData):
    """
    Run a workflow in the galaxy instance's history of a given species.
    This script is written for Phaeoexplorer GGA environments, but it can be adapted to run any workflow,
    provided the user creates their own workflow in .ga format and changes the set_parameters function
    to pass the correct parameters for their workflow
    """
def get_species_history_id(self):
"""
Set and return the current species history id in its galaxy instance
:return:
"""
histories = self.instance.histories.get_histories(name=str(self.full_name))
self.history_id = histories[0]["id"]
self.instance.histories.show_history(history_id=self.history_id)
return self.history_id
    def get_instance_attributes(self):
        """
        Retrieve instance attributes:
        - working history ID
        - library ID (there should only be one library!)
        - dataset IDs
        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)
        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)
        # Map folder names to folder IDs
        folders_ids = {folder["name"]: folder["id"] for folder in instance_source_data_folders}
logging.info("Folders and datasets IDs: ")
datasets = dict()
for k, v in folders_ids.items():
logging.info("\t" + k + ": " + v)
if k == "/genome":
sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
for k2, v2 in sub_folder_content.items():
for e in v2:
if type(e) == dict:
if e["name"].endswith(".fa"):
datasets["genome_file"] = e["ldda_id"]
logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
elif k == "/annotation/" + self.genus_species:
sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
for k2, v2 in sub_folder_content.items():
for e in v2:
if type(e) == dict:
# TODO: manage several files of the same type and manage versions
if e["name"].endswith("transcripts-gff.fa"):
datasets["transcripts_file"] = e["ldda_id"]
logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
elif e["name"].endswith("proteins.fasta"):
datasets["proteins_file"] = e["ldda_id"]
logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
elif e["name"].endswith(".gff"):
datasets["gff_file"] = e["ldda_id"]
logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
elif e["name"].endswith("MALE"):
datasets["gff_file"] = e["ldda_id"]
logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
return {"history_id": self.history_id, "library_id": self.library_id, "datasets": datasets}
    def prepare_history(self):
        """
        Galaxy instance startup in preparation for importing datasets and running a workflow:
        - Remove Homo sapiens from the chado database.
        - Add the organism and analyses into the chado database --> separate
        - Get any other existing organism IDs before updating the galaxy instance --> separate
        Calling this function is mandatory to have a working galaxy instance history
        :return:
        """
        self.connect_to_instance()
        # Create the species history if it doesn't exist yet, and keep its ID
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        if not histories:
            history = self.instance.histories.create_history(name=str(self.full_name))
            self.history_id = history["id"]
        else:
            self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)
        self.library_id = self.instance.libraries.get_libraries()[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)
instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)
# Delete Homo sapiens from Chado database
logging.debug("Getting 'Homo sapiens' ID in instance's chado database")
get_sapiens_id_job = self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
history_id=self.history_id,
tool_inputs={"genus": "Homo", "species": "sapiens"})
get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"]
get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output)
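        # The chado "get organisms" tool writes its result as a JSON dataset: download and parse it to recover
        # the organism_id (an empty JSON list, hence an IndexError below, means the organism is absent)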
        try:
            logging.debug("Deleting 'Homo sapiens' from the instance's chado database")
            get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
            sapiens_id = str(
                get_sapiens_id_final_output["organism_id"])  # needs to be str to be recognized by the chado tool
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
                history_id=self.history_id,
                tool_inputs={"organism": sapiens_id})
        except (bioblend.ConnectionError, IndexError):
            logging.debug("Homo sapiens isn't in the instance's chado database")
# Add organism (species) to chado
logging.info("Adding organism to the instance's chado database")
self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2",
history_id=self.history_id,
tool_inputs={"abbr": self.abbreviation,
"genus": self.genus,
"species": self.species,
"common": self.common})
# Add OGS analysis to chado
logging.info("Adding OGS analysis to the instance's chado database")
self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
history_id=self.history_id,
tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version,
"program": "Performed by Genoscope",
"programversion": str("OGS" + self.ogs_version),
"sourcename": "Genoscope",
"date_executed": self.date})
# Add genome analysis to chado
logging.info("Adding genome analysis to the instance's chado database")
self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
history_id=self.history_id,
tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version,
"program": "Performed by Genoscope",
"programversion": str("genome v" + self.genome_version),
"sourcename": "Genoscope",
"date_executed": self.date})
self.get_organism_and_analyses_ids()
logging.info("Finished initializing instance")
    def run_workflow(self, workflow_name, workflow_parameters, datamap):
        """
        Run the "main" workflow in the galaxy instance:
        - import data to library
        - load fasta and gff
        - sync with tripal
        - add jbrowse + organism
        - fill in the tripal views
        TODO: map tool name to step id
        :param workflow_name: name used to locate the Galaxy-Workflow-<workflow_name>.ga file and the imported workflow
        :param workflow_parameters: dict mapping workflow step IDs to their tool parameters
        :param datamap: dict mapping workflow input step IDs to datasets ({"src": ..., "id": ...})
        :return:
        """
logging.debug("running workflow: " + str(workflow_name))
        workflow_ga_file = os.path.join(self.main_dir, "Galaxy-Workflow-" + workflow_name + ".ga")
if self.strain != "":
custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga"
custom_ga_file_path = os.path.abspath(custom_ga_file)
else:
custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga"
custom_ga_file_path = os.path.abspath(custom_ga_file)
with open(workflow_ga_file, 'r') as ga_in_file:
workflow = str(ga_in_file.readlines())
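        # NOTE: str(readlines()) yields the repr of a list of lines, which doubles every backslash and wraps
        # the content in "['...']"; the replace() calls below are written against that escaped representation,
        # and the wrapping is stripped again further down before json.loads()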
# ugly fix for the jbrowse parameters
workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
workflow = workflow.replace("\\\\", "\\") # to restore the correct amount of backslashes in the workflow string before import
        # Test URL (local development): point the jbrowse mRNA feature links at the local instance
        workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
                                    "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
# TODO: Uncomment next lines in production
# workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
# "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
        workflow = workflow[2:-2]  # strip the "['" and "']" wrapping added by str(readlines()) so the string parses as JSON
        # workflow = workflow[:-2]  # alternative slice, in case the line above doesn't yield valid JSON
workflow_dict = json.loads(workflow)
self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
self.workflow_name = workflow_name
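        # The workflow was imported from a dict, so fetch it back by name to recover the ID Galaxy assigned to it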
workflow_attributes = self.instance.workflows.get_workflows(name=self.workflow_name)
workflow_id = workflow_attributes[0]["id"]
show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id)
logging.debug("Workflow ID: " + workflow_id)
logging.debug("Inputs:")
        logging.debug(show_workflow["inputs"])
        self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                history_id=self.history_id,
                                                params=workflow_parameters,
                                                inputs=datamap,
                                                inputs_by="step_index")
self.instance.workflows.delete_workflow(workflow_id=workflow_id) # TODO : Keep for prod (add a "if test" condition)
    def get_organism_and_analyses_ids(self):
        """
        Retrieve the current organism ID and the OGS and genome chado analysis IDs (needed to run some tools, as
        Tripal/Chado doesn't accept organism/analysis names as valid inputs)
        :return:
        """
# Get the ID for the current organism in chado
org = self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
history_id=self.history_id,
tool_inputs={"genus": self.genus, "species": self.species})
org_job_out = org["outputs"][0]["id"]
org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out)
try:
org_output = json.loads(org_json_output)[0]
self.org_id = str(org_output["organism_id"]) # id needs to be a str to be recognized by chado tools
except IndexError:
logging.debug("no organism matching " + self.full_name + " exists in the instance's chado database")
# Get the ID for the OGS analysis in chado
ogs_analysis = self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
history_id=self.history_id,
tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version})
ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"]
ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out)
try:
ogs_analysis_output = json.loads(ogs_analysis_json_output)[0]
self.ogs_analysis_id = str(ogs_analysis_output["analysis_id"])
except IndexError:
logging.debug("no matching OGS analysis exists in the instance's chado database")
# Get the ID for the genome analysis in chado
genome_analysis = self.instance.tools.run_tool(
tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
history_id=self.history_id,
tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version})
genome_analysis_job_out = genome_analysis["outputs"][0]["id"]
genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out)
try:
genome_analysis_output = json.loads(genome_analysis_json_output)[0]
self.genome_analysis_id = str(genome_analysis_output["analysis_id"])
except IndexError:
logging.debug("no matching genome analysis exists in the instance's chado database")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")
    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")
    # Options consumed by the per-species logic below
    parser.add_argument("--dir",
                        type=str,
                        help="Path of the main directory containing the species directories and workflow files",
                        default=".")
    parser.add_argument("--init-instance",
                        help="Initialize the galaxy instance (prepare history, chado organism and analyses)",
                        action="store_true")
    parser.add_argument("--load-data",
                        help="Load the species data into the galaxy instance",
                        action="store_true")
    parser.add_argument("--run-main",
                        help="Run the main (Phaeoexplorer) workflow",
                        action="store_true")
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
logging.info("Start")
sp_dict_list = utilities.parse_input(args.input)
for sp_dict in sp_dict_list:
run_workflow_for_current_organism = RunWorkflow(parameters_dictionary=sp_dict)
run_workflow_for_current_organism.main_dir = os.path.abspath(args.dir)
if args.init_instance:
logging.info(" Initializing the galaxy instance")
run_workflow_for_current_organism.init_instance()
run_workflow_for_current_organism.get_instance_attributes()
# metadata[genus_species_strain_sex]["initialized"] = True
if args.load_data:
logging.info("Loading data into galaxy")
# run_workflow_for_current_organism.load_data()
# metadata[genus_species_strain_sex]["data_loaded_in_instance"] = True
if args.run_main:
logging.info("Running main workflow")
run_workflow_for_current_organism.get_organism_and_analyses_ids()
workflow_parameters = dict()
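            # The keys below are step IDs of the "main" .ga workflow; each value holds the non-default tool
            # parameters for that step (an empty dict runs the step with its saved defaults)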
            load_fasta_dataset = "0"
# TODO: Add a name for each parameter
workflow_parameters[load_fasta_dataset] = {}
workflow_parameters["1"] = {}
workflow_parameters["2"] = {}
workflow_parameters["3"] = {}
workflow_parameters["4"] = {"organism": run_workflow_for_current_organism.org_id,
"analysis_id": run_workflow_for_current_organism.genome_analysis_id,
"do_update": "true"}
workflow_parameters["5"] = {"organism": run_workflow_for_current_organism.org_id,
"analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
workflow_parameters["6"] = {"organism_id": run_workflow_for_current_organism.org_id}
workflow_parameters["7"] = {"analysis_id": run_workflow_for_current_organism.ogs_analysis_id}
workflow_parameters["8"] = {"analysis_id": run_workflow_for_current_organism.genome_analysis_id}
workflow_parameters["9"] = {"organism_id": run_workflow_for_current_organism.org_id}
workflow_parameters["10"] = {}
workflow_parameters["11"] = {}
run_workflow_for_current_organism.datamap = dict()
run_workflow_for_current_organism.datamap["0"] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["genome_file"]}
run_workflow_for_current_organism.datamap["1"] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["gff_file"]}
run_workflow_for_current_organism.datamap["2"] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["proteins_file"]}
run_workflow_for_current_organism.datamap["3"] = {"src": "hda", "id": run_workflow_for_current_organism.datasets["transcripts_file"]}
run_workflow_for_current_organism.run_workflow(workflow_name="main",
workflow_parameters=workflow_parameters,
datamap=run_workflow_for_current_organism.datamap)
# metadata[genus_species_strain_sex]["workflows_run"] = metadata[genus_species_strain_sex]["workflows_run"].append("main")