-
Arthur Le Bars authored
jinja templating, more workflow changes, trying fixes for the jbrowse gmod tool using API (in progress)
dc2e8f3a
gga_load_data.py 25.47 KiB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bioblend
import argparse
import os
import subprocess
import logging
import sys
import utilities
import speciesData
import fnmatch
import time
import json
import re
import stat
import shutil
from bioblend.galaxy.objects import GalaxyInstance
from bioblend import galaxy
"""
gga_load_data.py
Usage: $ python3 gga_init.py -i input_example.yml [OPTIONS]
Do not call this script before the galaxy container is ready
"""
class LoadData(speciesData.SpeciesData):
"""
Child of SpeciesData
Contains methods and attributes to copy data into the src_data subfolders of an organism and then into the
galaxy instance's history of this given organism
Optional fasta headers reformat
"""
def goto_species_dir(self):
    """
    Go to the species directory (starting from the main dir)

    The working directory of the process is changed to self.main_dir first, then to
    <main_dir>/<genus_species>/

    :return: 1 once the working directory has been changed
    :raises SystemExit: with exit status 1 when one of the two directories cannot be entered
    """
    species_dir = os.path.join(self.main_dir, self.genus_species) + "/"
    try:
        # Both moves are guarded so that a missing main directory is reported the same way
        # as a missing species directory instead of surfacing as a raw OSError
        os.chdir(self.main_dir)
        os.chdir(species_dir)
    except OSError:
        logging.critical("Cannot access %s" % species_dir)
        # Non-zero status: this is a failure and callers/shell scripts must be able to see it
        sys.exit(1)
    return 1
def batch_modify_fasta_headers(self):
    """
    Change the fasta headers before integration, so that the indexing tool in galaxy interprets the headers
    correctly and doesn't throw an error

    The species "src_data" tree is walked; inside every directory whose path contains "annotation",
    self.species_folder_name and self.ogs_version, a file whose name contains "proteins" is taken as the
    proteins fasta (the last match found wins). Its ">mRNA" header prefixes are rewritten to ">protein"
    through a temporary "outfile_proteins.fa" that then replaces the original file

    :return:
    """
    outfile_name = "outfile_proteins.fa"
    proteins_file = None
    proteins_outfile = None

    self.goto_species_dir()
    for d, _subdirs, filenames in os.walk(os.getcwd() + "/src_data"):
        if "annotation" in d and self.species_folder_name in d and self.ogs_version in d:
            for f in filenames:
                # A temporary file left behind by an interrupted run also contains "proteins" in its
                # name: it must never be selected as the input, it would be truncated when reopened
                # for writing
                if "proteins" in f and f != outfile_name:
                    proteins_file = os.path.join(d, f)
                    proteins_outfile = os.path.join(d, outfile_name)

    if proteins_file is None:
        logging.warning("Skipping proteins fasta headers formatting (FileNotFound)")
        return

    # Formatting the headers
    self.format_fasta_headers(infile=proteins_file,
                              outfile=proteins_outfile,
                              pattern="^>mRNA",
                              repl=">protein")
    if os.path.exists(proteins_outfile):
        # Overwrite the original proteins file with the reformatted one (no external "mv" process)
        os.replace(proteins_outfile, proteins_file)
def format_fasta_headers(self, infile, outfile, pattern, repl):
    """
    Format the fasta headers of a given file, given a matching pattern and a replacement string

    Every line of the input is passed through re.sub, so the pattern itself must restrict the
    substitution to header lines (e.g. by starting with "^>")

    :param infile: path of the fasta file to read
    :param outfile: path of the file to write (created or truncated)
    :param pattern: regular expression to look for in each line
    :param repl: replacement for each match of the pattern
    :return:
    """
    header_regex = re.compile(pattern)
    # Context managers close both handles even if reading or writing fails; lines are streamed
    # so the whole fasta never has to be held in memory
    with open(infile, 'r') as fasta_in, open(outfile, 'w') as fasta_out:
        for line in fasta_in:
            fasta_out.write(header_regex.sub(repl, line))
