Newer
Older

Loraine Gueguen
committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import json

Loraine Gueguen
committed
import time

Loraine Gueguen
committed
import utilities_bioblend

Loraine Gueguen
committed
import constants
import constants_phaeo
class OrgWorkflowParam:
    """
    Plain container grouping the values needed to run a workflow for one organism.

    The constructor performs no validation or conversion: every argument is
    stored unchanged on the instance under the same name.

    :param genus_uppercase: stored as ``self.genus_uppercase``
    :param chado_species_name: stored as ``self.chado_species_name``
    :param full_name: stored as ``self.full_name``
    :param species_folder_name: stored as ``self.species_folder_name``
    :param org_id: stored as ``self.org_id``
    :param history_id: stored as ``self.history_id``
    :param instance: stored as ``self.instance``
        (presumably the Galaxy instance object -- confirm against callers)
    """

    def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name,
                 org_id, history_id, instance):
        self.genus_uppercase = genus_uppercase
        self.chado_species_name = chado_species_name
        self.full_name = full_name
        self.species_folder_name = species_folder_name
        self.org_id = org_id
        self.history_id = history_id
        self.instance = instance
class RunWorkflow(species_data.SpeciesData):

Loraine Gueguen
committed
"""
Run a workflow in the Galaxy instance's history of a given species
This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
provided the user creates their own workflow in .ga format and changes the set_parameters function
to have the correct parameters for their workflow
"""
def __init__(self, parameters_dictionary):
    """
    Initialise the parent class, then derive the Galaxy history name.

    :param parameters_dictionary: forwarded unchanged to the parent initialiser
    """
    super().__init__(parameters_dictionary)
    # genus_species is not assigned in this method; it is expected to exist
    # once the parent initialiser has run (presumably set there -- confirm).
    history_name = str(self.genus_species)
    self.history_name = history_name
def set_galaxy_instance(self, config):
    """
    Build the Galaxy URL for this species and obtain the Galaxy instance object.

    Sets ``self.instance_url`` from the HTTP port found in ``config`` and
    ``self.genus_species``, then stores the value returned by
    ``utilities_bioblend.get_galaxy_instance`` in ``self.instance``.

    :param config: mapping indexed with constants.CONF_ALL_HTTP_PORT,
        constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL and
        constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD
    :return: None
    """
    # Set the instance url attribute
    # TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1
    self.instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format(
        config[constants.CONF_ALL_HTTP_PORT],
        self.genus_species,
    )

    # NOTE(review): self.instance_url is computed above but is not among the
    # arguments of this call as it appears here. Confirm against
    # utilities_bioblend.get_galaxy_instance whether the URL must be passed
    # (e.g. as an ``instance_url`` keyword).
    self.instance = utilities_bioblend.get_galaxy_instance(
        email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
        password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
    )
def set_history(self):
    """
    Resolve the history for this species and remember its id.

    Calls ``utilities_bioblend.get_history`` with this object's Galaxy
    instance and ``history_name`` and stores the result in ``self.history_id``.

    :return: None
    """
    history_id = utilities_bioblend.get_history(
        instance=self.instance,
        history_name=self.history_name,
    )
    self.history_id = history_id
def get_analyses(self):
    """
    Run the "get analyses" tool and return its output parsed from JSON.

    The tool identified by constants_phaeo.GET_ANALYSES_TOOL_ID is run in
    ``self.history_id`` with no tool inputs; the single output dataset
    returned by the helper is decoded with ``json.loads``.

    :return: the decoded JSON document (iterated by callers as a list of
        analysis dictionaries)
    """
    call_arguments = {
        "instance": self.instance,
        "tool_id": constants_phaeo.GET_ANALYSES_TOOL_ID,
        "history_id": self.history_id,
        "tool_inputs": {},
        # Forwarded as-is to the helper (presumably a wait in seconds -- confirm)
        "time_sleep": 10,
    }
    raw_output = utilities_bioblend.run_tool_and_download_single_output_dataset(**call_arguments)
    return json.loads(raw_output)
def add_analysis(self, name, programversion, sourcename):
    """
    Run the "add analysis" tool and return the id of the analysis it reports.

    The tool identified by constants_phaeo.ADD_ANALYSIS_TOOL_ID is run in
    ``self.history_id``. Its single output dataset is decoded as JSON and the
    ``"analysis_id"`` entry is returned as a string.

    :param name: value sent as the tool's "name" input
    :param programversion: value sent as the tool's "programversion" input
    :param sourcename: value sent as the tool's "sourcename" input
    :return: the analysis id, converted with ``str``
    :raises KeyError: if the decoded output has no "analysis_id" key
    """
    add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
        instance=self.instance,
        tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
        history_id=self.history_id,
        tool_inputs={
            "name": name,
            # "program" and "date_executed" are fixed project constants
            "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
            "programversion": programversion,
            "sourcename": sourcename,
            "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE,
        },
        # Forwarded as-is to the helper (presumably a wait in seconds -- confirm)
        time_sleep=10,
    )
    analysis_dict = json.loads(add_analysis_tool_dataset)
    analysis_id = str(analysis_dict["analysis_id"])
    return analysis_id
