Skip to content
Snippets Groups Projects
gga_load_data.py 18.89 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import bioblend
import argparse
import os
import subprocess
import logging
import sys
import fnmatch
import time
import json
import re
import stat
import shutil

from bioblend.galaxy.objects import GalaxyInstance
from bioblend import galaxy

import utilities
import speciesData

""" 
gga_load_data.py

Usage: $ python3 gga_load_data.py input_example.yml --config config.yml [OPTIONS]

Do not call this script before the galaxy container is ready
"""


class LoadData(speciesData.SpeciesData):
    """
    Child of SpeciesData

    Contains methods and attributes to copy data into the galaxy instance's library of this given organism

    Optional data file formatting

    """

    def goto_species_dir(self):
        """
        Go to the species directory (starting from the main dir)

        :return:
        """

        os.chdir(self.main_dir)
        species_dir = os.path.join(self.main_dir, self.genus_species) + "/"
        try:
            os.chdir(species_dir)
        except OSError:
            logging.critical("Cannot access %s" % species_dir)
            sys.exit(0)
        return 1

    def set_get_history(self):
        """
        Create or set the working history to the current species one

        :return:
        """
        try:
            histories = self.instance.histories.get_histories(name=str(self.genus_species))
            self.history_id = histories[0]["id"]
            logging.debug("History ID set for {0}: {1}".format(self.full_name, self.history_id))
        except IndexError:
            logging.info("Creating history for %s" % self.full_name)
            self.instance.histories.create_history(name=str(self.genus_species))
            histories = self.instance.histories.get_histories(name=str(self.genus_species))
            self.history_id = histories[0]["id"]
            logging.debug("History ID set for {0}: {1}".format(self.full_name, self.history_id))

        return self.history_id

    def remove_homo_sapiens_from_db(self):
        """
        Run the GMOD tool to remove the "Homo sapiens" default organism from the original database
        Will do nothing if H. sapiens isn't in the database

        """

        get_organism_tool = self.instance.tools.show_tool("toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.3")
        delete_organism_tool = self.instance.tools.show_tool("toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.4+galaxy0")

        if delete_organism_tool["version"] != "2.3.4+galaxy0":
            toolshed_dict = delete_organism_tool["tool_shed_repository"]
            logging.warning("Changeset for %s is not installed" % toolshed_dict["name"])
            changeset_revision = "b1aa4f9d82fe"
            name = toolshed_dict["name"]
            owner = toolshed_dict["owner"]
            toolshed = "https://" + toolshed_dict["tool_shed"]
            logging.warning("Installing changeset for %s" % toolshed_dict["name"])

            self.instance.toolshed.install_repository_revision(tool_shed_url=toolshed, name=name, owner=owner, 
                                                               changeset_revision=changeset_revision,
                                                               install_tool_dependencies=True,
                                                               install_repository_dependencies=False,
                                                               install_resolver_dependencies=True)


        if get_organism_tool["version"] != "2.3.3":
            toolshed_dict = get_organism_tool["tool_shed_repository"]
            logging.warning("Changeset for %s is not installed" % toolshed_dict["name"])
            changeset_revision = "b07279b5f3bf"
            name = toolshed_dict["name"]
            owner = toolshed_dict["owner"]
            toolshed = "https://" + toolshed_dict["tool_shed"]
            logging.warning("Installing changeset for %s" % toolshed_dict["name"])

            self.instance.toolshed.install_repository_revision(tool_shed_url=toolshed, name=name, owner=owner, 
                                                               changeset_revision=changeset_revision,
                                                               install_tool_dependencies=True,
                                                               install_repository_dependencies=False,
                                                               install_resolver_dependencies=True)

        logging.debug("Getting 'Homo sapiens' ID in instance's chado database")
        get_sapiens_id_job = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.4+galaxy0",
            history_id=self.history_id,
            tool_inputs={"genus": "Homo", "species": "sapiens"})
        get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"]
        get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output)
        try:
            logging.debug("Deleting Homo 'sapiens' in the instance's chado database")
            get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
            sapiens_id = str(
                get_sapiens_id_final_output["organism_id"])  # needs to be str to be recognized by the chado tool
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.4+galaxy0",
                history_id=self.history_id,
                tool_inputs={"organism": str(sapiens_id)})
        except bioblend.ConnectionError:
            logging.debug("Homo sapiens isn't in the instance's chado database (bioblend.ConnectionError)")
        except IndexError:
            logging.debug("Homo sapiens isn't in the instance's chado database (IndexError)")
            pass

    def purge_histories(self):
        """
        Delete all histories in the instance

        For testing purposes

        :return:
        """

        histories = self.instance.histories.get_histories()
        self.instance.histories.get_histories(deleted=False)
        for h in histories:
            self.instance.histories.delete_history(history_id=h["id"])

        return histories

    def setup_library(self):
        """
        Create a "Project Data" library in galaxy, mirroring the "src_data" folder of the current organism
        directory tree

        Any existing, non-deleted "Project Data" library is removed first. Every directory under ./src_data
        that directly contains files becomes a library folder (see create_deep_folder) and its files are
        uploaded with a galaxy datatype chosen from the file name. Some files (e.g. *.bai, *.pdf,
        *_tree.txt) are skipped.

        :return:
        """

        def pick_type_and_label(file_path, dir_path):
            # Returns (galaxy datatype, display name) for a file, or None when the file must not be uploaded
            base_name = os.path.basename(file_path)
            # Not a good idea for files with a complex name (solution --> rename file or remove the replace)
            label = base_name.replace('_', ' ')

            if file_path.endswith('.bam'):
                bam_label = self.get_bam_label(dir_path, base_name)
                label = bam_label if bam_label else os.path.splitext(label)[0]
                star_suffix = "Aligned.sortedByCoord.out"  # trailing part of many local bam file names
                if label.endswith(star_suffix):
                    label = label[:-len(star_suffix)]
                return 'bam', label
            if file_path.endswith(('.fasta', '.fa', '.faa', '.fna')):
                return 'fasta', label
            if file_path.endswith(('.gff', '.gff3')):
                return 'gff3', os.path.splitext(label)[0]
            if file_path.endswith('.xml'):
                return 'xml', label
            if file_path.endswith('.bw'):
                return 'bigwig', label
            if file_path.endswith('.gaf'):
                return 'tabular', label
            if file_path.endswith('_tree.txt'):
                # We don't want to pollute the logs with 20000 useless lines, hence debug level
                logging.debug("Skipping useless file '%s'" % file_path)
                return None
            if file_path.endswith('.tar.gz') and 'newick' in dir_path:
                return 'tar', label
            if file_path.endswith(('.bai', '.tar.gz', '.tar.bz2', '.raw', '.pdf')):
                logging.info("Skipping useless file '%s'" % file_path)
                return None
            return 'auto', label

        self.goto_species_dir()

        gio = GalaxyInstance(url=self.instance_url,
                             email=self.config["galaxy_default_admin_email"],
                             password=self.config["galaxy_default_admin_password"])

        # Map every directory found under ./src_data to the paths of the files it directly contains
        folders = {
            root: [os.path.join(root, filename) for filename in filenames]
            for root, _subdirs, filenames in os.walk("./src_data", followlinks=True)
        }

        if folders:
            # Delete pre-existing lib (probably created by a previous call)
            for lib in gio.libraries.get_previews(name='Project Data'):
                if lib.deleted:
                    continue
                logging.info('Pre-existing "Project Data" library %s found, removing it' % lib.id)
                gio.libraries.delete(lib.id)

            logging.info("Creating new 'Project Data' library")
            prj_lib = gio.libraries.create('Project Data', 'Data for current genome annotation project')
            self.library_id = prj_lib.id  # project data folder/library
            logging.info("Library for {0}: {1}".format(self.full_name, self.library_id))

            for fname, files in folders.items():
                if not (fname and files):
                    continue

                # Path of the directory relative to ./src_data
                folder_name = fname[len("./src_data") + 1:]
                logging.info("Creating folder: %s" % folder_name)
                folder = self.create_deep_folder(prj_lib, folder_name)

                for single_file in files:
                    picked = pick_type_and_label(single_file, fname)
                    if picked is None:
                        continue
                    ftype, clean_name = picked

                    logging.info("Adding file '%s' with type '%s' and name '%s'" % (single_file, ftype, clean_name))
                    # The pretty name is only logged: renaming the uploaded datasets is currently disabled
                    # (it has to happen after the import is finished, and the batch renaming attempt failed)
                    prj_lib.upload_from_local(
                        path=single_file,
                        folder=folder,
                        file_type=ftype
                    )

                time.sleep(1)

        # Wait for uploads to complete
        logging.info("Waiting for import jobs to finish... please wait")

        # Fixed delay only; no job state is polled here
        time.sleep(10)

        logging.info("Finished importing data")

    def create_deep_folder(self, prj_lib, path, parent_folder=None, deep_name=""):
        """
        Create a folder inside a folder in a galaxy library
        Recursive

        :param prj_lib:
        :param path:
        :param parent_folder:
        :param deep_name:
        :return:
        """

        segments = path.split(os.sep)

        deeper_name = os.sep.join([deep_name, segments[0]])
        if deeper_name in self.existing_folders_cache:
            new_folder = self.existing_folders_cache[deeper_name]
        else:
            new_folder = prj_lib.create_folder(segments[0], base_folder=parent_folder)
            self.existing_folders_cache[deeper_name] = new_folder

        if len(segments) > 1:
            new_folder = self.create_deep_folder(prj_lib, os.sep.join(segments[1:]), new_folder, deeper_name)

        return new_folder

    def connect_to_instance(self):
        """
        Test the connection to the galaxy instance for the current organism
        Exit if we cannot connect to the instance

        Builds a bioblend client from self.instance_url and the admin credentials found in self.config,
        stores it in self.instance, then lists the histories once as a connectivity probe.

        :raises SystemExit: with exit status 1 if the probe raises bioblend.ConnectionError
        """

        logging.info("Connecting to the galaxy instance (%s)" % self.instance_url)
        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                              email=self.config["galaxy_default_admin_email"],
                                              password=self.config["galaxy_default_admin_password"]
                                              )

        # The probe must only run inside the try block, otherwise a connection failure escapes the handler
        try:
            self.instance.histories.get_histories()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance (%s) " % self.instance_url)
            # Non-zero status: this is a failure
            sys.exit(1)
        else:
            logging.info("Successfully connected to galaxy instance (%s) " % self.instance_url)