def get_source_data_files_from_path(self):
    """
    Find source data files in self.source_data_dir and copy them into the species "src_data" tree

    Files are matched on their name, "*<species[1:]>_<SEX>" followed by a suffix:
      - ".fa"                 -> ./src_data/genome/<species_folder_name>/v<genome_version>
      - ".gff"                -> ./src_data/annotation/<species_folder_name>/OGS<genome_version>
      - "_transcripts-gff.fa" -> same annotation directory
      - "_proteins.fa"        -> same annotation directory
    Files whose name contains "Contaminants" are ignored. The working directory is self.main_dir
    when the method returns

    TODO: implement search/tests for individual file paths

    :return:
    :raises SystemExit: with exit status 1 when self.species_dir cannot be entered
    """
    try:
        os.chdir(self.species_dir)
    except OSError:
        logging.critical("Cannot access " + self.species_dir)
        # Non-zero status so that the failure is visible to the calling shell
        sys.exit(1)

    organism_annotation_dir = os.path.abspath("./src_data/annotation/{0}/OGS{1}".format(self.species_folder_name, self.genome_version))
    organism_genome_dir = os.path.abspath("./src_data/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version))

    # (file name suffix, log message prefix, destination directory), tried in this order
    copy_rules = [
        (".fa", "Genome assembly file - ", organism_genome_dir),
        (".gff", "GFF file - ", organism_annotation_dir),
        ("_transcripts-gff.fa", "Transcripts file - ", organism_annotation_dir),
        ("_proteins.fa", "Proteins file - ", organism_annotation_dir),
    ]

    for dirpath, dirnames, files in os.walk(self.source_data_dir):
        # TODO: Ensures to take the correct files (other dirs hold files with valid names), this is for Phaeoexplorer only!
        if "0" not in str(dirpath):
            continue
        for f in files:
            if "Contaminants" in str(f):
                continue
            try:
                # Built inside the try so that a malformed species attribute is reported as a
                # warning for the file, as before
                # NOTE(review): the first letter of the species name is dropped, presumably to be
                # insensitive to its case in file names - confirm
                stem = "*" + self.species[1:] + "_" + self.sex.upper()
                for suffix, label, destination in copy_rules:
                    if fnmatch.fnmatch(f, stem + suffix):
                        logging.info(label + str(f))
                        shutil.copyfile(os.path.join(dirpath, f), os.path.join(destination, f))
                        break
            except (FileExistsError, TypeError, NotADirectoryError) as exc:
                logging.warning("Error raised (%s)" % type(exc).__name__)
                logging.warning(exc)

    os.chdir(self.main_dir)
def set_get_history(self):
    """
    Create or set the working history to the current species one

    The galaxy instance is asked for the histories named after self.full_name; the first one found
    is used, and a new history is created when there is none. The id is stored in self.history_id

    TODO move to utilities

    :return: id of the history
    """
    history_name = str(self.full_name)
    histories = self.instance.histories.get_histories(name=history_name)
    if histories:
        self.history_id = histories[0]["id"]
    else:
        logging.info("Creating history for %s" % self.full_name)
        # create_history returns the description of the new history, so there is no need to
        # query the instance a second time to learn its id
        created_history = self.instance.histories.create_history(name=history_name)
        self.history_id = created_history["id"]
    logging.info("History for {0}: {1}".format(self.full_name, self.history_id))
    return self.history_id
def remove_homo_sapiens_from_db(self):
    """
    Run the GMOD tool to remove the "Homo sapiens" default organism from the original database
    Will do nothing if H. sapiens isn't in the database

    Two galaxy tools are run in the history self.history_id: "organism_get_organisms" to look the
    organism up, then "organism_delete_organisms" with the id that was found

    :return:
    """
    logging.debug("Getting 'Homo sapiens' ID in instance's chado database")
    lookup_job = self.instance.tools.run_tool(
        tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
        history_id=self.history_id,
        tool_inputs={"genus": "Homo", "species": "sapiens"})
    lookup_dataset_id = lookup_job["outputs"][0]["id"]
    lookup_json = self.instance.datasets.download_dataset(dataset_id=lookup_dataset_id)

    # The lookup output is parsed as a JSON list; an empty list means there is nothing to delete
    organisms = json.loads(lookup_json)
    if not organisms:
        logging.debug("Homo sapiens isn't in the instance's chado database")
        return

    # needs to be str to be recognized by the chado tool
    sapiens_id = str(organisms[0]["organism_id"])
    logging.debug("Deleting Homo 'sapiens' in the instance's chado database")
    try:
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"organism": sapiens_id})
    except bioblend.ConnectionError:
        # NOTE(review): a connection error is reported as "organism absent", as it always was -
        # confirm that this is really what the deletion tool signals in that case
        logging.debug("Homo sapiens isn't in the instance's chado database")
