Skip to content
Snippets Groups Projects
Commit a9df26bb authored by Arthur Le Bars's avatar Arthur Le Bars
Browse files

delete gga_preprocessing

parent c161a821
No related branches found
No related tags found
1 merge request!1Release 1.0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bioblend
import argparse
import os
import subprocess
import logging
import sys
import fnmatch
import time
import json
import re
import stat
import shutil
import tarfile
import utilities
import speciesData
# Import the phaeoexplorer-tools gga_preprocessing modules (descriptions transfer for the gff file)
sys.path.append("/projet/sbr/gga/dev/gga/swarm/gga/swarm/phaeoexplorer-tools/gga_preprocessing") # Change this path (path to module) in production
from gga_preprocessing import phaeoexplorer_hectar_transfer
from gga_preprocessing import ec_32_orthologs_transfer
"""
gga_preprocessing_phaeoexplorer.py
Usage: $ python3 gga_preprocessing_phaeoexplorer.py -i input_example.yml --config config.yml [OPTIONS]
This script is streamlined with "gga_get_data.py" from the "gga_load_data" repo and cannot be used separately
"""
class GgaPreprocess(speciesData.SpeciesData):
    """
    Child of SpeciesData

    Methods to process datasets and results from Phaeoexplorer for the gga pipeline
    """

    def goto_species_dir(self):
        """
        Go to the species directory (<main_dir>/<genus_species>/), starting from the main dir

        Exits the program if the species directory cannot be accessed

        :return: 1 on success
        """
        os.chdir(self.main_dir)
        species_dir = os.path.join(self.main_dir, self.genus_species) + "/"
        try:
            os.chdir(species_dir)
        except OSError:
            logging.critical("Cannot access %s" % species_dir)
            # Exit with a non-zero status so callers can see the failure
            # (previously exited with 0, which reads as success to the shell)
            sys.exit(1)
        return 1

    def transfer_manual_annotations(self):
        """
        Transfer manual annotations to the species gff file

        Walks ./src_data for the OGS annotation directory of this species and,
        for every .gff file found there, runs the EC32 orthologs transfer and
        replaces the original gff file with the transferred one.

        :return: 1
        """
        self.goto_species_dir()
        # Location of the manual annotation file (tabulated)
        manual_annotation_file = "/projet/sbr/phaeoexplorer_raw/finalresult/functional_annotation_orthofinder/EctsiV2_gene_info_LATEST.txt"
        # The orthofinder output file name depends on whether the strain is sexed
        if not self.sex:
            orthofinder_file = "/projet/sbr/phaeoexplorer_raw/finalresult/functional_annotation_orthofinder/ORTHOLOGUES_EC32/{0}-{1}_proteins__v__Ectocarpus-sp_Ec32_proteins-ISOFORM1.tsv".format(self.genus_uppercase, self.species)
        else:
            orthofinder_file = "/projet/sbr/phaeoexplorer_raw/finalresult/functional_annotation_orthofinder/ORTHOLOGUES_EC32/{0}-{1}_{2}_proteins__v__Ectocarpus-sp_Ec32_proteins-ISOFORM1.tsv".format(self.genus_uppercase, self.species, self.sex.upper())
        if not os.path.isfile(manual_annotation_file):
            logging.info("Manual annotation file ({0}) doesn't exist for {1}".format(manual_annotation_file, self.full_name))
            return 1
        if not os.path.isfile(orthofinder_file):
            logging.info("Orthofinder output file ({0}) doesn't exist for {1}".format(orthofinder_file, self.full_name))
            return 1
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation" in d and self.species_folder_name in d and self.ogs_version in d:
                for f in os.listdir(d):
                    if f.endswith(".gff"):
                        gff_file = os.path.join(d, f)
                        annotation_dir = os.path.abspath(d)
                        logging.info("Transferring manual annotations from {0} to {1}".format(manual_annotation_file, gff_file))
                        # BUG FIX: the module imported at the top of the file is
                        # "ec_32_orthologs_transfer"; the previous call target
                        # "manual_annotations_transfer" was never defined
                        ec_32_orthologs_transfer.orthologs_transfer(manual_annotation_file=manual_annotation_file,
                                                                    orthofinder_file=orthofinder_file,
                                                                    gff_file=gff_file,
                                                                    outdir=annotation_dir)
                        # The transfer writes "<gff basename>_TRANSFERRED_MANUAL.gff";
                        # swap it in place of the original gff file
                        new_gff_file = os.path.join(d, "%s_TRANSFERRED_MANUAL.gff" % os.path.basename(gff_file))
                        os.replace(os.path.abspath(new_gff_file), os.path.abspath(gff_file))
        return 1

    def transfer_hectar_annotations(self):
        """
        Extract hectar output file and transfer hectar annotations to the species gff file

        The extracted "finalresult/" tree is deleted later by clean_up_dir_tree()

        :return: 1
        """
        self.goto_species_dir()
        # Hectar archive containing all hectar outputs (path might change)
        hectar_output_archive = "/projet/sbr/phaeoexplorer_raw/finalresult/functional_annotation_hectar/finalresult_hectar_phaeoexplorer_20200228.tgz"
        if not os.path.exists(hectar_output_archive):
            logging.info("Hectar output archive ({0}) doesn't exist for {1}".format(hectar_output_archive, self.full_name))
            return 1
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation" in d and self.species_folder_name in d and self.ogs_version in d:
                for f in os.listdir(d):
                    if f.endswith(".gff"):
                        gff_file = os.path.join(d, f)
                        annotation_dir = os.path.abspath(d)
                        logging.info("Extracting hectar output file for %s" % self.full_name)
                        # Archive member name depends on whether the strain is sexed
                        if not self.sex:
                            member = "finalresult/{0}-{1}/hectar_output.csv".format(self.genus_uppercase, self.species)
                        else:
                            member = "finalresult/{0}-{1}_{2}/hectar_output.csv".format(self.genus_uppercase, self.species, self.sex.upper())
                        with tarfile.open(os.path.abspath(hectar_output_archive)) as archive_tar:
                            # Extract only the member we need. The previous code looped
                            # over getmembers() and re-extracted the same file once per
                            # archive entry; it also extracted to "self.annotation_dir"
                            # in one branch instead of the local "annotation_dir".
                            archive_tar.extract(member, path=annotation_dir)
                        hectar_file = os.path.join(annotation_dir, member)
                        if os.path.isfile(hectar_file):
                            # BUG FIX: the module imported at the top of the file is
                            # "phaeoexplorer_hectar_transfer"; the previous call target
                            # "hectar_transfer" was never defined
                            phaeoexplorer_hectar_transfer.hectar_transfer(hectar_file=hectar_file,
                                                                          gff_file=gff_file,
                                                                          outdir=annotation_dir)
                            # Swap the transferred gff in place of the original one
                            new_gff_file = os.path.join(d, "%s_TRANSFERRED_HECTAR.gff" % os.path.basename(gff_file))
                            os.replace(os.path.abspath(new_gff_file), os.path.abspath(gff_file))
        return 1

    def batch_modify_fasta_headers(self):
        """
        Change the fasta headers before integration, so that the indexing tool in galaxy
        interprets the headers correctly and doesn't throw an error

        Rewrites ">mRNA..." headers of the proteins fasta file into ">protein..."
        headers, in place (via a temporary output file).

        :return: None
        """
        proteins_file = None
        proteins_outfile = None
        self.goto_species_dir()
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation" in d and self.species_folder_name in d and self.ogs_version in d:
                for f in os.listdir(d):
                    if "proteins" in f:
                        proteins_file = os.path.join(d, f)
                        proteins_outfile = os.path.join(d, "outfile_proteins.fa")
        # Formatting the headers
        if proteins_file is not None:
            self.format_fasta_headers(infile=proteins_file,
                                      outfile=proteins_outfile,
                                      pattern="^>mRNA",
                                      repl=">protein")
            if os.path.exists(proteins_outfile):
                # Replace the original fasta with the reformatted one. The previous
                # code shelled out to "mv" and then tried to "rm" the already-moved
                # temp file, which was a no-op.
                os.replace(proteins_outfile, proteins_file)
        else:
            logging.warning("Skipping proteins fasta headers formatting (FileNotFound)")

    @staticmethod
    def format_fasta_headers(infile, outfile, pattern, repl):
        """
        Format the fasta headers of a given file, given a matching pattern and a replacement string

        :param infile: path to the input fasta file
        :param outfile: path to the output fasta file (created/overwritten)
        :param pattern: regex pattern matched against each line
        :param repl: replacement string for matched headers
        :return: None
        """
        # Stream line by line instead of loading the whole file into memory,
        # and let context managers guarantee the file handles are closed
        with open(infile, 'r') as fin, open(outfile, 'w') as fout:
            for line in fin:
                fout.write(re.sub(pattern, repl, line))

    def clean_up_dir_tree(self):
        """
        Remove the "finalresult" tree extracted by transfer_hectar_annotations()
        from the OGS annotation directory, if present

        :return: 1
        """
        organism_annotation_dir = os.path.abspath("./src_data/annotation/{0}/OGS{1}".format(self.species_folder_name, self.ogs_version))
        finalresult_dir = os.path.join(organism_annotation_dir, "finalresult")
        if os.path.exists(finalresult_dir):
            shutil.rmtree(finalresult_dir)
        return 1
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")
    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        # BUG FIX: was action="store_false", which made --verbose
                        # default to True (always DEBUG) and *lower* verbosity when passed
                        action="store_true")
    parser.add_argument("--config",
                        type=str,
                        help="Config path, default to the 'config' file inside the script repository")
    parser.add_argument("--main-directory",
                        type=str,
                        help="Where the stack containers will be located, defaults to working directory")

    args = parser.parse_args()

    # DEBUG level when --verbose is passed, INFO otherwise
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

    # Parsing the config file if provided, using the default config otherwise
    if not args.config:
        args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config")
    else:
        args.config = os.path.abspath(args.config)

    # Default the stack directory to the current working directory
    if not args.main_directory:
        args.main_directory = os.getcwd()
    else:
        args.main_directory = os.path.abspath(args.main_directory)

    sp_dict_list = utilities.parse_input(args.input)

    for sp_dict in sp_dict_list:
        # Creating an instance of gga_preprocessor object
        gga_preprocessor = GgaPreprocess(parameters_dictionary=sp_dict)

        # Starting
        logging.info("gga_preprocessing_phaeoexplorer.py called for %s" % gga_preprocessor.full_name)

        # Setting some of the instance attributes
        gga_preprocessor.main_dir = args.main_directory
        gga_preprocessor.species_dir = os.path.join(gga_preprocessor.main_dir,
                                                    gga_preprocessor.genus_species +
                                                    "/")

        # Parse the config yaml file
        gga_preprocessor.config = utilities.parse_config(args.config)

        # Transfer manual annotations
        gga_preprocessor.transfer_manual_annotations()
        # Transfer hectar annotations
        gga_preprocessor.transfer_hectar_annotations()
        # Format fasta headers
        gga_preprocessor.batch_modify_fasta_headers()
        # Clean up src_data
        gga_preprocessor.clean_up_dir_tree()

        logging.info("Datasets processed for %s" % gga_preprocessor.full_name)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment