-
Loraine Gueguen authored2fbeb177
runWorkflowPhaeo.py 5.22 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import json
import time
import utilities_bioblend
import speciesData
import constants
import constants_phaeo
class OrgWorkflowParam:
def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name,
org_id, history_id, instance):
self.genus_uppercase = genus_uppercase
self.chado_species_name = chado_species_name,
self.full_name = full_name
self.species_folder_name = species_folder_name
self.org_id = org_id
self.history_id = history_id
self.instance = instance
class RunWorkflow(speciesData.SpeciesData):
"""
Run a workflow into the galaxy instance's history of a given species
This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
provided the user creates their own workflow in a .ga format, and change the set_parameters function
to have the correct parameters for their workflow
"""
def __init__(self, parameters_dictionary):
super().__init__(parameters_dictionary)
self.history_name = str(self.genus_species)
def set_galaxy_instance(self, config):
# Set the instance url attribute --> TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1
instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format(
config[constants.CONF_ALL_HTTP_PORT],
self.genus_species)
self.instance = utilities_bioblend.get_galaxy_instance(
instance_url=instance_url,
email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
)
def set_history(self):
self.history_id = utilities_bioblend.get_history(
instance=self.instance,
history_name=self.history_name)
def get_analyses(self):
get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
instance=self.instance,
tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
history_id=self.history_id,
tool_inputs={},
time_sleep=10
)
analyses_dict_list = json.loads(get_analyses_tool_dataset)
return analyses_dict_list
def add_analysis(self, name, programversion, sourcename):
add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
instance=self.instance,
tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
history_id=self.history_id,
tool_inputs={"name": name,
"program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
"programversion": programversion,
"sourcename": sourcename,
"date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE})
analysis_dict = json.loads(add_analysis_tool_dataset)
analysis_id = str(analysis_dict["analysis_id"])
return analysis_id
def sync_analysis(self, analysis_id):
time.sleep(60)
utilities_bioblend.run_tool(
instance=self.instance,
tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
history_id=self.history_id,
tool_inputs={"analysis_id": analysis_id})
def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename):
"""
Add one analysis to Chado database
Required for Chado Load Tripal Synchronize workflow (which should be ran as the first workflow)
Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism)
"""
analysis_id = None
# Look up list of outputs (dictionaries)
for analyses_dict in analyses_dict_list:
if analyses_dict["name"] == analysis_name:
analysis_id = str(analyses_dict["analysis_id"])
if analysis_id is None:
analysis_id = self.add_analysis(
name=analysis_name,
programversion=analysis_programversion,
sourcename=analysis_sourcename
)
# Synchronize analysis in Tripal
logging.info("Synchronizing analysis %s in Tripal" % analysis_name)
self.sync_analysis(analysis_id=analysis_id)
return analysis_id
def get_invocation_report(self, workflow_name):
"""
Debugging method for workflows
Simply logs and returns a report of the previous workflow invocation (execution of a workflow in
the instance via the API)
:param workflow_name:
:return:
"""
workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
workflow_id = workflow_attributes[1]["id"] # Most recently imported workflow (index 1 in the list)
invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id)
invocation_id = invocations[1]["id"] # Most recent invocation
invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id)
logging.debug(invocation_report)
return invocation_report