def purge_histories(self):
    """
    Delete all histories in the instance
    For testing purposes

    NOTE(review): delete_history is called without any purge argument, so whether the histories are
    only flagged as deleted or really purged depends on the default of the galaxy client - confirm

    :return: the list of histories that was retrieved before deletion
    """
    # A single listing is enough: the result of a second get_histories(deleted=False) call used
    # to be thrown away
    histories = self.instance.histories.get_histories()
    for history in histories:
        self.instance.histories.delete_history(history_id=history["id"])
    return histories
def _classify_library_file(self, single_file, fname):
    """
    Work out the galaxy datatype and the display name of one file found under ./src_data

    :param single_file: path of the file
    :param fname: path of the folder holding the file
    :return: a (ftype, clean_name) tuple, or None when the file must not be uploaded
    """
    ftype = 'auto'
    clean_name = os.path.basename(single_file).replace('_', ' ')

    if single_file.endswith('.bam'):
        ftype = 'bam'
        bam_label = self.get_bam_label(fname, os.path.basename(single_file))
        if bam_label:
            clean_name = bam_label
        else:
            clean_name = os.path.splitext(clean_name)[0]
        # Many local bam files carry this trailer in their name, strip it
        bam_trailer = "Aligned.sortedByCoord.out"
        if clean_name.endswith(bam_trailer):
            clean_name = clean_name[:-len(bam_trailer)]
    elif single_file.endswith(('.fasta', '.fa', '.faa', '.fna')):
        ftype = 'fasta'
    elif single_file.endswith(('.gff', '.gff3')):
        ftype = 'gff3'
        clean_name = os.path.splitext(clean_name)[0]
    elif single_file.endswith('.xml'):
        ftype = 'xml'
    elif single_file.endswith('.bw'):
        ftype = 'bigwig'
    elif single_file.endswith('.gaf'):
        ftype = 'tabular'
    elif single_file.endswith('_tree.txt'):
        # We don't want to pollute the logs with 20000 useless lines
        logging.debug("Skipping useless file '%s'" % single_file)
        return None
    elif single_file.endswith('.tar.gz') and 'newick' in fname:
        # Must stay before the generic '.tar.gz' skip below
        ftype = 'tar'
    elif single_file.endswith(('.bai', '.tar.gz', '.tar.bz2', '.raw', '.pdf')):
        logging.info("Skipping useless file '%s'" % single_file)
        return None

    return ftype, clean_name

def setup_library(self):
    """
    Create a "Project Data" library in galaxy, mirroring the "src_data" folder of the current organism
    directory tree

    Any pre-existing, non-deleted "Project Data" library is removed first. The id of the new library
    is stored in self.library_id. Files are uploaded one by one with a one second pause after each
    upload, and a fixed ten second pause is observed at the end instead of polling the import jobs

    Credits go to Anthony Bretaudeau for the initial code

    :return:
    """
    self.goto_species_dir()

    gio = GalaxyInstance(url=self.instance_url,
                         email=self.config["custom_galaxy_default_admin_email"],
                         password=self.config["custom_galaxy_default_admin_password"])

    # Every directory under ./src_data mapped to the paths of the files it directly contains
    folders = {
        root: [os.path.join(root, filename) for filename in files]
        for root, _dirs, files in os.walk("./src_data", followlinks=True)
    }

    if folders:
        # Delete pre-existing lib (probably created by a previous call)
        for lib in gio.libraries.get_previews(name='Project Data'):
            if not lib.deleted:
                logging.info('Pre-existing "Project Data" library %s found, removing it' % lib.id)
                gio.libraries.delete(lib.id)

        logging.info("Creating new 'Project Data' library")
        prj_lib = gio.libraries.create('Project Data', 'Data for current genome annotation project')
        self.library_id = prj_lib.id  # project data folder/library
        logging.info("Library for {0}: {1}".format(self.full_name, self.library_id))

        for fname, files in folders.items():
            if not (fname and files):
                continue
            # NOTE(review): for files sitting directly in ./src_data this is an empty string, and a
            # folder with an empty name is requested - confirm this case cannot happen
            folder_name = fname[len("./src_data") + 1:]
            logging.info("Creating folder: %s" % folder_name)
            folder = self.create_deep_folder(prj_lib, folder_name)

            for single_file in files:
                classified = self._classify_library_file(single_file, fname)
                if classified is None:
                    continue
                ftype, clean_name = classified

                logging.info("Adding file '%s' with type '%s' and name '%s'" % (single_file, ftype, clean_name))
                prj_lib.upload_from_local(
                    path=single_file,
                    folder=folder,
                    file_type=ftype
                )
                # The datasets are NOT renamed to clean_name: renaming has to happen after the import
                # is finished (otherwise galaxy does not keep the new names) and the batch renaming
                # that was tried threw a critical error, so it is disabled for now
                time.sleep(1)

    # Wait for uploads to complete
    logging.info("Waiting for import jobs to finish... please wait")
    # Fixed pause; checking the job states would only be necessary if ran using SLURM
    time.sleep(10)

    logging.info("Finished importing data")