def get_species_to_load(sp_dict_list):
    """
    Call utilities.get_unique_species_list on the given list and discard its result

    NOTE(review): looks like an unfinished stub -- the value returned by the helper is not used and
    this function always returns 1; confirm the intended behaviour before relying on it

    :param sp_dict_list: passed unchanged to utilities.get_unique_species_list (presumably the species
        dictionaries parsed from the input file -- TODO confirm)
    :return: always 1
    """

    

    utilities.get_unique_species_list(sp_dict_list)


    return 1

    
if __name__ == "__main__":
    # Script entry point: for every unique species of the input file, check that its galaxy container is
    # ready, then load its data into a galaxy library and remove the default "Homo sapiens" organism
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    # store_true: verbosity is off by default and switched on by the flag (store_false inverted it)
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")

    parser.add_argument("--config",
                        type=str,
                        help="Config path, default to the 'config' file inside the script repository")

    parser.add_argument("--main-directory",
                        type=str,
                        help="Where the stack containers will be located, defaults to working directory")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

    # Parsing the config file if provided, using the default config otherwise
    if not args.config:
        args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config")
    else:
        args.config = os.path.abspath(args.config)

    if not args.main_directory:
        args.main_directory = os.getcwd()
    else:
        args.main_directory = os.path.abspath(args.main_directory)

    sp_dict_list = utilities.parse_input(args.input)

    unique_sp_dict_list = utilities.get_unique_species_dict_list(sp_dict_list=sp_dict_list)

    for sp_dict in unique_sp_dict_list:

        # Creating an instance of load_data_for_current_species object
        load_data_for_current_species = LoadData(parameters_dictionary=sp_dict)

        # Starting
        logging.info("gga_load_data.py called for %s" % load_data_for_current_species.full_name)

        # Setting some of the instance attributes
        load_data_for_current_species.main_dir = args.main_directory
        load_data_for_current_species.species_dir = os.path.join(load_data_for_current_species.main_dir,
                                                                 load_data_for_current_species.genus_species +
                                                                 "/")

        # Parse the config yaml file (once per species; it is needed for the instance url and the credentials)
        load_data_for_current_species.config = utilities.parse_config(args.config)
        # Set the instance url attribute -- Does not work with localhost on scratch (ALB)
        load_data_for_current_species.instance_url = "http://localhost:{0}/sp/{1}_{2}/galaxy/".format(
                load_data_for_current_species.config["http_port"],
                load_data_for_current_species.genus_lowercase,
                load_data_for_current_species.species)

        # Check the galaxy container state and proceed if the galaxy services are up and running
        if utilities.check_galaxy_state(genus_lowercase=load_data_for_current_species.genus_lowercase,
                                        species=load_data_for_current_species.species,
                                        script_dir=load_data_for_current_species.script_dir):

            # Testing connection to the instance
            load_data_for_current_species.connect_to_instance()

            # Load the datasets into a galaxy library
            logging.info("Setting up library for %s" % load_data_for_current_species.full_name)
            load_data_for_current_species.setup_library()
            logging.info("Successfully set up library in galaxy for %s" % load_data_for_current_species.full_name)

            # Set or get the history for the current organism
            load_data_for_current_species.set_get_history()

            # Remove H. sapiens from database if here
            # TODO: set a dedicated history for removing H. sapiens (instead of doing it into a species history)
            load_data_for_current_species.remove_homo_sapiens_from_db()

            # logging.info("Importing datasets into history for %s" % load_data_for_current_species.full_name)
            # load_data_for_current_species.import_datasets_into_history()  # Option "--load-history"

            # load_data_for_current_species.purge_histories()  # Testing purposes

            logging.info("Data successfully loaded and imported for %s" % load_data_for_current_species.full_name)

        else:
            logging.critical("The galaxy container for %s is not ready yet!" % load_data_for_current_species.full_name)
            # Non-zero status: the run did not complete
            sys.exit(1)