#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
load_data.py

Usage: $ python3 load_data.py example.yml [OPTIONS]
"""

import argparse
import logging
import os
import re
import subprocess
import sys
from datetime import datetime

import bioblend
import bioblend.galaxy.objects
import yaml
from bioblend import galaxy


def _value_or_default(value, default):
    """
    Return `default` when `value` is an empty string or None, `value` otherwise

    :param value: value read from the input file
    :param default: fallback used when the value was left empty
    :return:
    """
    return default if value in ("", None) else value


def parse_input(input_file):
    """
    Parse the yml input file to extract the data used to create the LoadData objects
    Return a list of dictionaries. Each dictionary contains data tied to a species
    The top-level "config" entry (if any) is not a species and is left out of the list

    Exits the program if the file name does not end with "yml"/"yaml", if the
    YAML cannot be parsed or if the top-level YAML node is not a mapping

    :param input_file: path to the YAML input file
    :return: list of dictionaries (one per top-level entry other than "config")
    """

    if not str(input_file).endswith(("yml", "yaml")):
        logging.critical("Error, please input a YAML file")
        sys.exit(1)
    logging.debug("Input format used: YAML")

    with open(input_file, 'r') as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
        except yaml.YAMLError as exit_code:
            # The exception has to be converted explicitly: exception + str raises a TypeError
            logging.critical(str(exit_code) + " (YAML input file might be incorrect)")
            sys.exit(1)

    # An empty file is loaded as None, a bare list/scalar has no items() either
    if not isinstance(yaml_dict, dict):
        logging.critical("Error, the YAML input file does not contain a mapping of species")
        sys.exit(1)

    return [v for k, v in yaml_dict.items() if k != "config"]


class LoadData:
    """
    Load data from the src_data subfolders into the galaxy instance's history of a given species

    The attributes main_dir and species_dir are not set by the constructor: they
    have to be assigned by the caller before calling modify_fasta_headers()
    """

    def __init__(self, parameters_dictionary):
        """
        :param parameters_dictionary: dictionary of one species, with a "description"
            section (species, genus, strain, sex, common_name, origin) and a "data"
            section (performed_by, genome_version, ogs_version, parent_directory)
        """
        description = parameters_dictionary["description"]
        data = parameters_dictionary["data"]

        self.parameters_dictionary = parameters_dictionary
        self.species = description["species"]
        self.genus = description["genus"]
        self.strain = description["strain"]
        self.sex = description["sex"]
        self.common = description["common_name"]
        self.date = datetime.today().strftime("%Y-%m-%d")
        self.origin = description["origin"]
        self.performed = data["performed_by"]
        # Versions left empty in the input file default to "1.0"
        self.genome_version = _value_or_default(data["genome_version"], "1.0")
        self.ogs_version = _value_or_default(data["ogs_version"], "1.0")
        # Only the first letter of the genus is changed, the rest is kept as given
        self.genus_lowercase = self.genus[0].lower() + self.genus[1:]
        self.genus_uppercase = self.genus[0].upper() + self.genus[1:]
        self.species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex])
        self.full_name = " ".join([self.genus_uppercase, self.species, self.strain, self.sex])
        self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex])
        self.genus_species = self.genus_lowercase + "_" + self.species
        self.instance_url = "http://scratchgmodv1:8888/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/"  # Testing with localhost/scratchgmodv1
        self.instance = None
        self.history_id = None
        self.library_id = None
        self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.main_dir = None
        self.species_dir = None
        self.org_id = None
        self.genome_analysis_id = None
        self.ogs_analysis_id = None
        self.tool_panel = None
        self.datasets = dict()
        self.source_files = dict()
        self.workflow_name = None
        self.metadata = dict()
        # API key used to communicate with the galaxy instance. Cannot be used to do user-tied actions
        # TODO: set the key in config file --> saved for later (master api key access actions are limited)
        self.api_key = "master"
        # Directory/subdirectories where data files are located (fasta, gff, ...)
        if data["parent_directory"] in ("", None, "/path/to/closest/parent/dir"):
            self.source_data_dir = "/projet/sbr/phaeoexplorer/"  # Testing path for phaeoexplorer data
        else:
            self.source_data_dir = data["parent_directory"]
        self.do_update = False  # Update the instance (in histories corresponding to the input) instead of creating a new one
        # Raw string: "\w" is not a valid escape sequence in a regular string literal
        self.species_name_regex_litteral = r"(?=\w*V)(?=\w*A)(?=\w*R)(?=\w*I)(?=\w*A)(?=\w*B)(?=\w*L)(?=\w*E)\w+"  # Placeholder re

    @staticmethod
    def _run_command(command, cwd):
        """
        Run a command (list of arguments) in the directory `cwd`, log a warning if
        its exit status is not 0

        :param command: list of arguments, the first one being the executable
        :param cwd: directory the command is run in
        :return:
        """
        completed = subprocess.run(command, stdout=subprocess.PIPE, cwd=cwd)
        if completed.returncode != 0:
            logging.warning("Command exited with status %d: %s", completed.returncode, " ".join(command))

    def modify_fasta_headers(self):
        """
        Change the fasta headers before integration.

        Moves into species_dir, looks for the source files in the subdirectories of
        "src_data" (stored in self.source_files), then runs the header-changing
        scripts of main_dir on the proteins and transcripts files

        Exits the program if species_dir cannot be accessed or if the proteins or
        transcripts file is not found

        :return:
        """

        try:
            os.chdir(self.species_dir)
        except (OSError, TypeError):
            # TypeError: species_dir was never set (None)
            logging.critical("Cannot access " + str(self.species_dir) + ", run with higher privileges")
            logging.critical("Fatal error: exit")
            sys.exit(1)

        self.source_files = dict()
        annotation_dir = None
        for d, _, _ in os.walk(os.path.join(os.getcwd(), "src_data")):
            # The trailing "/" restricts the search to the subdirectories of annotation/ and genome/
            if "annotation/" in d:
                annotation_dir = d
                for f in os.listdir(d):
                    if f.endswith("proteins.fasta"):
                        self.source_files["proteins_file"] = os.path.join(d, f)
                    elif f.endswith("transcripts-gff.fa"):
                        self.source_files["transcripts_file"] = os.path.join(d, f)
                    elif f.endswith(".gff"):
                        self.source_files["gff_file"] = os.path.join(d, f)
            elif "genome/" in d:
                for f in os.listdir(d):
                    if f.endswith(".fa"):
                        self.source_files["genome_file"] = os.path.join(d, f)
        logging.debug("source files found:")
        for k, v in self.source_files.items():
            logging.debug("\t" + k + "\t" + v)

        missing_files = [k for k in ("proteins_file", "transcripts_file") if k not in self.source_files]
        if annotation_dir is None or missing_files:
            logging.critical("Missing source files in " + os.path.join(os.getcwd(), "src_data")
                             + ": " + ", ".join(missing_files or ["annotation directory"]))
            logging.critical("Fatal error: exit")
            sys.exit(1)

        proteins_file = self.source_files["proteins_file"]
        transcripts_file = self.source_files["transcripts_file"]
        utils_dir = str(self.main_dir) + "/gga_load_data/utils/"

        # Changing headers in the *proteins.fasta file from >mRNA* to >protein*
        logging.info("Changing fasta headers: " + proteins_file)
        self._run_command([utils_dir + "phaeoexplorer-change_pep_fasta_header.sh", proteins_file],
                          cwd=annotation_dir)

        # src_data cleaning
        # NOTE(review): "outfile" is assumed to be written by the protein headers script
        # (it replaces the proteins file), hence the move before the transcripts script
        # is run -- confirm against the scripts
        outfile = os.path.join(annotation_dir, "outfile")
        if os.path.exists(outfile):
            subprocess.run(["mv", outfile, proteins_file], stdout=subprocess.PIPE, cwd=annotation_dir)

        # The transcripts script is run on the transcripts file (not on the proteins file)
        logging.info("Changing fasta headers: " + transcripts_file)
        self._run_command([utils_dir + "phaeoexplorer-change_transcript_fasta_header.sh", transcripts_file],
                          cwd=annotation_dir)

        gmon_file = os.path.join(annotation_dir, "gmon.out")
        if os.path.exists(gmon_file):
            subprocess.run(["rm", gmon_file], stdout=subprocess.PIPE, cwd=annotation_dir)

    def setup_data_libraries(self):
        """
        Load data into the galaxy container with the galaxy_data_libs_SI.py script,
        run through the "../serexec" command (relative to the current directory)

        A failure of the command is logged and does not stop the program

        :return:
        """

        logging.info("Loading data into the galaxy container")
        try:
            # check=True: without it CalledProcessError is never raised and success is always reported
            # NOTE(review): "genus_species_galaxy" is a literal here, it might be meant to be
            # built from self.genus_species -- confirm
            subprocess.run("../serexec genus_species_galaxy /tool_deps/_conda/bin/python /opt/galaxy_data_libs_SI.py",
                           shell=True, check=True)
        except subprocess.CalledProcessError:
            logging.error("Cannot load data into the galaxy container for " + self.full_name)
        else:
            logging.info("Data successfully loaded into the galaxy container for " + self.full_name)
            # NOTE(review): get_species_history_id is not defined in this file as visible -- confirm
            self.get_species_history_id()
            # TODO: import the library datasets (genome, gff, transcripts, proteins) into the current history

    def generate_blast_banks(self):
        """
        Automatically generate blast banks for a species and commit
        Not implemented yet

        :return:
        """

    def connect_to_instance(self):
        """
        Test the connection to the galaxy instance for the current organism
        Exit if it cannot connect to the instance

        On success, the tool panel is stored in self.tool_panel and a history
        named "FOO" is created in the instance
        """
        # NOTE(review): hard-coded credentials and disabled certificate verification -- confirm this is test-only
        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                              email="gga@sb-roscoff.fr",
                                              password="password",
                                              verify=False)

        logging.info("Connecting to the galaxy instance ...")
        try:
            self.instance.histories.get_histories()
            self.tool_panel = self.instance.tools.get_tool_panel()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit(1)
        else:
            logging.info("Successfully connected to galaxy instance @ " + self.instance_url)
        # NOTE(review): looks like a debugging leftover -- confirm before removing
        self.instance.histories.create_history(name="FOO")


def main():
    """
    Parse the command line, then format the fasta headers of every species of the input file

    :return:
    """
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    # store_true: the option has to switch the debug output on, not off
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")

    # args.dir is read below, the option has to exist
    parser.add_argument("--dir",
                        type=str,
                        default=os.getcwd(),
                        help="Main directory (contains the gga_load_data folder), "
                             "defaults to the current working directory")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    logging.info("Load data: start")
    sp_dict_list = parse_input(args.input)

    for sp_dict in sp_dict_list:
        o = LoadData(parameters_dictionary=sp_dict)
        o.main_dir = os.path.abspath(args.dir)
        # NOTE(review): species_dir is never assigned in this file, modify_fasta_headers()
        # exits until it is set -- confirm the expected location of the species directory
        o.modify_fasta_headers()
        logging.info("Successfully formatted files headers " + o.genus[0].upper() + ". " + o.species + " " + o.strain + " " + o.sex)
        # TODO: o.setup_data_libraries() once the data libraries step is ready

    logging.info("Load data: done")


if __name__ == "__main__":
    main()