def create_deep_folder(self, prj_lib, path, parent_folder=None, deep_name=""):
    """
    Create a chain of nested folders in a galaxy library, one per segment of the given path

    Folders that were already created are taken from self.existing_folders_cache, which is keyed by
    the accumulated path of the folder (segments joined with os.sep, starting from deep_name)

    :param prj_lib: library object in which the folders are created
    :param path: relative path to create, segments separated by os.sep
    :param parent_folder: folder under which the first segment is created
    :param deep_name: accumulated path of parent_folder, used as a prefix for the cache keys
    :return: the folder matching the last segment of the path
    """
    current_folder = parent_folder
    cache_key = deep_name
    for segment in path.split(os.sep):
        cache_key = os.sep.join([cache_key, segment])
        if cache_key not in self.existing_folders_cache:
            self.existing_folders_cache[cache_key] = prj_lib.create_folder(segment, base_folder=current_folder)
        current_folder = self.existing_folders_cache[cache_key]
    return current_folder
def setup_data_libraries(self):
    """
    Load data into the galaxy container with the galaxy_data_libs_SI.py script written by A. Bretaudeau
    DEPRECATED

    Runs "../serexec <genus_lowercase>_<species>_galaxy /tool_deps/_conda/bin/python
    /opt/galaxy_data_libs_SI.py" from the species directory. A failure (command missing, not
    executable, or ending with a non-zero status) is logged and does not raise

    :return:
    """
    self.goto_species_dir()
    command = ["../serexec", "{0}_{1}_galaxy".format(self.genus_lowercase, self.species),
               "/tool_deps/_conda/bin/python",
               "/opt/galaxy_data_libs_SI.py"]
    logging.info("Loading data into the galaxy container")
    try:
        # check=True is what makes a non-zero exit status raise CalledProcessError;
        # subprocess.call never raised it, so failures used to be logged as successes
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError):
        logging.error("Cannot load data into the galaxy container for " + self.full_name)
    else:
        logging.info("Data successfully loaded into the galaxy container for " + self.full_name)
def generate_blast_banks(self):
    """
    TODO: Automatically generate blast banks for a species and commit

    Not implemented yet: the body only holds this docstring, so calling the method does nothing
    and returns None

    :return:
    """
def connect_to_instance(self):
    """
    Test the connection to the galaxy instance for the current organism
    Exit if we cannot connect to the instance

    The client is built from self.instance_url and the admin credentials found in self.config, and is
    stored in self.instance. Listing the histories is used as the connection test

    :return:
    :raises SystemExit: with exit status 1 when the instance cannot be reached
    """
    self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                          email=self.config["custom_galaxy_default_admin_email"],
                                          password=self.config["custom_galaxy_default_admin_password"]
                                          )

    logging.info("Connecting to the galaxy instance...")
    try:
        self.instance.histories.get_histories()
    except bioblend.ConnectionError:
        logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
        # A bare sys.exit() would report success (status 0) to the calling shell
        sys.exit(1)
    else:
        logging.info("Successfully connected to galaxy instance @ " + self.instance_url)
