#!/usr/bin/python
# -*- coding: utf-8 -*-

import logging
import sys
import os
import subprocess
import time
import json
import bioblend
from bioblend import galaxy

import utilities

def get_galaxy_instance(instance_url, email, password):
    """
    Test the connection to the galaxy instance for the current organism
    Exit if we cannot connect to the instance

    """

    logging.info("Connecting to the galaxy instance (%s)" % instance_url)
    instance = galaxy.GalaxyInstance(url=instance_url,
                                     email=email,
                                     password=password)

    try:
        instance.histories.get_histories()
    except bioblend.ConnectionError:
        logging.critical("Cannot connect to galaxy instance (%s) " % instance_url)
        sys.exit()
    else:
        logging.info("Successfully connected to galaxy instance (%s) " % instance_url)

    return instance
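
# Example usage (a minimal sketch; the URL, email and password below are
# hypothetical placeholders, not values defined by this module):
#
#   instance = get_galaxy_instance(instance_url="http://localhost:8080",
#                                  email="admin@example.org",
#                                  password="password")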

def get_history(instance, history_name):
    """
    Create or set the working history to the current species one

    :return:
    """
    try:
        histories = instance.histories.get_histories(name=history_name)
        history_id = histories[0]["id"]
        logging.debug("History ID set for {0}: {1}".format(history_name, history_id))
    except IndexError:
        logging.info("Creating history for %s" % history_name)
        history = instance.histories.create_history(name=history_name)
        history_id = history["id"]
        logging.debug("History ID set for {0}: {1}".format(history_name, history_id))

    return history_id
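
# Example usage (a minimal sketch; the history name is a hypothetical placeholder):
#
#   history_id = get_history(instance, "genus_species_history")
#   # Reuses the existing history if one with that name is found, otherwise creates it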

def check_wf_param(full_name, params):

    if not utilities.no_empty_items(params):
        logging.critical(
            "One empty workflow parameter found for organism {0}: {1}".format(full_name, params))
        sys.exit()

def check_galaxy_state(network_name, script_dir):
    """
    Read the logs of the galaxy container for the current species to check if the service is "ready"
    """

    # Run supervisorctl status in the galaxy container via serexec
    # Change serexec permissions in repo
    try:
        os.chmod("%s/serexec" % script_dir, 0o0755)
    except PermissionError:
        logging.warning("Cannot set execute permissions on %s/serexec" % script_dir)
    galaxy_logs = subprocess.run(["%s/serexec" % script_dir, "{0}_galaxy".format(network_name),
                                  "supervisorctl", "status", "galaxy:"],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    supervisorctl_status = galaxy_logs.stdout.decode("utf-8")
    if "galaxy:galaxy_web                RUNNING" in supervisorctl_status \
            and "galaxy:handler0                  RUNNING" in supervisorctl_status \
            and "galaxy:handler1                  RUNNING" in supervisorctl_status:
        return 1
    else:
        return 0
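
# Example usage (a minimal sketch; the network name and script directory are
# hypothetical placeholders for this deployment):
#
#   while not check_galaxy_state(network_name="genus_species",
#                                script_dir="/path/to/scripts"):
#       time.sleep(5)  # poll until supervisorctl reports the galaxy services as RUNNING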

def get_species_history_id(instance, full_name):
    """
    Set and return the current species history id in its galaxy instance

    :param instance:
    :param full_name:
    :return:
    """

    histories = instance.histories.get_histories(name=str(full_name))
    history_id = histories[0]["id"]
    show_history = instance.histories.show_history(history_id=history_id)

    return [history_id, show_history]


def run_tool(instance, tool_id, history_id, tool_inputs):

    output_dict = None
    try:
        logging.debug("Running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))
        output_dict = instance.tools.run_tool(
            tool_id=tool_id,
            history_id=history_id,
            tool_inputs=tool_inputs)
    except bioblend.ConnectionError:
        logging.error("Unexpected HTTP response (bioblend.ConnectionError) when running tool {0} with tool inputs: {1}".format(tool_id, tool_inputs))

    return output_dict
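
# Example usage (a minimal sketch; the tool id and the tool_inputs layout are
# hypothetical and must match a tool actually installed on the instance):
#
#   output_dict = run_tool(instance,
#                          tool_id="toolshed.g2.bx.psu.edu/repos/owner/tool/tool/1.0",
#                          history_id=history_id,
#                          tool_inputs={"input": {"src": "hda", "id": dataset_id}})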

def run_tool_and_download_single_output_dataset(instance, tool_id, history_id, tool_inputs, time_sleep=0):

    output_dict = run_tool(instance, tool_id, history_id, tool_inputs)
    if output_dict is None:
        # The tool run failed (see run_tool logs), nothing to download
        return None
    if time_sleep is not None:
        time.sleep(time_sleep)
    single_output_dataset_id = output_dict["outputs"][0]["id"]
    dataset = instance.datasets.download_dataset(dataset_id=single_output_dataset_id)

    return dataset
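
# Example usage (a minimal sketch reusing the hypothetical tool id and inputs above;
# the downloaded dataset content is written to a local file):
#
#   dataset = run_tool_and_download_single_output_dataset(
#       instance, tool_id, history_id, tool_inputs, time_sleep=10)
#   with open("output.dat", "wb") as out:
#       out.write(dataset)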

def install_repository_revision(instance, tool_id, version, changeset_revision):

    tool_dict = instance.tools.show_tool(tool_id)
    current_version = tool_dict["version"]
    toolshed_dict = tool_dict["tool_shed_repository"]

    if current_version != version:
        name = toolshed_dict["name"]
        owner = toolshed_dict["owner"]
        toolshed = "https://" + toolshed_dict["tool_shed"]
        logging.warning("Installing changeset revision {0} for {1}".format(changeset_revision, name))

        instance.toolshed.install_repository_revision(tool_shed_url=toolshed,
                                                      name=name,
                                                      owner=owner,
                                                      changeset_revision=changeset_revision,
                                                      install_tool_dependencies=True,
                                                      install_repository_dependencies=False,
                                                      install_resolver_dependencies=True)

def install_workflow_tools(instance, workflow_path):
    """
    Read a .ga file to extract the information about the different tools called.
    Check if every tool is installed via a "show_tool".
    If a tool is not installed (versions don't match), send a warning to the logger and install the required changeset (matching the tool version)
    Doesn't do anything if versions match

    :return:
    """

    logging.info("Validating that installed tools versions and changesets match workflow versions")

    # Load the workflow file (.ga) in a buffer
    with open(workflow_path, 'r') as ga_in_file:

        # Then store the decoded json dictionary
        workflow_dict = json.load(ga_in_file)

        # Look up every "step_id" looking for tools
        for step in workflow_dict["steps"].values():
            if step["tool_id"]:
                # Check if an installed version matches the workflow tool version
                # (If it's not installed, the show_tool version returned will be a default version with the suffix "XXXX+0")
                install_repository_revision(instance=instance,
                                            tool_id=step["tool_id"],
                                            version=step["tool_version"],
                                            changeset_revision=step["tool_shed_repository"]["changeset_revision"])

    logging.info("Tools versions and changeset_revisions from workflow validated")