#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import logging
import time

import constants
import constants_phaeo
import speciesData
import utilities_bioblend


class OrgWorkflowParam:

    def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name,
                 org_id, history_id, instance):
        self.genus_uppercase = genus_uppercase
        self.chado_species_name = chado_species_name
        self.full_name = full_name
        self.species_folder_name = species_folder_name
        self.org_id = org_id
        self.history_id = history_id
        self.instance = instance


class RunWorkflow(speciesData.SpeciesData):
    """
    Run a workflow in the Galaxy instance's history of a given species.

    This script is made to work with a Phaeoexplorer-specific workflow, but it can be adapted to
    run any workflow, provided the user creates their own workflow in .ga format and changes the
    set_parameters function to supply the correct parameters for their workflow.
    """

    def __init__(self, parameters_dictionary):
        super().__init__(parameters_dictionary)
        self.history_name = str(self.genus_species)

    def set_galaxy_instance(self, config):
        # Set the instance URL attribute --> TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1
        instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format(
            config[constants.CONF_ALL_HTTP_PORT],
            self.genus_species)
        self.instance = utilities_bioblend.get_galaxy_instance(
            instance_url=instance_url,
            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
        )

    def set_history(self):
        self.history_id = utilities_bioblend.get_history(
            instance=self.instance,
            history_name=self.history_name)

    def get_analyses(self):
        # Retrieve the analyses already present in Chado, as a list of dictionaries
        get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={},
            time_sleep=10
        )
        analyses_dict_list = json.loads(get_analyses_tool_dataset)
        return analyses_dict_list

    def add_analysis(self, name, programversion, sourcename):
        # Add a new analysis to Chado and return its analysis_id (as a string)
        add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"name": name,
                         "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
                         "programversion": programversion,
                         "sourcename": sourcename,
                         "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE})
        analysis_dict = json.loads(add_analysis_tool_dataset)
        analysis_id = str(analysis_dict["analysis_id"])
        return analysis_id

    def sync_analysis(self, analysis_id):
        # Give the Chado add step time to complete before triggering the Tripal sync
        time.sleep(60)
        utilities_bioblend.run_tool(
            instance=self.instance,
            tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"analysis_id": analysis_id})

    def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename):
        """
        Add one analysis to the Chado database, then synchronize it in Tripal.

        Required for the Chado Load Tripal Synchronize workflow (which should be run as the first
        workflow). Called outside the workflow for practical reasons (the Chado add tool doesn't
        have an input link for analysis or organism).
        """
        analysis_id = None
        # Look through the list of existing analyses (dictionaries) for a matching name
        for analyses_dict in analyses_dict_list:
            if analyses_dict["name"] == analysis_name:
                analysis_id = str(analyses_dict["analysis_id"])
        # Create the analysis only if it doesn't already exist
        if analysis_id is None:
            analysis_id = self.add_analysis(
                name=analysis_name,
                programversion=analysis_programversion,
                sourcename=analysis_sourcename
            )
        # Synchronize the analysis in Tripal
        logging.info("Synchronizing analysis %s in Tripal" % analysis_name)
        self.sync_analysis(analysis_id=analysis_id)
        return analysis_id

    def get_invocation_report(self, workflow_name):
        """
        Debugging method for workflows.

        Simply logs and returns a report of the previous workflow invocation (execution of a
        workflow in the instance via the API).

        :param workflow_name: name of the workflow whose latest invocation should be reported
        :return: the invocation report returned by the Galaxy API
        """
        workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
        workflow_id = workflow_attributes[1]["id"]  # Most recently imported workflow (index 1 in the list)
        invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id)
        invocation_id = invocations[1]["id"]  # Most recent invocation
        invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id)
        logging.debug(invocation_report)
        return invocation_report
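

# Usage sketch: a minimal example of how this class might be driven, assuming
# `parameters_dictionary` and `config` objects shaped as in the other
# Phaeoexplorer scripts; the analysis name/version/source values below are
# illustrative assumptions, not taken from this module:
#
#     run_workflow = RunWorkflow(parameters_dictionary)
#     run_workflow.set_galaxy_instance(config)
#     run_workflow.set_history()
#     analyses_dict_list = run_workflow.get_analyses()
#     analysis_id = run_workflow.add_analysis_and_sync(
#         analyses_dict_list=analyses_dict_list,
#         analysis_name="genome annotation v1.0",
#         analysis_programversion="1.0",
#         analysis_sourcename="example_source")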