if __name__ == "__main__":
    # Command line entry point: for every species described in the input file, copy the source
    # datasets into the species tree, reformat the proteins fasta headers, mirror "src_data" into a
    # galaxy library, prepare the species history and remove the default "Homo sapiens" organism
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    # store_true: the flags are off by default and switched on when given. They were declared with
    # store_false, which made debug logging the default and "-v" *reduce* the verbosity
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")

    parser.add_argument("--config",
                        type=str,
                        help="Config path, default to the 'config' file inside the script repository")

    parser.add_argument("--main-directory",
                        type=str,
                        help="Where the stack containers will be located, defaults to working directory")

    # Not read anywhere in this script yet
    parser.add_argument("--load-history",
                        help="Load the found files into history, will ask for confirmation if a file with the same name is already found in an history",
                        action="store_true")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

    # Parsing the config file if provided, using the default config otherwise
    if not args.config:
        args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config")
    else:
        args.config = os.path.abspath(args.config)

    if not args.main_directory:
        args.main_directory = os.getcwd()
    else:
        args.main_directory = os.path.abspath(args.main_directory)

    sp_dict_list = utilities.parse_input(args.input)

    for sp_dict in sp_dict_list:

        # Creating an instance of load_data_for_current_species object
        load_data_for_current_species = LoadData(parameters_dictionary=sp_dict)

        # Starting
        logging.info("gga_load_data.py called for %s" % load_data_for_current_species.full_name)

        # Setting some of the instance attributes
        load_data_for_current_species.main_dir = args.main_directory
        load_data_for_current_species.species_dir = os.path.join(load_data_for_current_species.main_dir,
                                                                 load_data_for_current_species.genus_species +
                                                                 "/")

        # Parse the config yaml file (done once per species, it is reused for the whole iteration)
        load_data_for_current_species.config = utilities.parse_config(args.config)

        # Set the instance url attribute: the host comes from the "custom_host" config entry when
        # there is one, "localhost" is used otherwise
        if "custom_host" in load_data_for_current_species.config:
            load_data_for_current_species.instance_url = "http://{0}:8888/sp/{1}_{2}/galaxy/".format(
                load_data_for_current_species.config["custom_host"],
                load_data_for_current_species.genus_lowercase,
                load_data_for_current_species.species)
        else:
            load_data_for_current_species.instance_url = "http://localhost:8888/sp/{0}_{1}/galaxy/".format(
                load_data_for_current_species.genus_lowercase,
                load_data_for_current_species.species)

        # Change serexec permissions in repo
        try:
            os.chmod("%s/serexec" % load_data_for_current_species.script_dir, 0o0777)
        except PermissionError:
            # NOTE(review): the message announces an exit but the script carries on - decide which
            # of the two is intended
            logging.critical("Cannot access %s, exiting" % load_data_for_current_species.script_dir)

        # Check the galaxy container state and proceed if the galaxy services are up and running
        if utilities.check_galaxy_state(genus_lowercase=load_data_for_current_species.genus_lowercase,
                                        species=load_data_for_current_species.species,
                                        script_dir=load_data_for_current_species.script_dir):

            # Testing connection to the instance
            load_data_for_current_species.connect_to_instance()

            # Retrieve datasets
            logging.info("Finding and copying datasets for %s" % load_data_for_current_species.full_name)
            load_data_for_current_species.get_source_data_files_from_path()
            logging.info("Sucessfully copied datasets for %s" % load_data_for_current_species.full_name)

            # Format fasta headers (proteins)
            logging.info("Formatting fasta files headers %s " % load_data_for_current_species.full_name)
            load_data_for_current_species.batch_modify_fasta_headers()
            logging.info("Successfully formatted files headers %s " % load_data_for_current_species.full_name)

            # Load the datasets into a galaxy library
            logging.info("Setting up library for %s" % load_data_for_current_species.full_name)
            load_data_for_current_species.setup_library()
            logging.info("Successfully set up library in galaxy for %s" % load_data_for_current_species.full_name)

            # Set or get the history for the current organism
            load_data_for_current_species.set_get_history()

            # Remove H. sapiens from database if here
            # TODO: set a dedicated history for removing H. sapiens (instead of doing it into a species history)
            load_data_for_current_species.remove_homo_sapiens_from_db()

            # Importing the datasets into the history is not done here yet
            logging.info("Data successfully loaded and imported for %s" % load_data_for_current_species.full_name)

        else:
            logging.critical("The galaxy container for %s is not ready yet!" % load_data_for_current_species.full_name)
            # Non-zero status: stopping here is a failure, a bare sys.exit() would report success
            sys.exit(1)