Skip to content
Snippets Groups Projects
autoload.py 25.2 KiB
Newer Older
import bioblend.galaxy.objects
import bioblend
import argparse
import os
import subprocess
import sys
import json
import yaml
import numpy
import pandas
import logging
import re


class Autoload:
    """
    Cleaner version for gga_auto_load (to use in production).

    This class possesses most useful parameters to interact with GGA as attributes (as defined in __init__), so new
    methods can be more easily implemented by copying already existing ones (i.e add new analysis, run a workflow, ...)

    To run the workflows, place them in the same directory as this script, and add the method + the workflow
    parameters in the main invocation (at the end of the file)
    TODO: store workflow as string in
    def __init__(self, species_parameters_dictionary: dict, args):
        self.species_parameters_dictionary = species_parameters_dictionary
        self.args = args
        self.species = species_parameters_dictionary["species"]
        self.genus = species_parameters_dictionary["genus"]
        self.strain = species_parameters_dictionary["strain"]
        self.sex = species_parameters_dictionary["sex"]
        self.common = species_parameters_dictionary["common"]
        self.date = species_parameters_dictionary["date"]
        self.performed = species_parameters_dictionary["performed by"]
        self.genome_version = species_parameters_dictionary["genome version"]
        self.ogs_version = species_parameters_dictionary["ogs version"]
        self.genus_lowercase = self.genus[0].lower() + self.genus[1:]
        self.full_name = " ".join([self.genus_lowercase, self.species, self.strain, self.sex])
        self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex])
        self.genus_species = self.genus_lowercase + "_" + self.species
        self.instance_url = "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/"  # testing
        self.instance: galaxy = None
        self.history_id = None
        self.library_id = None
        self.main_dir = None
        self.species_dir = None
        self.org_id = None
        self.genome_analysis_id = None
        self.ogs_analysis_id = None
        self.tool_panel = None
        self.datasets = dict()
        self.source_files = dict()
        self.workflow_name = None

        # Test the connection to the galaxy instance for the current species
        # Additionally set some class attributes
        # TODO: auth issues with nginx
        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                              key="3b36455cb16b4d0e4348e2c42f4bb934",
                                              email="alebars@sb-roscoff.fr",
                                              password="pouet",
                                              verify=True)
        logging.info("testing connection to the galaxy instance ...")
        try:
            self.instance.histories.get_histories()
            self.tool_panel = self.instance.tools.get_tool_panel()
        except bioblend.ConnectionError:
            logging.info("cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit()
        else:
            logging.info("successfully connected to galaxy instance @ " + self.instance_url)

        self.main_dir = os.getcwd() + "/"
        self.species_dir = os.path.join(self.main_dir, self.genus_species) + "/"

    def load_data_in_galaxy(self):
        """
        - create the src_data directory tree for the species
        - change headers for pep file
        - load data into the galaxy container with the galaxy_data_libs_SI.py script

        :param method:
        :return:
        """
        os.chdir(self.main_dir)
        try:
            os.mkdir(self.species_dir)
        except FileExistsError:
            logging.debug("directory " + self.species_dir + " already exists")
        try:
            os.chdir(self.species_dir)
            working_dir = os.getcwd()
        except OSError:
            logging.info("cannot access " + self.species_dir + ", run with higher privileges")
            sys.exit()

        src_data_folders = ["annotation", "genome"]
        species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex])
        try:
            os.mkdir("./src_data")
            os.mkdir("./src_data/annotation")
            os.mkdir("./src_data/genome")
            os.mkdir("./src_data/annotation/" + species_folder_name)
            os.mkdir("./src_data/genome/" + species_folder_name)
        except FileExistsError:
            logging.debug("src_data directory tree already exists")
        except PermissionError:
            logging.debug("insufficient permission to create src_data directory tree")

        # Data import into galaxy
        self.source_files = dict()
        annotation_dir, genome_dir = None, None
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation/" in d:
                annotation_dir = d
                for f in os.listdir(d):
                    if f.endswith("proteins.fasta"):
                        self.source_files["proteins_file"] = os.path.join(d, f)
                        self.source_files["transcripts_file"] = os.path.join(d, f)
                        self.source_files["gff_file"] = os.path.join(d, f)
            elif "genome/" in d:
                genome_dir = d
                for f in os.listdir(d):
                    if f.endswith(".fa"):
                        self.source_files["genome_file"] = os.path.join(d, f)
        for k, v in self.source_files.items():
            logging.debug("\t" + k + "\t" + v)

        # Changing headers in the *proteins.fasta file from >mRNA* to >protein*
        # production version
        modify_pep_headers = ["/usr/local/genome2/mmo/scripts/phaeoexplorer/phaeoexplorer-change_pep_fasta_header.sh",
                              self.source_files["proteins_file"]]
        # test version
        modify_pep_headers = ["/home/alebars/gga/phaeoexplorer-change_pep_fasta_header.sh",
                              self.source_files["proteins_file"]]
        logging.info("changing fasta headers in " + self.source_files["proteins_file"])
        subprocess.run(modify_pep_headers, stdout=subprocess.PIPE, cwd=annotation_dir)

        # src_data cleaning
        if os.path.exists(annotation_dir + "outfile"):
            subprocess.run(["mv", annotation_dir + "/outfile", self.source_files["proteins_file"]],
                           stdout=subprocess.PIPE,
                           cwd=annotation_dir)
        if os.path.exists(annotation_dir + "gmon.out"):
            subprocess.run(["rm", annotation_dir + "/gmon.out"],
                           stdout=subprocess.PIPE,
                           cwd=annotation_dir)

        setup_data_libraries = "docker-compose exec galaxy /tool_deps/_conda/bin/python /opt/setup_data_libraries.py"
        try:
            logging.info("loading data into the galaxy container")
            subprocess.run(setup_data_libraries,
                           stdout=subprocess.PIPE,
                           shell=True)
        except subprocess.CalledProcessError:
            logging.info("cannot load data into container for " + self.full_name)
            pass
        else:
            logging.info("data successfully loaded into docker container for " + self.full_name)

        self.get_instance_attributes()
        # self.history_id = self.instance.histories.get_current_history()["id"]

        # import all datasets into current history
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"])

    def get_instance_attributes(self):
        """
        retrieves a pre-existing instance attributes:
        - working history ID
        - libraries ID
        -

        :return:
        """
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        logging.debug("history ID: " + self.history_id)
        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = self.instance.libraries.get_libraries()[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.history_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        folders_ids = {}
        current_folder_name = ""
        for i in instance_source_data_folders:
            for k, v in i.items():
                if k == "name":
                    folders_ids[v] = 0
                    current_folder_name = v
                    folders_ids[current_folder_name] = v
        logging.debug("folders and datasets IDs: ")
        self.datasets = dict()
        for k, v in folders_ids.items():
            logging.info("\t" + k + ": " + v)
            if k == "/genome":
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if type(e) == dict:
                            if e["name"].endswith(".fa"):
                                self.datasets["genome_file"] = e["ldda_id"]
                                logging.debug("\t\t" + e["name"] + ": " + e["ldda_id"])
            elif k == "/annotation/" + self.genus_species:
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if type(e) == dict:
                            # TODO: manage several files of the same type and manage versions
                            if e["name"].endswith("transcripts-gff.fa"):
                                self.datasets["transcripts_file"] = e["ldda_id"]
                                logging.debug("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("proteins.fasta"):
                                self.datasets["proteins_file"] = e["ldda_id"]
                                logging.debug("\t\t" + e["name"] + ": " + e["ldda_id"])
                                self.datasets["gff_file"] = e["ldda_id"]
                                logging.debug("\t\t" + e["name"] + ": " + e["ldda_id"])
                                self.datasets["gff_file"] = e["ldda_id"]
                                logging.debug("\t\t" + e["name"] + ": " + e["ldda_id"])
    def run_workflow(self, workflow_name, workflow_parameters, datamap):
        Run the "main" workflow in the galaxy instance
        - import data to library
        - load fasta and gff
        - sync with tripal
        - add jbrowse + organism
        - fill in the tripal views

        TODO: map tool name to step id
        :param workflow_name:
        :param datamap:
        :return:
        """

        logging.debug("running workflow: " + str(workflow_name))
        workflow_ga_file = self.main_dir + "Galaxy-Workflow-" + workflow_name + ".ga"
        if self.strain != "":
            custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        else:
            custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        with open(workflow_ga_file, 'r') as ga_in_file:
            workflow = str(ga_in_file.readlines())
            workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
                                        str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
            workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
                                        str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
            workflow = workflow.replace("\\\\", "\\")  # to restore the correct amount of backslashes in the workflow string before import
            workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
                                        "http://localhost/sp/" + self.genus_lowercase+ "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            # workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
            #                             "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            workflow = workflow[2:-2]  # if the line under doesn't output a correct json
            # workflow = workflow[:-2]  # if the line above doesn't output a correct json

            workflow_dict = json.loads(workflow)  # doesn't work with eval()

            self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
            self.workflow_name = workflow_name
            workflow_attributes = self.instance.workflows.get_workflows(name=workflow_name)
            workflow_id = workflow_attributes[0]["id"]
            show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id)
            logging.debug("workflow ID: " + workflow_id)

            logging.debug("inputs:")
            logging.debug(show_workflow["inputs"])
            self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                    history_id=self.history_id,
                                                    params=workflow_parameters,
                                                    inputs=datamap,
                                                    inputs_by="")
            self.instance.workflows.delete_workflow(workflow_id=workflow_id)

    def init_instance(self):
        """
        Galaxy instance startup in preparation for running workflows
        - remove Homo sapiens from the chado database.
        - add organism and analyses into the chado database
        - get any other existing organisms IDs (mainly used for testing)

        :return:
        """

        self.instance.histories.create_history(name=str(self.full_name))
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)
        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = self.instance.libraries.get_libraries()[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.history_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)
        # Delete Homo sapiens from Chado database
        logging.info("getting sapiens ID in instance's chado database")
        get_sapiens_id_job = self.instance.tools.run_tool(tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
                                                          history_id=self.history_id,
                                                          tool_inputs={"genus": "Homo", "species": "sapiens"})
        get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"]
        get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output)
        try:
            logging.info("deleting Homo sapiens in the instance's chado database")
            get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
            sapiens_id = str(get_sapiens_id_final_output["organism_id"])  # needs to be str to be recognized by the chado tool
            self.instance.tools.run_tool(tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
                                         history_id=self.history_id,
                                         tool_inputs={"organism": str(sapiens_id)})
        except bioblend.ConnectionError:
            logging.debug("Homo sapiens isn't in the instance's chado database")
            logging.debug("Homo sapiens isn't in the instance's chado database")
        logging.info("adding organism to the instance's chado database")
        self.instance.tools.run_tool(tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2",
                                     history_id=self.history_id,
                                     tool_inputs={"abbr": self.abbreviation,
                                                  "genus": self.genus,
                                                  "species": self.species,
                                                  "common": self.common})
        # Add OGS analysis to chado
        logging.info("adding OGS analysis to the instance's chado database")
        self.instance.tools.run_tool(tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
                                     history_id=self.history_id,
                                     tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version,
                                                  "program": "Performed by Genoscope",
                                                  "programversion": str("OGS" + self.ogs_version),
                                                  "sourcename": "Genoscope",
                                                  "date_executed": self.date})

        # Add genome analysis to chado
        logging.info("adding genome analysis to the instance's chado database")
        self.instance.tools.run_tool(tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
                                     history_id=self.history_id,
                                     tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version,
                                                  "program": "Performed by Genoscope",
                                                  "programversion": str("genome v" + self.genome_version),
                                                  "sourcename": "Genoscope",
                                                  "date_executed": self.date})

        self.get_organism_and_analyses_ids()
        logging.info("finished initializing instance")

    def get_organism_and_analyses_ids(self):
        """
        Retrieve current organism ID and OGS and genome chado analyses IDs (needed to run some tools as Tripal/Chado
        doesn't accept organism/analyses names as valid inputs

        :return:
        """
        # Get the ID for the current organism in chado
        org = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": self.genus, "species": self.species})
        org_job_out = org["outputs"][0]["id"]
        org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out)
        try:
            org_output = json.loads(org_json_output)[0]
            self.org_id = str(org_output["organism_id"])  # needs to be str to be recognized by chado tools
        except IndexError:
            logging.debug("no organism matching " + self.full_name + " exists in the instance's chado database")
        # Get the ID for the OGS analysis in chado
        ogs_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version})
        ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"]
        ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out)
        try:
            ogs_analysis_output = json.loads(ogs_analysis_json_output)[0]
            self.ogs_analysis_id = str(
                ogs_analysis_output["analysis_id"])  # needs to be str to be recognized by chado tools
            logging.debug("no matching OGS analysis exists in the instance's chado database")
        # Get the ID for the genome analysis in chado
        genome_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version})
        genome_analysis_job_out = genome_analysis["outputs"][0]["id"]
        genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out)
        try:
            genome_analysis_output = json.loads(genome_analysis_json_output)[0]
            self.genome_analysis_id = str(
                genome_analysis_output["analysis_id"])  # needs to be str to be recognized by chado tools
            logging.debug("no matching genome analysis exists in the instance's chado database")

    def clean_instance(self):
        """
        TODO: function to purge the instance from analyses and organisms
        :return:
        """
        return None


