#!/usr/bin/env python3 # -*- coding: utf-8 -*- import logging import json import time import utilities_bioblend import speciesData import constants import constants_phaeo class OrgWorkflowParam: def __init__(self, genus_uppercase, chado_species_name, full_name, species_folder_name, org_id, history_id, instance): self.genus_uppercase = genus_uppercase self.chado_species_name = chado_species_name, self.full_name = full_name self.species_folder_name = species_folder_name self.org_id = org_id self.history_id = history_id self.instance = instance class RunWorkflow(speciesData.SpeciesData): """ Run a workflow into the galaxy instance's history of a given species This script is made to work for a Phaeoexplorer-specific workflow, but can be adapted to run any workflow, provided the user creates their own workflow in a .ga format, and change the set_parameters function to have the correct parameters for their workflow """ def __init__(self, parameters_dictionary): super().__init__(parameters_dictionary) self.history_name = str(self.genus_species) def set_galaxy_instance(self, config): # Set the instance url attribute --> TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1 instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format( config[constants.CONF_ALL_HTTP_PORT], self.genus_species) self.instance = utilities_bioblend.get_galaxy_instance( instance_url=instance_url, email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD], ) def set_history(self): self.history_id = utilities_bioblend.get_history( instance=self.instance, history_name=self.history_name) def get_analyses(self): get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( instance=self.instance, tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID, history_id=self.history_id, tool_inputs={}, time_sleep=10 ) analyses_dict_list = json.loads(get_analyses_tool_dataset) return analyses_dict_list def add_analysis(self, name, programversion, sourcename): add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset( instance=self.instance, tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID, history_id=self.history_id, tool_inputs={"name": name, "program": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_PROGRAM, "programversion": programversion, "sourcename": sourcename, "date_executed": constants_phaeo.ADD_ANALYSIS_TOOL_PARAM_DATE}) analysis_dict = json.loads(add_analysis_tool_dataset) analysis_id = str(analysis_dict["analysis_id"]) return analysis_id def sync_analysis(self, analysis_id): time.sleep(60) utilities_bioblend.run_tool( instance=self.instance, tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID, history_id=self.history_id, tool_inputs={"analysis_id": analysis_id}) def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_programversion, analysis_sourcename): """ Add one analysis to Chado database Required for Chado Load Tripal Synchronize workflow (which should be ran as the first workflow) Called outside workflow for practical reasons (Chado add doesn't have an input link for analysis or organism) """ analysis_id = None # Look up list of outputs (dictionaries) for analyses_dict in analyses_dict_list: if analyses_dict["name"] == analysis_name: analysis_id = str(analyses_dict["analysis_id"]) if analysis_id is None: analysis_id = self.add_analysis( name=analysis_name, programversion=analysis_programversion, sourcename=analysis_sourcename ) # Synchronize analysis in Tripal logging.info("Synchronizing analysis %s in Tripal" % analysis_name) self.sync_analysis(analysis_id=analysis_id) return analysis_id def get_invocation_report(self, workflow_name): """ Debugging method for workflows Simply logs and returns a report of the previous workflow invocation (execution of a workflow in the instance via the API) :param workflow_name: :return: """ workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name) workflow_id = workflow_attributes[1]["id"] # Most recently imported workflow (index 1 in the list) invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id) invocation_id = invocations[1]["id"] # Most recent invocation invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id) logging.debug(invocation_report) return invocation_report