-
Loraine Gueguen authored6c86b6f9
utilities_bioblend.py 6.56 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-
import logging
import sys
import os
import subprocess
import time
import json
import bioblend
from bioblend import galaxy
import utilities
def get_galaxy_instance(instance_url, email, password):
"""
Test the connection to the galaxy instance for the current organism
Exit if we cannot connect to the instance
"""
logging.info("Connecting to the galaxy instance (%s)" % instance_url)
instance = galaxy.GalaxyInstance(url=instance_url,
email=email,
password=password)
try:
instance.histories.get_histories()
except bioblend.ConnectionError:
logging.critical("Cannot connect to galaxy instance (%s) " % instance_url)
sys.exit()
else:
logging.info("Successfully connected to galaxy instance (%s) " % instance_url)
return instance
def get_history(instance, history_name):
"""
Create or set the working history to the current species one
:return:
"""
try:
histories = instance.histories.get_histories(name=history_name)
history_id = histories[0]["id"]
logging.debug("History ID set for {0}: {1}".format(history_name, history_id))
except IndexError:
logging.info("Creating history for %s" % history_name)
history = instance.histories.create_history(name=history_name)
history_id = history["id"]
logging.debug("History ID set for {0}: {1}".format(history_name, history_id))
return history_id
def check_wf_param(full_name, params):
if not utilities.no_empty_items(params):
logging.critical(
"One empty workflow parameter found for organism {0}: {1})".format(full_name, params))
sys.exit()
def check_galaxy_state(network_name, script_dir):
"""
Read the logs of the galaxy container for the current species to check if the service is "ready"
"""
# Run supervisorctl status in the galaxy container via serexec
# Change serexec permissions in repo
try:
os.chmod("%s/serexec" % script_dir, 0o0755)
except PermissionError:
logging.warning("serexec permissions incorrect in %s" % script_dir)
galaxy_logs = subprocess.run(["%s/serexec" % script_dir, "{0}_galaxy".format(network_name),
"supervisorctl", "status", "galaxy:"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if "galaxy:galaxy_web RUNNING" in str(galaxy_logs.stdout) \
and "galaxy:handler0 RUNNING" in str(galaxy_logs.stdout) \
and "galaxy:handler1 RUNNING" in str(galaxy_logs.stdout):
return 1
else:
return 0
def get_species_history_id(instance, full_name):
"""
Set and return the current species history id in its galaxy instance
:param instance:
:param full_name:
:return:
"""
histories = instance.histories.get_histories(name=str(full_name))
history_id = histories[0]["id"]
show_history = instance.histories.show_history(history_id=history_id)
return [history_id, show_history]
def run_tool(instance, tool_id, history_id, tool_inputs):
output_dict = None
try:
logging.debug("Running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
output_dict = instance.tools.run_tool(
tool_id=tool_id,
history_id=history_id,
tool_inputs=tool_inputs)
except bioblend.ConnectionError:
logging.error("Unexpected HTTP response (bioblend.ConnectionError) when running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
return output_dict
def run_tool_and_download_single_output_dataset(instance, tool_id, history_id, tool_inputs, time_sleep = 0):
output_dict = run_tool(instance, tool_id, history_id, tool_inputs)
if not time_sleep is None:
time.sleep(time_sleep)
single_output_dataset_id = output_dict["outputs"][0]["id"]
dataset = instance.datasets.download_dataset(dataset_id=single_output_dataset_id)
return dataset
def install_repository_revision(instance, tool_id, version, changeset_revision):
tool_dict = instance.tools.show_tool(tool_id)
current_version = tool_dict["version"]
toolshed_dict = tool_dict["tool_shed_repository"]
if current_version != version:
name = toolshed_dict["name"]
owner = toolshed_dict["owner"]
toolshed = "https://" + toolshed_dict["tool_shed"]
logging.warning("Installing changeset revision {0} for {1}".format(changeset_revision, name))
instance.toolshed.install_repository_revision(tool_shed_url=toolshed,
name=name,
owner=owner,
changeset_revision=changeset_revision,
install_tool_dependencies=True,
install_repository_dependencies=False,
install_resolver_dependencies=True)
def install_workflow_tools(instance, workflow_path):
"""
Read a .ga file to extract the information about the different tools called.
Check if every tool is installed via a "show_tool".
If a tool is not installed (versions don't match), send a warning to the logger and install the required changeset (matching the tool version)
Doesn't do anything if versions match
:return:
"""
logging.info("Validating that installed tools versions and changesets match workflow versions")
# Load the workflow file (.ga) in a buffer
with open(workflow_path, 'r') as ga_in_file:
# Then store the decoded json dictionary
workflow_dict = json.load(ga_in_file)
# Look up every "step_id" looking for tools
for step in workflow_dict["steps"].values():
if step["tool_id"]:
# Check if an installed version matches the workflow tool version
# (If it's not installed, the show_tool version returned will be a default version with the suffix "XXXX+0")
install_repository_revision(tool_id=step["tool_id"],
version=step["tool_version"],
changeset_revision=step["tool_shed_repository"]["changeset_revision"],
instance=instance)
logging.info("Tools versions and changeset_revisions from workflow validated")