#!/usr/bin/python
# -*- coding: utf-8 -*-

import json
import logging
import os
import re
import subprocess
import sys
import time

import bioblend
from bioblend import galaxy

import utilities

# supervisor programs that must all be RUNNING for galaxy to be considered ready
_REQUIRED_GALAXY_PROCESSES = ("galaxy:galaxy_web", "galaxy:handler0", "galaxy:handler1")


def get_galaxy_instance(instance_url, email, password):
    """
    Connect to a galaxy instance and check that the connection works.

    The connection is tested by listing the histories of the account.
    The program exits with a non-zero status if the instance cannot be reached.

    :param instance_url: URL of the galaxy instance
    :param email: email of the galaxy account used to connect
    :param password: password of the galaxy account used to connect
    :return: the connected bioblend GalaxyInstance
    """

    logging.info("Connecting to the galaxy instance (%s)", instance_url)
    instance = galaxy.GalaxyInstance(url=instance_url, email=email, password=password)

    try:
        instance.histories.get_histories()
    except bioblend.ConnectionError:
        logging.critical("Cannot connect to galaxy instance (%s) ", instance_url)
        # Exit with a failure status so that calling scripts can detect the error
        sys.exit(1)

    logging.info("Successfully connected to galaxy instance (%s) ", instance_url)
    return instance


def get_history(instance, history_name):
    """
    Get the id of the history named history_name, creating the history if needed.

    If several histories share this name, the first one returned by galaxy is used.

    :param instance: bioblend GalaxyInstance to query
    :param history_name: name of the history to look up or create
    :return: id of the existing or newly created history
    """

    histories = instance.histories.get_histories(name=history_name)
    if histories:
        history_id = histories[0]["id"]
    else:
        logging.info("Creating history for %s", history_name)
        history = instance.histories.create_history(name=history_name)
        history_id = history["id"]

    logging.debug("History ID set for %s: %s", history_name, history_id)
    return history_id


def check_wf_param(full_name, params):
    """
    Exit with a non-zero status if utilities.no_empty_items rejects the workflow parameters.

    :param full_name: organism name, only used in the error message
    :param params: workflow parameters passed to utilities.no_empty_items
    :return:
    """

    if not utilities.no_empty_items(params):
        logging.critical(
            "One empty workflow parameter found for organism %s: %s)", full_name, params)
        sys.exit(1)


