#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
gga_get_data.py

Usage: $ python3 gga_get_data.py input_example.yml --config config.yml [OPTIONS]
"""

import argparse
import os
import logging
import sys
import fnmatch
import re
import shutil

import utilities
import speciesData


class GetData(speciesData.SpeciesData):
    """
    Child of SpeciesData

    Contains methods and attributes to copy data into the src_data subfolders of an organism
    """

    def goto_species_dir(self):
        """
        Go to the species directory (starting from the main dir)

        :return:
        """

        os.chdir(self.main_dir)
        species_dir = os.path.join(self.main_dir, self.genus_species) + "/"
        try:
            os.chdir(species_dir)
        except OSError:
            logging.critical("Cannot access %s" % species_dir)
            sys.exit(1)
        return 1

    def batch_modify_fasta_headers(self):
        """
        Change the fasta headers before integration, so that the indexing tool in galaxy
        interprets the headers correctly and doesn't throw an error

        The function will use the class attribute "source_datasets", pointing to files in
        the galaxy library, to find the fasta files that need their headers formatted

        :return:
        """

        proteins_file = None
        proteins_outfile = None

        self.goto_species_dir()

        # Look for the proteins fasta in the annotation folder matching the current OGS version
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation" in d and self.species_folder_name in d and self.ogs_version in d:
                for f in os.listdir(d):
                    if "proteins" in f:
                        proteins_file = os.path.join(d, f)
                        proteins_outfile = os.path.join(d, "outfile_proteins.fa")

        # Formatting the headers
        if proteins_file is not None:
            self.format_fasta_headers(infile=proteins_file,
                                      outfile=proteins_outfile,
                                      pattern="^>mRNA",
                                      repl=">protein")
            if os.path.exists(proteins_outfile):
                # Replace the original proteins file with the reformatted one
                shutil.move(proteins_outfile, proteins_file)
        else:
            logging.warning("Skipping proteins fasta headers formatting (FileNotFound)")

    @staticmethod
    def format_fasta_headers(infile, outfile, pattern, repl):
        """
        Format the fasta headers of a given file, given a matching pattern and a replacement string

        :param infile:
        :param outfile:
        :param pattern:
        :param repl:
        :return:
        """

        with open(infile, "r") as fin, open(outfile, "w") as fout:
            for line in fin:
                fout.write(re.sub(pattern, repl, line))
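    # Example (illustrative, never called by the pipeline): format_fasta_headers()
    # rewrites matching header prefixes line by line, so a proteins fasta whose headers
    # were inherited from the mRNA fasta, e.g. ">mRNA.scaffold_1.1", would become
    # ">protein.scaffold_1.1". The file names below are hypothetical:
    #
    #   GetData.format_fasta_headers(infile="proteins.fa", outfile="outfile_proteins.fa",
    #                                pattern="^>mRNA", repl=">protein")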
"proteins_path": self.proteins_path, "interpro_path": self.interpro_path, "orthofinder_path": self.orthofinder_path, "blastp_path": self.blastp_path, "blastx_path": self.blastx_path} genome_datasets = ["genome_path"] annotation_datasets = ["gff_path", "transcripts_path", "proteins_path", "orthofinder_path", "interpro_path", "blastp_path", "blastx_path"] # Where to store blast results? search_excluded_datasets = ["interpro_path", "orthofinder_path", "blastp_path", "blastx_path"] # These datasets will not be searched if missing in the input file # Automatically find the "main" genome and annotation datasets (genome, transcripts, proteome, gff) # Triggers when a dataset path is empty # Exits after a single iteration to not replicate the search # This is VERY specific to phaeoexplorer, as the search depends on how the folders and datasets are named for k, v in datasets_to_get.items(): if k not in search_excluded_datasets and v == "": print("Dataset not specified (%s), searching datasets" % k) self.find_dataset_from_source_data_parent_dir() break # Copy dataset in the organism src_data dir tree correct folder for k, v in datasets_to_get.items(): if k in genome_datasets: shutil.copyfile(os.path.abspath(v), os.path.join(organism_genome_dir, os.path.basename(v))) logging.info("Copied {0} into {1}".format(v, organism_genome_dir)) elif k in annotation_datasets: shutil.copyfile(os.path.abspath(v), os.path.join(organism_annotation_dir, os.path.basename(v))) logging.info("Copied {0} into {1}".format(v, organism_annotation_dir)) else: pass os.chdir(self.main_dir) def find_dataset_from_source_data_parent_dir(self): """ "Fail case" func if a dataset isn't specified in the input file. This func will search the specified "parent directory" for files matching the current species Highly specific to the phaeoexplorer project! Doesn't work for interpro, orthofinder and blast datasets at the moment, those absolutely need to be written in the input file :return """ organism_annotation_dir = os.path.abspath("./src_data/annotation/{0}/OGS{1}".format(self.species_folder_name, self.genome_version)) organism_genome_dir = os.path.abspath("./src_data/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version)) for dirpath, dirnames, files in os.walk(self.source_data_dir): if self.genus_upper and self.species in str(dirpath): for f in files: if "Contaminants" not in str(f): try: if fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + ".fa"): logging.info("Genome assembly file found - " + str(f)) self.genome_path = os.path.abspath(f) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + ".gff"): logging.info("GFF file - " + str(f)) self.gff_path = os.path.abspath(f) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + "_transcripts-gff.fa"): logging.info("Transcripts file - " + str(f)) self.transcripts_path = os.path.abspath(f) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + "_proteins.fa"): logging.info("Proteins file - " + str(f)) self.proteins_path = os.path.abspath(f) except Exception as exc: logging.debug("Error raised %s" % exc) def generate_blast_banks(self): """ TODO Do we need to generate blast banks? 
:return: """ return 0 if __name__ == "__main__": parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction " "with galaxy instances for GGA" ", following the protocol @ " "http://gitlab.sb-roscoff.fr/abims/e-infra/gga") parser.add_argument("input", type=str, help="Input file (yml)") parser.add_argument("-v", "--verbose", help="Increase output verbosity", action="store_false") parser.add_argument("--config", type=str, help="Config path, default to the 'config' file inside the script repository") parser.add_argument("--main-directory", type=str, help="Where the stack containers will be located, defaults to working directory") args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) logging.getLogger("urllib3").setLevel(logging.WARNING) # Parsing the config file if provided, using the default config otherwise if not args.config: args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config") else: args.config = os.path.abspath(args.config) if not args.main_directory: args.main_directory = os.getcwd() else: args.main_directory = os.path.abspath(args.main_directory) sp_dict_list = utilities.parse_input(args.input) for sp_dict in sp_dict_list: # Creating an instance of get_data_for_current_species object get_data_for_current_species = GetData(parameters_dictionary=sp_dict) # Starting logging.info("gga_load_data.py called for %s" % get_data_for_current_species.full_name) # Setting some of the instance attributes get_data_for_current_species.main_dir = args.main_directory get_data_for_current_species.species_dir = os.path.join(get_data_for_current_species.main_dir, get_data_for_current_species.genus_species + "/") # Parse the config yaml file get_data_for_current_species.config = utilities.parse_config(args.config) # Change serexec permissions in repo try: os.chmod("%s/serexec" % get_data_for_current_species.script_dir, 0o0777) except PermissionError: logging.critical("Cannot access %s, exiting" % get_data_for_current_species.script_dir) # Load config file get_data_for_current_species.config = utilities.parse_config(args.config) # Retrieve datasets logging.info("Finding and copying datasets for %s" % get_data_for_current_species.full_name) get_data_for_current_species.get_source_data_files_from_path() logging.info("Sucessfully copied datasets for %s" % get_data_for_current_species.full_name) # Format fasta headers (proteins) logging.info("Formatting fasta files headers %s " % get_data_for_current_species.full_name) get_data_for_current_species.batch_modify_fasta_headers() logging.info("Successfully formatted files headers %s " % get_data_for_current_species.full_name) logging.info("Data successfully loaded and imported for %s" % get_data_for_current_species.full_name)