parser = argparse.ArgumentParser(description="Input genus, species, strain, version")
parser.add_argument("json", type=str, help="Input JSON file")
parser.add_argument("-v", "--verbose",
                    help="Increase output verbosity",
                    action="store_true")
parser.add_argument("--init-instance",
                    help="Initialization of galaxy instance. Run first in an empty instance",
                    action="store_true")
parser.add_argument("--load-data",
                    help="Create src_data directory tree and load its data into the instance",
                    action="store_true")
parser.add_argument("--run-main",
                    help="Run main workflow (load data into chado, sync all with tripal, "
                         "index tripal data, populate materialized view, "
                         "create a jbrowse for the current genus_species_strain_sex and add organism to jbrowse",
                    action="store_true")
args = parser.parse_args()

if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(level=logging.INFO)

sp_dict_list = list()
with open(args.json, 'r') as infile:
    json_sp_dict = json.load(infile)
    json_sp_dump = json.dumps(json_sp_dict, indent=4, sort_keys=True)
    for json_sp in json_sp_dict:
        sp_dict_list.append(json_sp)

for sp_dict in sp_dict_list:
    al = Autoload(species_parameters_dictionary=sp_dict, args=args)
    if args.init_instance:
        logging.info("initializing the galaxy instance")
        al.init_instance()
        al.get_instance_attributes()
    if args.load_data:
        logging.info("loading data into galaxy")
        al.load_data_in_galaxy()
    if args.run_main:
        logging.info("running main workflow")
        al.get_organism_and_analyses_ids()
        workflow_parameters = dict()
        workflow_parameters["0"] = {}
        workflow_parameters["1"] = {}
        workflow_parameters["2"] = {}
        workflow_parameters["3"] = {}
        workflow_parameters["4"] = {"organism": al.org_id,
                                    "analysis_id": al.genome_analysis_id,
                                    "do_update": "true"}
        workflow_parameters["5"] = {"organism": al.org_id,
                                    "analysis_id": al.ogs_analysis_id}
        workflow_parameters["6"] = {"organism_id": al.org_id}
        workflow_parameters["7"] = {"analysis_id": al.ogs_analysis_id}
        workflow_parameters["8"] = {"analysis_id": al.genome_analysis_id}
        workflow_parameters["9"] = {"organism_id": al.org_id}
        workflow_parameters["10"] = {}
        workflow_parameters["11"] = {}

        al.datamap = dict()
        al.datamap["0"] = {"src": "hda", "id": al.datasets["genome_file"]}
        al.datamap["1"] = {"src": "hda", "id": al.datasets["gff_file"]}
        al.datamap["2"] = {"src": "hda", "id": al.datasets["proteins_file"]}
        al.datamap["3"] = {"src": "hda", "id": al.datasets["transcripts_file"]}

        al.run_workflow(workflow_name="main", workflow_parameters=workflow_parameters, datamap=al.datamap)