Skip to content
Snippets Groups Projects
load_data.py 11.3 KiB
Newer Older
#!/usr/bin/python
# -*- coding: utf-8 -*-


import bioblend
import bioblend.galaxy.objects
from bioblend import galaxy
import logging
import sys
import yaml
import re
from datetime import datetime
Usage: $ python3 deploy_stacks.py -i example.yml [OPTIONS]
"""

def parse_input(input_file):
    """
    Parse the yml input file to extract data to create the SpeciesData objects
    Return a list of dictionaries. Each dictionary contains data tied to a species
    parsed_sp_dict_list = []

    if str(input_file).endswith("yml") or str(input_file).endswith("yaml"):
        logging.debug("Input format used: YAML")
    else:
        logging.critical("Error, please input a YAML file")
        sys.exit()
    with open(input_file, 'r') as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
            for k, v in yaml_dict.items():
                if k == "config":
                    pass
                parsed_sp_dict_list.append(v)
        except yaml.YAMLError as exit_code:
            logging.critical(exit_code + " (YAML input file might be incorrect)")
            sys.exit()
    return parsed_sp_dict_list


class LoadData:
    """
    Load data from the src_data subfolders into the galaxy instance's history of a given species

    """

    def __init__(self, parameters_dictionary):
        self.parameters_dictionary = parameters_dictionary
        self.species = parameters_dictionary["description"]["species"]
        self.genus = parameters_dictionary["description"]["genus"]
        self.strain = parameters_dictionary["description"]["strain"]
        self.sex = parameters_dictionary["description"]["sex"]
        self.common = parameters_dictionary["description"]["common_name"]
        self.date = datetime.today().strftime("%Y-%m-%d")
        self.origin = parameters_dictionary["description"]["origin"]
        self.performed = parameters_dictionary["data"]["performed_by"]
        if parameters_dictionary["data"]["genome_version"] == "":
            self.genome_version = "1.0"
        else:
            self.genome_version = parameters_dictionary["data"]["genome_version"]
        if parameters_dictionary["data"]["ogs_version"] == "":
            self.ogs_version = "1.0"
        else:
            self.ogs_version = parameters_dictionary["data"]["ogs_version"]
        self.genus_lowercase = self.genus[0].lower() + self.genus[1:]
        self.genus_uppercase = self.genus[0].upper() + self.genus[1:]
        self.species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex])
        self.full_name = " ".join([self.genus_uppercase, self.species, self.strain, self.sex])
        self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex])
        self.genus_species = self.genus_lowercase + "_" + self.species
        self.instance_url = "http://scratchgmodv1:8888/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/"
        # Testing with localhost/scratchgmodv1
        self.instance = None
        self.history_id = None
        self.library_id = None
        self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.main_dir = None
        self.species_dir = None
        self.org_id = None
        self.genome_analysis_id = None
        self.ogs_analysis_id = None
        self.tool_panel = None
        self.datasets = dict()
        self.source_files = dict()
        self.workflow_name = None
        self.metadata = dict()
        self.api_key = "master"  # TODO: set the key in config file --> saved for later (master api key access actions are limited)
        if parameters_dictionary["data"]["parent_directory"] == "" or parameters_dictionary["data"]["parent_directory"] == "/path/to/closest/parent/dir":
            self.source_data_dir = "/projet/sbr/phaeoexplorer/"  # Testing path for phaeoexplorer data
        else:
            self.source_data_dir = parameters_dictionary["data"]["parent_directory"]
        # Directory/subdirectories where data files are located (fasta, gff, ...)
        self.do_update = False
        # Update the instance (in histories corresponding to the input) instead of creating a new one
        self.api_key = "master"
        # API key used to communicate with the galaxy instance. Cannot be used to do user-tied actions
        self.species_name_regex_litteral = "(?=\w*V)(?=\w*A)(?=\w*R)(?=\w*I)(?=\w*A)(?=\w*B)(?=\w*L)(?=\w*E)\w+"  # Placeholder re


    def modify_fasta_headers(self):
        """
        Change the fasta headers before integration.

        :return:
        """

        try:
            os.chdir(self.species_dir)
            working_dir = os.getcwd()
        except OSError:
            logging.info("Cannot access " + self.species_dir + ", run with higher privileges")
            logging.info("Fatal error: exit")
            sys.exit()
        self.source_files = dict()
        annotation_dir, genome_dir = None, None
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation/" in d:
                annotation_dir = d
                for f in os.listdir(d):
                    if f.endswith("proteins.fasta"):
                        self.source_files["proteins_file"] = os.path.join(d, f)
                    elif f.endswith("transcripts-gff.fa"):
                        self.source_files["transcripts_file"] = os.path.join(d, f)
                    elif f.endswith(".gff"):
                        self.source_files["gff_file"] = os.path.join(d, f)
            elif "genome/" in d:
                genome_dir = d
                for f in os.listdir(d):
                    if f.endswith(".fa"):
                        self.source_files["genome_file"] = os.path.join(d, f)
                logging.debug("source files found:")
        for k, v in self.source_files.items():
            logging.debug("\t" + k + "\t" + v)

        # Changing headers in the *proteins.fasta file from >mRNA* to >protein*
        # production version
        modify_pep_headers = [str(self.main_dir) + "/gga_load_data/utils/phaeoexplorer-change_pep_fasta_header.sh",
                              self.source_files["proteins_file"]]
        # test version
        # modify_pep_headers = ["/home/alebars/gga/phaeoexplorer-change_pep_fasta_header.sh",
                              # self.source_files["proteins_file"]]
        logging.info("Changing fasta headers: " + self.source_files["proteins_file"])
        subprocess.run(modify_pep_headers, stdout=subprocess.PIPE, cwd=annotation_dir)
        # production version
        modify_pep_headers = [str(self.main_dir) + "/gga_load_data/utils/phaeoexplorer-change_transcript_fasta_header.sh",
                              self.source_files["proteins_file"]]
        # test version
        # modify_pep_headers = ["/home/alebars/gga/phaeoexplorer-change_transcript_fasta_header.sh",
        #                       self.source_files["proteins_file"]]
        logging.info("Changing fasta headers: " + self.source_files["transcripts_file"])
        subprocess.run(modify_pep_headers, stdout=subprocess.PIPE, cwd=annotation_dir)

        # src_data cleaning
        if os.path.exists(annotation_dir + "outfile"):
            subprocess.run(["mv", annotation_dir + "/outfile", self.source_files["proteins_file"]],
                           stdout=subprocess.PIPE,
                           cwd=annotation_dir)
        if os.path.exists(annotation_dir + "gmon.out"):
            subprocess.run(["rm", annotation_dir + "/gmon.out"],
                           stdout=subprocess.PIPE,
                           cwd=annotation_dir)


    def setup_data_libraries(self):
        """
        - generate blast banks and docker-compose
        - load data into the galaxy container with the galaxy_data_libs_SI.py script

        :return:
        """

        try:
            logging.info("Loading data into the galaxy container")
            subprocess.run("../serexec genus_species_galaxy /tool_deps/_conda/bin/python /opt/galaxy_data_libs_SI.py", shell=True)
        except subprocess.CalledProcessError:
            logging.info("Cannot load data into the galaxy container for " + self.full_name)
            pass
        else:
            logging.info("Data successfully loaded into the galaxy container for " + self.full_name)

        self.get_species_history_id()
        # self.get_instance_attributes()
        #
        # # import all datasets into current history
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"])
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"])
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"])
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"])



    def generate_blast_banks(self):
        """
        Automatically generate blast banks for a species and commit

        :return:
        """


    def connect_to_instance(self):
        """
        Test the connection to the galaxy instance for the current organism
        Exit if it cannot connect to the instance
        """
        self.instance = galaxy.GalaxyInstance(url=self.instance_url, email="gga@sb-roscoff.fr", password="password",
                                              verify=False)
        logging.info("Connecting to the galaxy instance ...")
        try:
            self.instance.histories.get_histories()
            self.tool_panel = self.instance.tools.get_tool_panel()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit()
        else:
            logging.info("Successfully connected to galaxy instance @ " + self.instance_url)
        self.instance.histories.create_history(name="FOO")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_false")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.info("Load data: start")
    sp_dict_list = parse_input(args.input)
    for sp_dict in sp_dict_list:
        o = LoadData(parameters_dictionary=sp_dict)
        o.main_dir = os.path.abspath(args.dir)
        o.modify_fasta_headers()
        logging.info("Successfully formatted files headers " + o.genus[0].upper() + ". " + o.species + " " + o.strain + " " + o.sex)
        # o.setup_data_libraries()
        # logging.info("Successfully set up data libraries in galaxy for " + o.genus[0].upper() + ". " + o.species + " " + o.strain + " " + o.sex)