#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import json
import time

import utilities_bioblend
import species_data
import constants
import constants_phaeo


class OrgWorkflowParam:
    """
    Container for the per-organism values (Chado/Tripal identifiers, Galaxy history
    and instance) that workflow invocations need
    """

    def __init__(self, genus_uppercase, chado_species_name, full_name,
                 species_folder_name, org_id, history_id, instance):
        self.genus_uppercase = genus_uppercase
        self.chado_species_name = chado_species_name
        self.full_name = full_name
        self.species_folder_name = species_folder_name
        self.org_id = org_id
        self.history_id = history_id
        self.instance = instance


class RunWorkflow(species_data.SpeciesData):
    """
    Run a workflow in the Galaxy instance's history of a given species

    This script is made to work with a Phaeoexplorer-specific workflow, but can be
    adapted to run any workflow, provided the user creates their own workflow in .ga
    format and changes the set_parameters function to have the correct parameters
    for their workflow
    """

    def __init__(self, parameters_dictionary):
        super().__init__(parameters_dictionary)
        self.history_name = str(self.genus_species)

    def set_galaxy_instance(self, config):
        # Set the instance url attribute --> TODO: the localhost rule in the docker-compose still doesn't work on scratchgmodv1
        self.instance_url = "http://localhost:{0}/sp/{1}/galaxy/".format(
            config[constants.CONF_ALL_HTTP_PORT],
            self.genus_species)
        self.instance = utilities_bioblend.get_galaxy_instance(
            instance_url=self.instance_url,
            email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL],
            password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD],
        )

    def set_history(self):
        self.history_id = utilities_bioblend.get_history(
            instance=self.instance,
            history_name=self.history_name)

    def get_analyses(self):
        get_analyses_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={},
            time_sleep=10
        )
        analyses_dict_list = json.loads(get_analyses_tool_dataset)
        return analyses_dict_list

    def get_organisms(self):
        get_organisms_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={},
            time_sleep=10
        )
        organisms_dict_list = json.loads(get_organisms_tool_dataset)  # Turn the dataset into a list for parsing
        return organisms_dict_list

    def get_organism(self, organisms_dict_list, genus_uppercase, chado_species_name):
        org_id = None
        # Look up list of outputs (dictionaries)
        for org_dict in organisms_dict_list:
            if org_dict["genus"] == genus_uppercase and org_dict["species"] == chado_species_name:
                org_id = str(org_dict["organism_id"])  # id needs to be a str to be recognized by chado tools
        return org_id

    def add_analysis(self, name, program, programversion, sourcename, date):
        add_analysis_tool_dataset = utilities_bioblend.run_tool_and_download_single_output_dataset(
            instance=self.instance,
            tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"name": name,
                         "program": program,
                         "programversion": programversion,
                         "sourcename": sourcename,
                         "date_executed": date},
            time_sleep=10
        )
        analysis_dict = json.loads(add_analysis_tool_dataset)
        analysis_id = str(analysis_dict["analysis_id"])
        return analysis_id

    def sync_analysis(self, analysis_id):
        utilities_bioblend.run_tool(
            instance=self.instance,
            tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
            history_id=self.history_id,
            tool_inputs={"analysis_id": analysis_id})

    def add_analysis_if_needed(self, analyses_dict_list, analysis_name, analysis_program,
                               analysis_programversion, analysis_sourcename, analysis_date):
        """
        Add one analysis to the Chado database, unless an analysis with that name already exists

        Required for the Chado Load Tripal Synchronize workflow (which should be run as the
        first workflow). Called outside the workflow for practical reasons (the Chado add
        tool doesn't have an input link for analysis or organism)

        :param analyses_dict_list: output dict list from get_analyses()
        :param analysis_name:
        :param analysis_program:
        :param analysis_programversion:
        :param analysis_sourcename:
        :param analysis_date:
        :return: analysis_id of the existing or newly added analysis (str)
        """
        analysis_id = None
        # Look up list of outputs (dictionaries)
        for analyses_dict in analyses_dict_list:
            if analyses_dict["name"] == analysis_name:
                analysis_id = str(analyses_dict["analysis_id"])
        if analysis_id is None:
            analysis_id = self.add_analysis(
                name=analysis_name,
                program=analysis_program,
                programversion=analysis_programversion,
                sourcename=analysis_sourcename,
                date=analysis_date
            )
        return analysis_id

    def add_analysis_and_sync(self, analyses_dict_list, analysis_name, analysis_program,
                              analysis_programversion, analysis_sourcename, analysis_date):
        """
        Add one analysis to the Chado database (if it doesn't exist yet), then synchronize
        it in Tripal

        Required for the Chado Load Tripal Synchronize workflow (which should be run as the
        first workflow). Called outside the workflow for practical reasons (the Chado add
        tool doesn't have an input link for analysis or organism)

        :param analyses_dict_list: output dict list from get_analyses()
        :param analysis_name:
        :param analysis_program:
        :param analysis_programversion:
        :param analysis_sourcename:
        :param analysis_date:
        :return: analysis_id of the added and synchronized analysis (str)
        """
        analysis_id = self.add_analysis_if_needed(
            analyses_dict_list, analysis_name, analysis_program,
            analysis_programversion, analysis_sourcename, analysis_date)
        # Synchronize analysis in Tripal
        logging.info("Synchronizing analysis %s in Tripal" % analysis_name)
        time.sleep(10)
        self.sync_analysis(analysis_id=analysis_id)
        return analysis_id

    def install_individual_tools(self):
        """
        Verify that the tools called outside workflows are installed with the correct
        versions and changesets, and install the correct version + changeset in the
        instance for any tool whose version doesn't match. Does nothing when all
        versions already match

        :return:
        """
        logging.info("Validating installed individual tools versions and changesets")
        # Verify that the add_organism and add_analysis versions are correct in the instance
        # The changeset for 2.3.4+galaxy0 has to be found manually, because bioblend offers no way to get
        # the wanted changeset of a tool that is not yet installed, except through workflows (.ga),
        # which already contain the changeset revisions inside the step ids
        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ORGANISMS_TOOL_ID,
                                                       version=constants_phaeo.GET_ORGANISMS_TOOL_VERSION,
                                                       changeset_revision=constants_phaeo.GET_ORGANISMS_TOOL_CHANGESET_REVISION,
                                                       instance=self.instance)
        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.GET_ANALYSES_TOOL_ID,
                                                       version=constants_phaeo.GET_ANALYSES_TOOL_VERSION,
                                                       changeset_revision=constants_phaeo.GET_ANALYSES_TOOL_CHANGESET_REVISION,
                                                       instance=self.instance)
        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ORGANISM_TOOL_ID,
                                                       version=constants_phaeo.ADD_ORGANISM_TOOL_VERSION,
                                                       changeset_revision=constants_phaeo.ADD_ORGANISM_TOOL_CHANGESET_REVISION,
                                                       instance=self.instance)
        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ADD_ANALYSIS_TOOL_ID,
                                                       version=constants_phaeo.ADD_ANALYSIS_TOOL_VERSION,
                                                       changeset_revision=constants_phaeo.ADD_ANALYSIS_TOOL_CHANGESET_REVISION,
                                                       instance=self.instance)
        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ANALYSIS_SYNC_TOOL_ID,
                                                       version=constants_phaeo.ANALYSIS_SYNC_TOOL_VERSION,
                                                       changeset_revision=constants_phaeo.ANALYSIS_SYNC_TOOL_CHANGESET_REVISION,
                                                       instance=self.instance)
        utilities_bioblend.install_repository_revision(tool_id=constants_phaeo.ORGANISM_SYNC_TOOL_ID,
                                                       version=constants_phaeo.ORGANISM_SYNC_TOOL_VERSION,
                                                       changeset_revision=constants_phaeo.ORGANISM_SYNC_TOOL_CHANGESET_REVISION,
                                                       instance=self.instance)
        logging.info("Success: individual tools versions and changesets validated")

    def get_invocation_report(self, workflow_name):
        """
        Debugging method for workflows

        Simply logs and returns a report of the previous workflow invocation
        (execution of a workflow in the instance via the API)

        :param workflow_name:
        :return: the invocation report of the most recent invocation of the workflow
        """
        workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
        workflow_id = workflow_attributes[1]["id"]  # Most recently imported workflow (index 1 in the list)
        invocations = self.instance.workflows.get_invocations(workflow_id=workflow_id)
        invocation_id = invocations[1]["id"]  # Most recent invocation
        invocation_report = self.instance.invocations.get_invocation_report(invocation_id=invocation_id)
        logging.debug(invocation_report)
        return invocation_report
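

# --- Hedged usage sketch (illustration only, not part of the original module) ---
# A minimal driver showing the intended call order: connect to the species' Galaxy,
# attach the history, validate individual tool versions, then register and synchronize
# an analysis. The contents of `parameters_dictionary` and `config` are assumptions:
# their real shapes are defined by species_data.SpeciesData and the project's config
# constants, and the analysis values below are hypothetical placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    parameters_dictionary = {}  # one species entry from the input file (shape assumed, see species_data.SpeciesData)
    config = {}  # parsed config; must provide CONF_ALL_HTTP_PORT and the Galaxy admin credentials used above

    run_workflow = RunWorkflow(parameters_dictionary=parameters_dictionary)
    run_workflow.set_galaxy_instance(config=config)  # connect to http://localhost:<port>/sp/<genus_species>/galaxy/
    run_workflow.set_history()                       # retrieve the species' working history
    run_workflow.install_individual_tools()          # make sure tool versions/changesets match

    # Register a (hypothetical) genome analysis in Chado and synchronize it in Tripal
    analyses_dict_list = run_workflow.get_analyses()
    analysis_id = run_workflow.add_analysis_and_sync(
        analyses_dict_list=analyses_dict_list,
        analysis_name="Example genome v1.0",
        analysis_program="Example assembler",
        analysis_programversion="1.0",
        analysis_sourcename="example_genome_v1.0.fasta",
        analysis_date="2021-02-24")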