def sync_analysis(self, analysis_id):
    """
    Run the analysis synchronisation tool for one analysis.

    The tool identified by constants_phaeo.ANALYSIS_SYNC_TOOL_ID is run in
    ``self.history_id``; whatever the helper returns is discarded.

    :param analysis_id: value sent as the tool's "analysis_id" input
    :return: None
    """
    sync_inputs = {"analysis_id": analysis_id}
    utilities_bioblend.run_tool(
        instance=self.instance,
        tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
        history_id=self.history_id,
        tool_inputs=sync_inputs,
    )
def add_analysis_if_needed(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename):
    """
    Return the id of the analysis named ``analysis_name``, creating it if absent.

    ``analyses_dict_list`` is searched for entries whose "name" equals
    ``analysis_name``. If at least one is found, its "analysis_id" is returned
    as a string and nothing is created. Otherwise ``self.add_analysis`` is
    called and its return value is returned.

    :param analyses_dict_list: output of get_analyses(); each entry is indexed
        with the keys "name" and (on a match) "analysis_id"
    :param analysis_name: name to look for, and name of the analysis to create
    :param analysis_programversion: passed as ``programversion`` when creating
    :param analysis_sourcename: passed as ``sourcename`` when creating
    :return: the analysis id (a string when found in the list)
    """
    analysis_id = None
    # Look up the list of outputs (dictionaries)
    for analysis in analyses_dict_list:
        if analysis["name"] == analysis_name:
            # No break on purpose: if several entries share the name,
            # the last one in the list wins.
            analysis_id = str(analysis["analysis_id"])

    if analysis_id is not None:
        return analysis_id

    return self.add_analysis(
        name=analysis_name,
        programversion=analysis_programversion,
        sourcename=analysis_sourcename,
    )
def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename,
                          time_sleep=10):
    """
    Add one analysis to Chado database (if it is not there yet) and synchronize it in Tripal
    Required for Chado Load Tripal Synchronize workflow (which should be run as the first workflow)
    Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism)

    :param analyses_dict_list: output dict from get_analyses()
    :param analysis_name: name of the analysis to find or create
    :param analysis_programversion: program version used if the analysis has to be created
    :param analysis_sourcename: source name used if the analysis has to be created
    :param time_sleep: seconds to pause before the synchronization call (default 10, the
        previously hard-coded value)
    :return: the analysis id returned by add_analysis_if_needed()
    """
    analysis_id = self.add_analysis_if_needed(
        analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename)

    # Synchronize analysis in Tripal
    logging.info("Synchronizing analysis %s in Tripal", analysis_name)
    # The reason for the pause is not visible in this file (presumably it lets
    # the previous tool run settle before syncing -- confirm).
    time.sleep(time_sleep)
    self.sync_analysis(analysis_id=analysis_id)

    return analysis_id
def get_invocation_report(self, workflow_name):
    """
    Debugging method for workflows
    Simply logs (at DEBUG level) and returns a report of a previous invocation of the named
    workflow (an invocation is an execution of a workflow in the instance via the API)

    :param workflow_name: name used to look up the workflow in the Galaxy instance
    :return: the invocation report returned by the Galaxy API client
    :raises IndexError: if fewer than two workflows carry this name, or if the selected
        workflow has fewer than two invocations
    """
    workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
    # NOTE(review): index 1 is what the original author treats as the most recently
    # imported workflow. The ordering of the returned list is not visible here --
    # confirm that index 1 (and not 0) is intended.
    try:
        workflow_id = workflow_attributes[1]["id"]
    except IndexError as err:
        raise IndexError(
            "Expected at least two workflows named '{0}' in the Galaxy instance".format(workflow_name)
        ) from err

    invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id)
    # NOTE(review): same indexing convention as above ("most recent invocation") -- confirm.
    try:
        invocation_id = invocations[1]["id"]
    except IndexError as err:
        raise IndexError(
            "Expected at least two invocations of workflow '{0}' (id {1})".format(workflow_name, workflow_id)
        ) from err

    invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id)
    logging.debug(invocation_report)
    return invocation_report