#!/usr/bin/python # -*- coding: utf-8 -*- import bioblend import argparse import os import subprocess import logging import sys import utilities import speciesData import fnmatch import time import json import re from bioblend.galaxy.objects import GalaxyInstance from bioblend import galaxy """ gga_load_data.py Usage: $ python3 gga_init.py -i example.yml [OPTIONS] Do not call this script before the galaxy container is ready """ class LoadData(speciesData.SpeciesData): """ Child of SpeciesData Contains methods and attributes to copy data into the src_data subfolders of an organism and then into the galaxy instance's history of this given organism Optional fasta headers reformat """ def goto_species_dir(self): """ Go to the species directory (starting from the main dir) :return: """ os.chdir(self.main_dir) species_dir = os.path.join(self.main_dir, self.genus_species) + "/" try: os.chdir(species_dir) except OSError: logging.critical("Cannot access %s" % species_dir) sys.exit(0) return 1 def modify_fasta_headers(self): """ Change the fasta headers before integration, so that the indexing tool in galaxy interprets the headers correctly and doesn't throw an error The function will use the class attribute "source_datasets", pointing to files in the galaxy library to find the fasta files that need their headers formatted :return: """ annotation_dir, genome_dir = None, None for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]: if "annotation/" in d: annotation_dir = d for f in os.listdir(d): if f.endswith("proteins.fasta"): self.source_files["proteins_file"] = os.path.join(d, f) elif f.endswith("transcripts-gff.fa"): self.source_files["transcripts_file"] = os.path.join(d, f) elif f.endswith(".gff"): self.source_files["gff_file"] = os.path.join(d, f) elif "genome/" in d: genome_dir = d for f in os.listdir(d): if f.endswith(".fa"): self.source_files["genome_file"] = os.path.join(d, f) logging.debug("source files found:") for k, v in self.source_files.items(): logging.debug("\t" + k + "\t" + v) # Formatting headers in the fasta files # Proteome logging.debug("Formatting fasta headers for " + self.source_files["proteins_file"]) # Transcriptome logging.debug("Formatting fasta headers for " + self.source_files["transcripts_file"]) self.format_fasta_headers(infile=self.source_files["transcripts_file"], outfile=str(self.source_files["transcripts_file"])) # Rename outputs and cleaning script logs if os.path.exists(annotation_dir + "outfile"): subprocess.run(["mv", annotation_dir + "/outfile", self.source_files["proteins_file"]], stdout=subprocess.PIPE, cwd=annotation_dir) if os.path.exists(annotation_dir + "gmon.out"): subprocess.run(["rm", annotation_dir + "/gmon.out"], stdout=subprocess.PIPE, cwd=annotation_dir) def format_fasta_headers(self, infile, outfile, pattern, repl): """ Format the fasta headers of a given file, given a matching pattern and a replacement string :param infile: :param outfile: :param pattern: :param repl: :return: """ infile = open(infile, 'r') outfile = open(outfile, 'w') lines = infile.readlines() for line in lines: line_out = re.sub(pattern, repl, line) outfile.write(line_out) infile.close() outfile.close() def get_source_data_files_from_path(self): """ Link data files :return: """ try: os.chdir(self.species_dir) except OSError: logging.critical("Cannot access " + self.species_dir) sys.exit(0) organism_annotation_dir = os.path.abspath("./src_data/annotation/{0}/OGS{1}".format(self.species_folder_name, self.genome_version)) organism_genome_dir = os.path.abspath("./src_data/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version)) for dirpath, dirnames, files in os.walk(self.source_data_dir): if "0" in str(dirpath): # TODO: Ensures to take the correct files (other dirs hold files with valid names), this is for Phaeoexplorer only! for f in files: if "Contaminants" not in str(f): try: if fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + ".fa"): logging.info("Genome assembly file - " + str(f)) organism_genome_dir = organism_genome_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_genome_dir) organism_genome_dir = os.path.abspath("./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + ".gff"): logging.info("GFF file - " + str(f)) organism_annotation_dir = organism_annotation_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_annotation_dir) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + "_transcripts-gff.fa"): logging.info("Transcripts file - " + str(f)) organism_annotation_dir = organism_annotation_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_annotation_dir) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + "_proteins.fa"): logging.info("Proteins file - " + str(f)) organism_annotation_dir = organism_annotation_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_annotation_dir) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) except FileExistsError: logging.warning("Error raised (FileExistsError)") except TypeError: logging.warning("Error raised (TypeError)") except NotADirectoryError: logging.warning("Error raised (NotADirectoryError)") os.chdir(self.main_dir) def set_get_history(self): """ Create or set the working history to the organism's one :return: """ try: histories = self.instance.histories.get_histories(name=str(self.full_name)) self.history_id = histories[0]["id"] logging.info("History for {0}: {1}".format(self.full_name, self.history_id)) except IndexError: logging.info("Creating history for %s" % self.full_name) self.instance.histories.create_history(name=str(self.full_name)) histories = self.instance.histories.get_histories(name=str(self.full_name)) self.history_id = histories[0]["id"] logging.info("History for {0}: {1}".format(self.full_name, self.history_id)) return self.history_id def import_datasets_into_history(self): """ Find datasets in a library, get their ID and import thme into the current history if they are not already :return: """ gio = GalaxyInstance(url=self.instance_url, email=self.config["custom_galaxy_default_admin_email"], password=self.config["custom_galaxy_default_admin_password"]) prj_lib = gio.libraries.get_previews(name="Project Data") self.library_id = prj_lib[0].id instance_source_data_folders = self.instance.libraries.get_folders(library_id=str(self.library_id)) folders_ids = {} current_folder_name = "" for i in instance_source_data_folders: for k, v in i.items(): if k == "name": folders_ids[v] = 0 current_folder_name = v if k == "id": folders_ids[current_folder_name] = v logging.info("Datasets IDs: ") for k, v in folders_ids.items(): if k == "/genome": sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True) final_sub_folder_content = self.instance.folders.show_folder(folder_id=sub_folder_content["folder_contents"][0]["id"], contents=True) for k2, v2 in final_sub_folder_content.items(): for e in v2: if type(e) == dict: if e["name"].endswith(".fa"): self.datasets["genome_file"] = e["ldda_id"] logging.info("\t" + e["name"] + ": " + e["ldda_id"]) if k == "/annotation": sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True) final_sub_folder_content = self.instance.folders.show_folder(folder_id=sub_folder_content["folder_contents"][0]["id"], contents=True) for k2, v2 in final_sub_folder_content.items(): for e in v2: if type(e) == dict: # TODO: manage versions (differentiate between the correct folders using self.config) if "transcripts" in e["name"]: self.datasets["transcripts_file"] = e["ldda_id"] logging.info("\t" + e["name"] + ": " + e["ldda_id"]) elif "proteins" in e["name"]: self.datasets["proteins_file"] = e["ldda_id"] logging.info("\t" + e["name"] + ": " + e["ldda_id"]) elif "gff" in e["name"]: self.datasets["gff_file"] = e["ldda_id"] logging.info("\t" + e["name"] + ": " + e["ldda_id"]) logging.info("Uploading datasets into history %s" % self.history_id) self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"]) self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"]) self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"]) self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"]) return {"history_id": self.history_id, "library_id": self.library_id, "datasets": self.datasets} def prepare_history(self): """ Galaxy instance startup in preparation for importing datasets and running a workflow - Remove Homo sapiens from the chado database. - Add organism and analyses into the chado database --> separate - Get any other existing organisms IDs before updating the galaxy instance --> separate Calling this function is mandatory to have a working galaxy instance history :return: """ self.connect_to_instance() self.set_get_history() # Delete Homo sapiens from Chado database logging.debug("Getting 'Homo sapiens' ID in instance's chado database") get_sapiens_id_job = self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2", history_id=self.history_id, tool_inputs={"genus": "Homo", "species": "sapiens"}) get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"] get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output) try: logging.debug("Deleting Homo 'sapiens' in the instance's chado database") get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0] sapiens_id = str( get_sapiens_id_final_output["organism_id"]) # needs to be str to be recognized by the chado tool self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2", history_id=self.history_id, tool_inputs={"organism": str(sapiens_id)}) except bioblend.ConnectionError: logging.debug("Homo sapiens isn't in the instance's chado database") except IndexError: logging.debug("Homo sapiens isn't in the instance's chado database") pass # Add organism (species) to chado logging.info("Adding organism to the instance's chado database") self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2", history_id=self.history_id, tool_inputs={"abbr": self.abbreviation, "genus": self.genus, "species": self.species, "common": self.common}) # Add OGS analysis to chado logging.info("Adding OGS analysis to the instance's chado database") self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2", history_id=self.history_id, tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version, "program": "Performed by Genoscope", "programversion": str("OGS" + self.ogs_version), "sourcename": "Genoscope", "date_executed": self.date}) # Add genome analysis to chado logging.info("Adding genome analysis to the instance's chado database") self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2", history_id=self.history_id, tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version, "program": "Performed by Genoscope", "programversion": str("genome v" + self.genome_version), "sourcename": "Genoscope", "date_executed": self.date}) self.get_organism_and_analyses_ids() logging.info("Finished initializing instance") def purge_histories(self): """ Delete all histories in the instance :return: """ histories = self.instance.histories.get_histories() self.instance.histories.get_histories(deleted=False) for h in histories: self.instance.histories.delete_history(history_id=h["id"]) return histories def setup_library(self): """ Create a "Project Data" library in galaxy, mirroring the "src_data" folder of the current organism directory tree Credits to Anthony Bretaudeau for the initial code :return: """ self.goto_species_dir() # Delete pre-existing lib (probably created by a previous call) gio = GalaxyInstance(url=self.instance_url, email=self.config["custom_galaxy_default_admin_email"], password=self.config["custom_galaxy_default_admin_password"]) folders = dict() post_renaming = {} for root, dirs, files in os.walk("./src_data", followlinks=True): file_list = [os.path.join(root, filename) for filename in files] folders[root] = file_list if folders: # Delete pre-existing lib (probably created by a previous call) existing = gio.libraries.get_previews(name='Project Data') for lib in existing: if not lib.deleted: logging.info('Pre-existing "Project Data" library %s found, removing it' % lib.id) gio.libraries.delete(lib.id) logging.info("Creating new 'Project Data' library") prj_lib = gio.libraries.create('Project Data', 'Data for current genome annotation project') self.library_id = prj_lib.id # project data folder/library logging.info("Library for {0}: {1}".format(self.full_name, self.library_id)) for fname, files in folders.items(): if fname and files: folder_name = fname[len("./src_data") + 1:] logging.info("Creating folder: %s" % folder_name) folder = self.create_deep_folder(prj_lib, folder_name) for single_file in files: ftype = 'auto' clean_name = os.path.basename(single_file) clean_name = clean_name.replace('_', ' ') if single_file.endswith('.bam'): ftype = 'bam' bam_label = self.get_bam_label(fname, os.path.basename(single_file)) if bam_label: clean_name = bam_label else: clean_name = os.path.splitext(clean_name)[0] if clean_name.endswith("Aligned.sortedByCoord.out"): # Stupid thing for many local bam files clean_name = clean_name[:-25] elif single_file.endswith('.fasta') or single_file.endswith('.fa') or single_file.endswith( '.faa') or single_file.endswith('.fna'): ftype = 'fasta' elif single_file.endswith('.gff') or single_file.endswith('.gff3'): ftype = 'gff3' clean_name = os.path.splitext(clean_name)[0] elif single_file.endswith('.xml'): ftype = 'xml' elif single_file.endswith('.bw'): ftype = 'bigwig' elif single_file.endswith('.gaf'): ftype = 'tabular' elif single_file.endswith('_tree.txt'): # We don't want to pollute the logs with 20000 useless lines logging.debug("Skipping useless file '%s'" % single_file) continue elif single_file.endswith('.tar.gz') and 'newick' in fname: ftype = 'tar' elif single_file.endswith('.bai') or single_file.endswith('.tar.gz') or single_file.endswith( '.tar.bz2') or single_file.endswith('.raw') or single_file.endswith('.pdf'): logging.info("Skipping useless file '%s'" % single_file) continue logging.info("Adding file '%s' with type '%s' and name '%s'" % (single_file, ftype, clean_name)) datasets = prj_lib.upload_from_local( path=single_file, folder=folder, file_type=ftype ) # Rename dataset # Need to do it AFTER the datasets import is finished, otherwise the new names are not kept by galaxy # (erased by metadata generation I guess) # ALB: Doesn't work for some reason (LibraryDataset not subscriptable, __getitem__() not implemented) # post_renaming[datasets[0]] = clean_name time.sleep(1) # Wait for uploads to complete logging.info("Waiting for import jobs to finish... please wait") # while True: # try: # # "C" state means the job is completed, no need to wait for it # ret = subprocess.check_output("squeue | grep -v \"C debug\" | grep -v \"JOBID\" || true", # shell=True) # if not len(ret): # break # time.sleep(3) # except subprocess.CalledProcessError as inst: # if inst.returncode == 153: # queue is empty # break # else: # raise time.sleep(10) # ALB: Batch renaming # logging.info("Import finished, now renaming datasets with pretty names") # for dataset in post_renaming: # dataset.update(name=post_renaming[dataset]) logging.info("Finished importing data") def create_deep_folder(self, prj_lib, path, parent_folder=None, deep_name=""): """ Create a folder inside a folder in a galaxy library Recursive :param prj_lib: :param path: :param parent_folder: :param deep_name: :return: """ segments = path.split(os.sep) deeper_name = os.sep.join([deep_name, segments[0]]) if deeper_name in self.existing_folders_cache: new_folder = self.existing_folders_cache[deeper_name] else: new_folder = prj_lib.create_folder(segments[0], base_folder=parent_folder) self.existing_folders_cache[deeper_name] = new_folder if len(segments) > 1: new_folder = self.create_deep_folder(prj_lib, os.sep.join(segments[1:]), new_folder, deeper_name) return new_folder def setup_data_libraries(self): """ Load data into the galaxy container with the galaxy_data_libs_SI.py script written by A. Bretaudeau DEPRECATED :return: """ self.goto_species_dir() try: logging.info("Loading data into the galaxy container") subprocess.call(["../serexec","{0}_{1}_galaxy".format(self.genus_lowercase, self.species), "/tool_deps/_conda/bin/python", "/opt/galaxy_data_libs_SI.py"]) except subprocess.CalledProcessError: logging.info("Cannot load data into the galaxy container for " + self.full_name) pass else: logging.info("Data successfully loaded into the galaxy container for " + self.full_name) def generate_blast_banks(self): """ TODO: Automatically generate blast banks for a species and commit :return: """ def connect_to_instance(self): """ Test the connection to the galaxy instance for the current organism Exit if we cannot connect to the instance """ self.instance = galaxy.GalaxyInstance(url=self.instance_url, email=self.config["custom_galaxy_default_admin_email"], password=self.config["custom_galaxy_default_admin_password"] ) logging.info("Connecting to the galaxy instance...") try: self.instance.histories.get_histories() except bioblend.ConnectionError: logging.critical("Cannot connect to galaxy instance @ " + self.instance_url) sys.exit() else: logging.info("Successfully connected to galaxy instance @ " + self.instance_url) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction " "with galaxy instances for GGA" ", following the protocol @ " "http://gitlab.sb-roscoff.fr/abims/e-infra/gga") parser.add_argument("input", type=str, help="Input file (yml)") parser.add_argument("-v", "--verbose", help="Increase output verbosity", action="store_false") parser.add_argument("--config", type=str, help="Config path, default to the 'config' file inside the script repository") parser.add_argument("--main-directory", type=str, help="Where the stack containers will be located, defaults to working directory") args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) # Parsing the config file if provided, using the default config otherwise if not args.config: args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config") else: args.config = os.path.abspath(args.config) if not args.main_directory: args.main_directory = os.getcwd() else: args.main_directory = os.path.abspath(args.main_directory) sp_dict_list = utilities.parse_input(args.input) for sp_dict in sp_dict_list: # Creating an instance of load_data_for_current_species object load_data_for_current_species = LoadData(parameters_dictionary=sp_dict) # Starting logging.info("gga_load_data.py called for %s" % load_data_for_current_species.full_name) # Setting some of the instance attributes load_data_for_current_species.main_dir = args.main_directory load_data_for_current_species.species_dir = os.path.join(load_data_for_current_species.main_dir, load_data_for_current_species.genus_species + "/") load_data_for_current_species.config = utilities.parse_config(args.config) # Check the galaxy logs and proceed if the galaxy service is up and ready if utilities.read_galaxy_logs(genus_lowercase=load_data_for_current_species.genus_lowercase, species=load_data_for_current_species.species): # Load config file load_data_for_current_species.config = utilities.parse_config(args.config) # Testing connection to the instance load_data_for_current_species.connect_to_instance() # Retrieve datasets logging.info("Finding datasets for %s" % load_data_for_current_species.full_name) load_data_for_current_species.get_source_data_files_from_path() # Load the datasets into a galaxy library # logging.info("Setting up library for %s" % load_data_for_current_species.full_name) # load_data_for_current_species.setup_library() # logging.info("Successfully set up library in galaxy for %s" % load_data_for_current_species.full_name) # Set or get the history for the current organism load_data_for_current_species.set_get_history() logging.info("Importing datasets into history for %s" % load_data_for_current_species.full_name) load_data_for_current_species.import_datasets_into_history() # load_data_for_current_species.modify_fasta_headers() # logging.info("Successfully formatted files headers %s " % load_data_for_current_species.full_name) # load_data_for_current_species.purge_histories() logging.info("Data loaded and imported for %s" % load_data_for_current_species.full_name) else: logging.critical("The galaxy container for %s is not ready yet!" % load_data_for_current_species.full_name)