#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import logging
import time

import utilities_bioblend
import species_data
import constants
import constants_phaeo


class OrgWorkflowParam:
    """
    Plain container grouping the per-organism values handed to a workflow run

    Every constructor argument is stored unchanged on the instance under the same name.
    """

    def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name,
                 org_id, history_id, instance):
        self.genus_uppercase = genus_uppercase
        self.chado_species_name = chado_species_name
        self.full_name = full_name
        self.species_folder_name = species_folder_name
        self.org_id = org_id
        self.history_id = history_id
        self.instance = instance


class RunWorkflow(species_data.SpeciesData):
    """
    Run a workflow into the galaxy instance's history of a given species

    This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow,
    provided the user creates their own workflow in a .ga format, and change the set_parameters function
    to have the correct parameters for their workflow
    """

    def __init__(self, parameters_dictionary):
        """
        Initialise the parent SpeciesData and derive the history name

        :param parameters_dictionary: forwarded as-is to species_data.SpeciesData.__init__
        """
        super().__init__(parameters_dictionary)
        # The history is named after the species (self.genus_species is set by the parent class)
        self.history_name = str(self.genus_species)

    def set_galaxy_instance(self, config):
        """
        Build the instance url for this species and open a connection to the galaxy instance

        Sets self.instance_url and self.instance

        :param config: mapping read with the constants.CONF_ALL_HTTP_PORT,
            constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL and constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD keys
        :return:
        """
        # Set the instance url attribute
        # TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1
        self.instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format(
            config[constants.CONF_ALL_HTTP_PORT],
            self.genus_species)

        self.instance = utilities_bioblend.get_galaxy_instance(
            instance_url=self.instance_url,
            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
        )

    def set_history(self):
        """
        Look up the history named self.history_name and store the result in self.history_id

        Requires self.instance to be set (see set_galaxy_instance)

        :return:
        """
        self.history_id = utilities_bioblend.get_history(
            instance=self.instance,
            history_name=self.history_name)

    def get_analyses(self):
        """
        Run the "get analyses" tool in the current history and parse its output dataset as JSON

        :return: the decoded JSON output of the tool (consumed by add_analysis_if_needed as a list of
            dictionaries having at least the "name" and "analysis_id" keys)
        """
        get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={},
            time_sleep=10
        )
        analyses_dict_list = json.loads(get_analyses_tool_dataset)
        return analyses_dict_list

    def add_analysis(self, name, programversion, sourcename) -> str:
        """
        Run the "add analysis" tool in the current history and return the id of the new analysis

        The program and execution date sent to the tool are fixed by constants_phaeo

        :param name: analysis name
        :param programversion: analysis program version
        :param sourcename: analysis source name
        :return: the "analysis_id" field of the tool's JSON output, as a string
        """
        add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"name": name,
                         "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM,
                         "programversion": programversion,
                         "sourcename": sourcename,
                         "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE},
            time_sleep=10
        )
        analysis_dict = json.loads(add_analysis_tool_dataset)
        analysis_id = str(analysis_dict["analysis_id"])

        return analysis_id

    def sync_analysis(self, analysis_id):
        """
        Run the analysis synchronization tool in the current history for the given analysis

        :param analysis_id: id of the analysis, passed to the tool as its "analysis_id" input
        :return:
        """
        utilities_bioblend.run_tool(
            instance=self.instance,
            tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"analysis_id": analysis_id})

    def add_analysis_if_needed(self, analyses_dict_list, analysis_name, analysis_programversion,
                               analysis_sourcename) -> str:
        """
        Add one analysis to Chado database, unless an analysis with the same name already exists

        Required for Chado Load Tripal Synchronize workflow (which should be ran as the first workflow)
        Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism)

        :param analyses_dict_list: output dict from get_analyses()
        :param analysis_name: name looked up in analyses_dict_list, and used for the new analysis if absent
        :param analysis_programversion: program version, only used when the analysis has to be created
        :param analysis_sourcename: source name, only used when the analysis has to be created
        :return: id (string) of the existing or newly created analysis
        """
        analysis_id = None

        # Look up list of outputs (dictionaries)
        # No early exit: when several entries share the name, the last one wins
        for analyses_dict in analyses_dict_list:
            if analyses_dict["name"] == analysis_name:
                analysis_id = str(analyses_dict["analysis_id"])

        if analysis_id is None:
            analysis_id = self.add_analysis(
                name=analysis_name,
                programversion=analysis_programversion,
                sourcename=analysis_sourcename
            )

        return analysis_id

    def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion,
                              analysis_sourcename) -> str:
        """
        Make sure one analysis exists in Chado database (see add_analysis_if_needed), then synchronize it in Tripal

        Required for Chado Load Tripal Synchronize workflow (which should be ran as the first workflow)
        Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism)

        :param analyses_dict_list: output dict from get_analyses()
        :param analysis_name: name of the analysis to find or create
        :param analysis_programversion: program version, only used when the analysis has to be created
        :param analysis_sourcename: source name, only used when the analysis has to be created
        :return: id (string) of the synchronized analysis
        """
        analysis_id = self.add_analysis_if_needed(analyses_dict_list,
                                                  analysis_name,
                                                  analysis_programversion,
                                                  analysis_sourcename)

        # Synchronize analysis in Tripal
        # Lazy logging arguments: the message is only formatted if the record is actually emitted
        logging.info("Synchronizing analysis %s in Tripal", analysis_name)
        # Fixed pause before the sync tool is launched
        time.sleep(10)
        self.sync_analysis(analysis_id=analysis_id)

        return analysis_id

    def get_invocation_report(self, workflow_name):
        """
        Debugging method for workflows

        Simply logs and returns a report of the previous workflow invocation (execution of a workflow in
        the instance via the API)

        :param workflow_name: name of the workflow, as known by the galaxy instance
        :return: the invocation report returned by the galaxy instance
        :raises IndexError: if fewer than two workflows match the name, or the selected workflow has fewer
            than two invocations (both lists are read at index 1)
        """
        workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
        # NOTE(review): index 1 is the *second* element of each list; whether that really is the most
        # recent workflow/invocation depends on the ordering returned by the API - confirm
        workflow_id = workflow_attributes[1]["id"]  # Most recently imported workflow (index 1 in the list)
        invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id)
        invocation_id = invocations[1]["id"]  # Most recent invocation
        invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id)

        logging.debug(invocation_report)

        return invocation_report