Skip to content
Snippets Groups Projects
runWorkflowPhaeo.py 5.22 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import json
import time

import utilities_bioblend
import speciesData
import constants
import constants_phaeo

class OrgWorkflowParam:
    """
    Plain container grouping the per-organism values needed to run a workflow.

    Every constructor argument is stored unchanged on the instance under the
    attribute of the same name.

    :param genus_uppercase: genus name (presumably capitalized; not enforced here)
    :param chado_species_name: species name (presumably as registered in Chado; confirm with callers)
    :param full_name: full name of the organism
    :param species_folder_name: name of the folder holding the species data
    :param org_id: organism identifier
    :param history_id: identifier of the Galaxy history associated with the organism
    :param instance: Galaxy instance object used to reach the organism's Galaxy server
    """

    def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name,
                 org_id, history_id, instance):
        self.genus_uppercase = genus_uppercase
        # No trailing comma here: a stray comma would silently wrap the value in a 1-tuple
        self.chado_species_name = chado_species_name
        self.full_name = full_name
        self.species_folder_name = species_folder_name
        self.org_id = org_id
        self.history_id = history_id
        self.instance = instance

class RunWorkflow(speciesData.SpeciesData):
    """
    Run a workflow in the Galaxy instance's history of a given species.

    This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
    provided the user creates their own workflow in a .ga format, and changes the set_parameters function
    to have the correct parameters for their workflow.
    """

    def __init__(self, parameters_dictionary):
        """
        Initialize the species data and derive the Galaxy history name.

        :param parameters_dictionary: passed unchanged to the SpeciesData constructor
        """

        super().__init__(parameters_dictionary)
        # The history is named after the species; genus_species is not set in this class
        # (presumably provided by SpeciesData -- confirm)
        self.history_name = str(self.genus_species)

    def set_galaxy_instance(self, config):
        """
        Connect to the species' Galaxy server and store the connection in self.instance.

        :param config: mapping providing the HTTP port and the default Galaxy admin email/password,
            looked up with the CONF_* keys of the constants module
        """

        # Set the instance url attribute --> TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1
        instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format(
            config[constants.CONF_ALL_HTTP_PORT],
            self.genus_species)

        self.instance = utilities_bioblend.get_galaxy_instance(
            instance_url=instance_url,
            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
        )

    def set_history(self):
        """
        Look up the history named self.history_name and store its id in self.history_id.

        Requires self.instance to be set (see set_galaxy_instance).
        """

        self.history_id = utilities_bioblend.get_history(
            instance=self.instance,
            history_name=self.history_name)

    def get_analyses(self):
        """
        Run the "get analyses" tool in the current history and return its parsed output.

        :return: the tool's single output dataset decoded from JSON (add_analysis_and_sync iterates
            over it as a list of dictionaries with "name" and "analysis_id" keys)
        """

        get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={},
            time_sleep=10  # presumably a wait/polling delay in seconds -- confirm in utilities_bioblend
        )
        analyses_dict_list = json.loads(get_analyses_tool_dataset)
        return analyses_dict_list

    def add_analysis(self, name, programversion, sourcename):
        """
        Run the "add analysis" tool in the current history and return the id of the new analysis.

        The program and execution date sent to the tool are fixed values taken from constants_phaeo.

        :param name: analysis name
        :param programversion: version of the program that produced the analysis
        :param sourcename: source name of the analysis
        :return: the "analysis_id" field of the tool's JSON output, as a string
        """

        add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"name": name,
                         "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
                         "programversion": programversion,
                         "sourcename": sourcename,
                         "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE})
        analysis_dict = json.loads(add_analysis_tool_dataset)
        analysis_id = str(analysis_dict["analysis_id"])

        return analysis_id

    def sync_analysis(self, analysis_id):
        """
        Run the analysis synchronization tool for one analysis, after a fixed 60-second pause.

        :param analysis_id: id of the analysis to synchronize
        """

        # NOTE(review): fixed delay before the sync; presumably gives the preceding job time
        # to finish -- confirm whether polling the job state would be more reliable
        time.sleep(60)
        utilities_bioblend.run_tool(
            instance=self.instance,
            tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"analysis_id": analysis_id})

    def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename):
        """
        Add one analysis to Chado database
        Required for Chado Load Tripal Synchronize workflow (which should be ran as the first workflow)
        Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism)

        The analysis is only created when no entry of analyses_dict_list has the requested name;
        it is synchronized in both cases.

        :param analyses_dict_list: existing analyses, as dictionaries with "name" and "analysis_id" keys
            (e.g. the value returned by get_analyses)
        :param analysis_name: name of the analysis to find or create
        :param analysis_programversion: program version used if the analysis has to be created
        :param analysis_sourcename: source name used if the analysis has to be created
        :return: id of the existing or newly created analysis, as a string
        """

        analysis_id = None

        # Look up list of outputs (dictionaries); no early exit, so the last entry
        # with a matching name wins if several share it
        for analyses_dict in analyses_dict_list:
            if analyses_dict["name"] == analysis_name:
                analysis_id = str(analyses_dict["analysis_id"])

        if analysis_id is None:
            analysis_id = self.add_analysis(
                name=analysis_name,
                programversion=analysis_programversion,
                sourcename=analysis_sourcename
            )

        # Synchronize analysis in Tripal
        # Lazy logging argument: formatting is deferred to the logging module and
        # cannot raise on unusual values (e.g. a tuple) the way "%" formatting does
        logging.info("Synchronizing analysis %s in Tripal", analysis_name)
        self.sync_analysis(analysis_id=analysis_id)

        return analysis_id

    def get_invocation_report(self, workflow_name):
        """
        Debugging method for workflows

        Simply logs and returns a report of the previous workflow invocation (execution of a workflow in
        the instance via the API)

        :param workflow_name: name used to look up the workflow in the Galaxy instance
        :return: the invocation report returned by the Galaxy API for the selected invocation
        :raises IndexError: if fewer than two workflows match the name, or the selected workflow
            has fewer than two invocations (both lists are read at index 1)
        """

        workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
        # NOTE(review): index 1 is the SECOND entry of the list; confirm the ordering returned by
        # the API really makes it the most recently imported workflow (index 0 may be intended)
        workflow_id = workflow_attributes[1]["id"]  # Most recently imported workflow (index 1 in the list)
        invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id)
        # NOTE(review): same question as above for the invocation list
        invocation_id = invocations[1]["id"]  # Most recent invocation
        invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id)

        logging.debug(invocation_report)

        return invocation_report