def check_galaxy_state(network_name, script_dir):
    """
    Run "supervisorctl status galaxy:" in the "<network_name>_galaxy" container
    (through the serexec script located in script_dir) to check if the service is "ready".

    :param network_name: prefix of the galaxy container name
    :param script_dir: directory containing the serexec script
    :return: 1 if galaxy_web, handler0 and handler1 are all reported RUNNING, 0 otherwise
    """

    serexec_path = "%s/serexec" % script_dir

    # Make sure serexec is executable (its permissions may be wrong in the repo)
    try:
        os.chmod(serexec_path, 0o0755)
    except PermissionError:
        logging.warning("serexec permissions incorrect in %s", script_dir)

    status = subprocess.run(
        [serexec_path, "{0}_galaxy".format(network_name), "supervisorctl", "status", "galaxy:"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    status_text = status.stdout.decode("utf-8", errors="replace")

    # supervisorctl aligns its output in columns, so the number of blanks between
    # the process name and its state is not fixed: accept any run of spaces/tabs
    all_running = all(
        re.search(re.escape(process) + r"[ \t]+RUNNING", status_text)
        for process in _REQUIRED_GALAXY_PROCESSES)

    return 1 if all_running else 0


def get_species_history_id(instance, full_name):
    """
    Look up the history named after the current species in its galaxy instance.

    :param instance: bioblend GalaxyInstance to query
    :param full_name: name of the history (converted to str before the lookup)
    :return: list [history_id, show_history] where show_history is the result of
             instance.histories.show_history for that history
    :raises IndexError: if no history with this name exists
    """

    histories = instance.histories.get_histories(name=str(full_name))
    if not histories:
        logging.error("No history named %s found in the galaxy instance", full_name)
        raise IndexError("No history named {0} found in the galaxy instance".format(full_name))

    history_id = histories[0]["id"]
    show_history = instance.histories.show_history(history_id=history_id)

    return [history_id, show_history]


def run_tool(instance, tool_id, history_id, tool_inputs):
    """
    Run a galaxy tool in a history.

    A bioblend.ConnectionError is logged and not propagated.

    :param instance: bioblend GalaxyInstance to run the tool on
    :param tool_id: id of the tool to run
    :param history_id: id of the history in which the tool is run
    :param tool_inputs: inputs passed to the tool
    :return: the value returned by instance.tools.run_tool, or None if a
             bioblend.ConnectionError occurred
    """

    output_dict = None
    try:
        logging.debug("Running tool %s with tool inputs: %s", tool_id, tool_inputs)
        output_dict = instance.tools.run_tool(
            tool_id=tool_id,
            history_id=history_id,
            tool_inputs=tool_inputs)
    except bioblend.ConnectionError:
        logging.error(
            "Unexpected HTTP response (bioblend.ConnectionError) when running tool %s with tool inputs: %s",
            tool_id, tool_inputs)

    return output_dict


def run_tool_and_download_single_output_dataset(instance, tool_id, history_id, tool_inputs, time_sleep=0):
    """
    Run a galaxy tool, then download its first output dataset.

    :param instance: bioblend GalaxyInstance to run the tool on
    :param tool_id: id of the tool to run
    :param history_id: id of the history in which the tool is run
    :param tool_inputs: inputs passed to the tool
    :param time_sleep: seconds to wait between running the tool and downloading
                       its output (None to skip the wait)
    :return: the value returned by instance.datasets.download_dataset, or None if
             the tool could not be run
    """

    output_dict = run_tool(instance, tool_id, history_id, tool_inputs)
    if output_dict is None:
        # run_tool already logged the connection error: there is nothing to wait for or download
        logging.error("No output to download for tool %s: the tool could not be run", tool_id)
        return None

    if time_sleep is not None:
        time.sleep(time_sleep)

    single_output_dataset_id = output_dict["outputs"][0]["id"]
    dataset = instance.datasets.download_dataset(dataset_id=single_output_dataset_id)

    return dataset


def install_repository_revision(instance, tool_id, version, changeset_revision):
    """
    Install a changeset revision of a tool repository if the version reported by
    galaxy for tool_id differs from the expected one.

    Does nothing if the versions match. A tool for which galaxy reports no toolshed
    repository cannot be installed this way: a warning is logged instead.

    :param instance: bioblend GalaxyInstance on which the tool is installed
    :param tool_id: id of the tool to check
    :param version: expected tool version
    :param changeset_revision: changeset revision to install if versions differ
    :return:
    """

    tool_dict = instance.tools.show_tool(tool_id)
    if tool_dict["version"] == version:
        return

    # Only read the toolshed information when an install is actually needed
    toolshed_dict = tool_dict.get("tool_shed_repository")
    if not toolshed_dict:
        logging.warning(
            "Tool %s has no toolshed repository information: cannot install changeset revision %s",
            tool_id, changeset_revision)
        return

    name = toolshed_dict["name"]
    owner = toolshed_dict["owner"]
    toolshed = "https://" + toolshed_dict["tool_shed"]
    logging.warning("Installing changeset revision %s for %s", changeset_revision, name)

    instance.toolshed.install_repository_revision(
        tool_shed_url=toolshed,
        name=name,
        owner=owner,
        changeset_revision=changeset_revision,
        install_tool_dependencies=True,
        install_repository_dependencies=False,
        install_resolver_dependencies=True)


def install_workflow_tools(instance, workflow_path):
    """
    Read a .ga file to extract the information about the different tools called.
    Check if every tool is installed via a "show_tool".
    If a tool is not installed (versions don't match), send a warning to the logger
    and install the required changeset (matching the tool version).
    Doesn't do anything if versions match.

    Steps without a tool id or without toolshed repository information are skipped.

    :param instance: bioblend GalaxyInstance on which the tools are checked
    :param workflow_path: path to the workflow file (.ga, json format)
    :return:
    """

    logging.info("Validating that installed tools versions and changesets match workflow versions")

    # Load the workflow file (.ga) and store the decoded json dictionary
    with open(workflow_path, "r", encoding="utf-8") as ga_in_file:
        workflow_dict = json.load(ga_in_file)

    # Look up every step looking for tools
    for step in workflow_dict["steps"].values():
        tool_id = step.get("tool_id")
        if not tool_id:
            continue

        repository = step.get("tool_shed_repository")
        if not repository:
            # No changeset revision is recorded for this step, so there is nothing to install
            logging.debug("Skipping tool %s: no toolshed repository in workflow step", tool_id)
            continue

        # Check if an installed version matches the workflow tool version
        # (If it's not installed, the show_tool version returned will be a default
        # version with the suffix "XXXX+0")
        install_repository_revision(
            instance=instance,
            tool_id=tool_id,
            version=step["tool_version"],
            changeset_revision=repository["changeset_revision"])

    logging.info("Tools versions and changeset_revisions from workflow validated")