#!/usr/bin/env python3
"""
gga_load_data main script

Scripted integration of new data into GGA instances. The input is either a table-like (csv, xls, ...) or a json
(TODO: yaml) file that describes what data is to be integrated (genus, species, sex, strain, data), see
data_example.json for an example of the correct syntax.
The script will parse the input and take care of everything, from source files directory tree creation to running
the gmod tools inside the galaxy instances of organisms.

By default, the script will do everything needed to have a functional instance from scratch. If you want to bypass
this behavior, you have to specify --update as a parameter. The script can also be used to update an existing GGA
instance with new data. For example, you have an instance "genus_species" with data for the male sex and want to add
the female sex to the same GGA instance. To do this, create your configuration input file as you would normally, and
add the "--update" argument when invoking the script.

TODO:
- add config file (inside repo or outside with argument)
- update existing history
- clean/delete instance
- delete stack
- commit the files for blast banks

TODO EOSC/Cloudification:
- divide into 2 general-use scripts
    - create docker swarm, stacks, etc... (docker side)
    - load data into libraries (method to load it at init, and a method/script to load it separately (galaxy side)
      (alb: galaxy_data_libs_SI does this already?)

STEPS:
- read input (yml, maybe xlsx later)
- create dir_tree -- DONE
- find and copy data -- DONE
- change file headers, etc.. (ext scripts for data manipulation) -- IN PROGRESS
- generate blast banks and links -- NOT DONE
- generate and edit nginx confs -- DONE
- generate dc and start the containers -- IN PROGRESS
- connect to instance and launch tools>workflows -- IN PROGRESS
- generate and update metadata -- IN PROGRESS

NOTES:
- A master API key cannot be used, as some functions are tied to a user (like creating an history), so the access
  to the galaxy instance must be done using email and password
"""

import argparse
import fnmatch
import json
import logging
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime

import bioblend
import bioblend.galaxy.objects
import yaml
from bioblend import galaxy

import docker_compose_generator
import metadata_generator
import table_parser

# Galaxy tool IDs of the chado tools driven by this script
_TOOL_GET_ORGANISMS = "toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2"
_TOOL_DELETE_ORGANISMS = "toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2"
_TOOL_ADD_ORGANISM = "toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2"
_TOOL_ADD_ANALYSIS = "toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2"
_TOOL_GET_ANALYSES = "toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2"

# Content of the per-species nginx configuration written by generate_dir_tree
_NGINX_DEFAULT_CONF = ("server {\n\tlisten 80;\n\tserver_name ~.;\n\tlocation /download/ {\n\t\talias /project_data/; "
                       "\n\t\tautoindex on;\n\t}\n}")


def parse_input(input_file):
    """
    Parse the yml input in order to set attributes for the Autoload class

    Exits the program when the file name does not end with "yml"/"yaml".

    :param input_file: path to the YAML input file
    :return: list holding every top-level value of the YAML document (one entry per described species);
             empty when the file is empty or cannot be parsed
    """

    parsed_sp_dict_list = []

    if str(input_file).endswith(("yml", "yaml")):
        logging.debug("Input format used: YAML")
    else:
        logging.critical("Error, please input a YAML file")
        sys.exit(1)

    with open(input_file, 'r') as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # A broken input file must be visible at the default log level, not only in debug mode
            logging.error("Cannot parse input file %s: %s", input_file, exc)
            return parsed_sp_dict_list

    if not yaml_dict:
        # safe_load returns None for an empty document
        logging.warning("Input file %s does not describe any species", input_file)
        return parsed_sp_dict_list

    parsed_sp_dict_list.extend(yaml_dict.values())
    return parsed_sp_dict_list


def filter_empty_not_empty_items(li):
    """
    Split the items of an iterable between empty ones (None or "") and all the others

    :param li: iterable of items
    :return: dict with two lists, under the keys "empty" and "not_empty"
    """

    ret = {"empty": [], "not_empty": []}
    for i in li:
        if i is None or i == "":
            ret["empty"].append(i)
        else:
            ret["not_empty"].append(i)
    return ret


def _version_or_default(value, default="1.0"):
    """
    Return a version as a string, falling back on default when it is missing

    YAML parses an unquoted "1.0" as a float, and versions are concatenated to strings (paths, analysis names)
    all over this script, hence the conversion.
    """

    if value is None or value == "":
        return default
    return str(value)


class Autoload:
    """
    The "Autoload" class contains attributes and functions to interact with the galaxy container of the GGA environment
    """

    def __init__(self, parameters_dictionary, args):
        """
        :param parameters_dictionary: one species entry of the parsed input, with a "description" and a "data" section
        :param args: parsed command line arguments
        """

        description = parameters_dictionary["description"]
        data = parameters_dictionary["data"]

        self.parameters_dictionary = parameters_dictionary
        self.args = args
        self.species = description["species"]
        self.genus = description["genus"]
        self.strain = description["strain"]
        self.sex = description["sex"]
        self.common = description["common_name"]
        self.date = datetime.today().strftime("%Y-%m-%d")
        self.origin = description["origin"]
        self.performed = data["performed_by"]
        self.genome_version = _version_or_default(data["genome_version"])
        self.ogs_version = _version_or_default(data["ogs_version"])
        self.genus_lowercase = self.genus[0].lower() + self.genus[1:]
        self.genus_uppercase = self.genus[0].upper() + self.genus[1:]
        # Empty attributes are left out, same naming rule as in generate_dir_tree
        self.species_folder_name = "_".join(
            filter_empty_not_empty_items([self.genus_lowercase, self.species, self.strain, self.sex])["not_empty"])
        self.full_name = " ".join([self.genus_lowercase, self.species, self.strain, self.sex])
        self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex])
        self.genus_species = self.genus_lowercase + "_" + self.species
        # Testing with localhost/scratchgmodv1
        self.instance_url = "http://scratchgmodv1:8888/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/"
        self.instance = None
        self.history_id = None
        self.library_id = None
        self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.main_dir = None
        self.species_dir = None
        self.org_id = None
        self.genome_analysis_id = None
        self.ogs_analysis_id = None
        self.tool_panel = None
        self.datasets = dict()
        self.source_files = dict()
        self.workflow_name = None
        self.docker_compose_generator = None
        self.metadata = dict()
        # API key used to communicate with the galaxy instance. Set to "dev" for the moment
        # TODO: find a way to create, store then use the api key safely (master api key access actions are limited)
        self.api_key = "dev"
        # Directory/subdirectories where data files are located (fasta, gff, ...), point to a directory as close as
        # possible to the source files
        parent_directory = data["parent_directory"]
        if parent_directory in ("", "/path/to/closest/parent/dir"):
            self.source_data_dir = "/projet/sbr/phaeoexplorer/"  # Testing path for phaeoexplorer data
        else:
            self.source_data_dir = parent_directory
        # Update the instance (in histories corresponding to the input) instead of creating a new one
        # TODO: move this variable inside methods
        self.do_update = False

    # def get_source_data(self, max_depth):
    #     """
    #     TODO: saved for later just in case
    #
    #     Find and copy source data files to src_data directory tree
    #     - recursively search for the correct files (within a fixed max depth)
    #     - requires the organism src_data directory tree to already be properly created for the organism (run generate_dir_tree)
    #     - the source files must have "transcripts", "proteins"/"pep", "genome" in their name, and a gff extension
    #
    #     """
    #     src_data_dir = os.path.join(self.species_dir, "/src_data")
    #     sp_regex = "(?=\w*V)(?=\w*A)(?=\w*R)(?=\w*I)(?=\w*A)(?=\w*B)(?=\w*L)(?=\w*E)\w+"  # example with VARIABLE
    #
    #     # The regex works using the species attribute (unique) --> regex is probably not necessary
    #     sp_regex = ""
    #     for i in self.species:
    #         sp_regex = sp_regex + "?=\w*" + i + ")"
    #     sp_regex = sp_regex + ")\w+"
    #     re_dict = dict()
    #     re_dict["gff"] = None
    #     re_dict["transcripts"] = None
    #     re_dict["proteins"] = None
    #     re_dict["genome"] = None
    #     reg = None
    #
    #     for dirpath, dirnames, files in os.walk(self.source_data_dir):
    #         for f in files:
    #             if self.species and self.sex in f:
    #                 logging.info("File found")

    def _enter_species_dir(self):
        """
        Make the species directory the current working directory, exit the program if it cannot be accessed

        :return: absolute path of the species directory
        """

        try:
            os.chdir(self.species_dir)
        except OSError:
            logging.critical("Cannot access " + self.species_dir + ", run with higher privileges")
            sys.exit(1)
        return os.getcwd()

    def generate_dir_tree(self):
        """
        Generate the directory tree for an organism, its nginx conf and its docker-compose file, then set up the
        traefik directory next to it

        Changes the current working directory of the process to the species directory.

        TODO: DOCKER -- this is the one the "docker" parts of the script

        :return:
        """

        os.chdir(self.main_dir)
        self.main_dir = os.getcwd() + "/"
        self.species_dir = os.path.join(self.main_dir, self.genus_species) + "/"
        try:
            os.mkdir(self.species_dir)
        except FileExistsError:
            logging.debug("Directory " + self.species_dir + " already exists")

        working_dir = self._enter_species_dir()

        # The species nginx conf, never overwritten once it exists
        nginx_conf_path = os.path.abspath("./nginx/conf/default.conf")
        if os.path.exists(nginx_conf_path):
            logging.debug("NginX conf exists")
        else:
            os.makedirs("./nginx/conf", exist_ok=True)
            with open(nginx_conf_path, 'w') as conf:
                conf.write(_NGINX_DEFAULT_CONF)

        not_empty_attributes = filter_empty_not_empty_items(
            [self.genus_lowercase, self.species, self.strain, self.sex])["not_empty"]
        self.species_folder_name = "_".join(not_empty_attributes)

        # Create src_data dir tree. Missing directories are added to an existing tree, so that a new
        # organism/version can join a species directory generated by a previous run
        src_data_exists = os.path.isdir("./src_data")
        try:
            os.makedirs("./src_data/tracks", exist_ok=True)
            os.makedirs("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.ogs_version, exist_ok=True)
            os.makedirs("./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version, exist_ok=True)
        except PermissionError:
            logging.critical("Insufficient permission to create src_data directory tree")
            sys.exit(1)
        if src_data_exists:
            if self.do_update:
                logging.info("Updating src_data directory tree")
            else:
                logging.debug("The src_data directory tree already exists")

        # Path to the templates used to generate the custom docker-compose files for an input species
        stack_template_path = self.script_dir + "/templates/stack-organism.yml"
        traefik_template_path = self.script_dir + "/templates/traefik.yml"
        authelia_config_path = self.script_dir + "/templates/authelia_config.yml"
        authelia_users_path = self.script_dir + "/templates/authelia_users.yml"

        genus_species = self.genus.lower() + "_" + self.species
        if self.sex and self.strain:
            genus_species_strain_sex = genus_species + "_" + self.strain + "_" + self.sex
        else:
            genus_species_strain_sex = genus_species

        # The longest placeholder goes first: "genus_species" is a prefix of "genus_species_strain_sex" and
        # replacing it first would leave nothing to match for the longer one
        placeholders = [("genus_species_strain_sex", genus_species_strain_sex),
                        ("genus_species", genus_species),
                        ("Genus species", self.genus + " " + self.species),
                        ("Genus/species", self.genus + "/" + self.species),
                        ("gspecies", self.genus.lower()[0] + self.species)]

        organism_content = list()
        with open(stack_template_path, 'r') as infile:
            for line in infile:
                for placeholder, value in placeholders:
                    line = line.replace(placeholder, value)
                organism_content.append(line)

        with open("./docker-compose.yml", 'w') as outfile:
            outfile.writelines(organism_content)

        # Create mounts for the containers
        subprocess.call(["python3", self.script_dir + "/create_mounts.py"], cwd=working_dir,
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # The authelia files may have been customized by hand: only missing files are copied
        # TODO: custom users (add a config file?)
        os.makedirs("../traefik/authelia", exist_ok=True)
        for template_path, target_path in [(authelia_config_path, "../traefik/authelia/configuration.yml"),
                                           (authelia_users_path, "../traefik/authelia/users.yml")]:
            if os.path.exists(target_path):
                logging.debug(target_path + " already exists")
            else:
                shutil.copy(template_path, target_path)

        # shutil.copy overwrites its target: the traefik compose file is refreshed on every run
        shutil.copy(traefik_template_path, "../traefik/docker-compose.yml")

        subprocess.call(["python3", self.script_dir + "/create_mounts.py"], cwd=working_dir)

    def get_source_data_files(self):
        """
        Find all files in source_data directory, to link the matching files in the src_data dir tree

        Requires the src_data directory tree to exist (see generate_dir_tree).

        :return:
        """

        self._enter_species_dir()

        organism_annotation_dir = os.path.abspath(
            "./src_data/annotation/" + self.species_folder_name + "/OGS" + self.ogs_version)
        organism_genome_dir = os.path.abspath(
            "./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version)

        try:
            file_prefix = "*" + self.species[1:] + "_" + self.sex.upper()
        except TypeError:
            logging.warning("Error raised (TypeError)")
            return

        # (file name pattern, log label, directory receiving the link), tested in this order
        link_rules = [(file_prefix + ".fa", "Genome assembly file - ", organism_genome_dir),
                      (file_prefix + ".gff", "GFF file - ", organism_annotation_dir),
                      (file_prefix + "_transcripts-gff.fa", "Transcripts file - ", organism_annotation_dir),
                      (file_prefix + "_proteins.fa", "Proteins file - ", organism_annotation_dir)]

        for dirpath, dirnames, files in os.walk(self.source_data_dir):
            # Ensures to take the correct files (other dirs hold files with the correct names, but I don't know
            # if they are the same) #alb
            if "0" not in str(dirpath):
                continue
            for f in files:
                if "Contaminants" in str(f):
                    continue
                for pattern, label, target_dir in link_rules:
                    if not fnmatch.fnmatch(f, pattern):
                        continue
                    logging.info(label + str(f))
                    try:
                        os.symlink(os.path.join(dirpath, f), os.path.join(target_dir, f))
                    except FileExistsError:
                        logging.warning("Error raised (FileExistsError)")
                    except NotADirectoryError:
                        logging.warning("Error raised (NotADirectoryError)")
                    break  # a file is linked according to the first matching rule only

    def deploy_stack(self):
        """
        Call the script "deploy.sh" used to initialize the swarm cluster if needed and launch/update the stack

        :return:
        """

        # Launch and update docker stacks (cf docs)
        return_code = subprocess.call(["sh", self.script_dir + "/deploy.sh", self.genus_species,
                                       os.path.join(self.main_dir, "traefik")])
        if return_code != 0:
            logging.warning("deploy.sh exited with code " + str(return_code) + " for " + self.genus_species)

    def modify_fasta_headers(self):
        """
        Change the fasta headers before integration, by running the phaeoexplorer external scripts on the proteins
        and transcripts files found in the src_data directory tree

        :return:
        """

        self._enter_species_dir()

        self.source_files = dict()
        annotation_dir = None
        for d, _subdirs, filenames in os.walk(os.getcwd() + "/src_data"):
            if "annotation/" in d:
                annotation_dir = d
                for f in filenames:
                    if f.endswith("proteins.fasta"):
                        self.source_files["proteins_file"] = os.path.join(d, f)
                    elif f.endswith("transcripts-gff.fa"):
                        self.source_files["transcripts_file"] = os.path.join(d, f)
                    elif f.endswith(".gff"):
                        self.source_files["gff_file"] = os.path.join(d, f)
            elif "genome/" in d:
                for f in filenames:
                    if f.endswith(".fa"):
                        self.source_files["genome_file"] = os.path.join(d, f)

        logging.debug("source files found:")
        for k, v in self.source_files.items():
            logging.debug("\t" + k + "\t" + v)

        if annotation_dir is None:
            logging.warning("No annotation directory found in src_data, fasta headers left untouched")
            return

        # production version of the scripts (a test version used to live in /home/alebars/gga/)
        ext_scripts_dir = str(self.main_dir) + "/gga_load_data/ext_scripts/"
        # Changing headers in the *proteins.fasta file from >mRNA* to >protein*, then in the transcripts file
        header_jobs = [("proteins_file", "phaeoexplorer-change_pep_fasta_header.sh"),
                       ("transcripts_file", "phaeoexplorer-change_transcript_fasta_header.sh")]
        for file_key, script_name in header_jobs:
            fasta_file = self.source_files.get(file_key)
            if fasta_file is None:
                logging.warning("No " + file_key + " found in src_data, headers not changed")
                continue
            logging.info("Changing fasta headers: " + fasta_file)
            subprocess.run([ext_scripts_dir + script_name, fasta_file], stdout=subprocess.PIPE, cwd=annotation_dir)

        # src_data cleaning
        # NOTE(review): "outfile" is assumed to be the output of the proteins script -- confirm what the
        # transcripts script writes, as the file is only moved once both scripts have run
        outfile = os.path.join(annotation_dir, "outfile")
        if os.path.exists(outfile) and "proteins_file" in self.source_files:
            shutil.move(outfile, self.source_files["proteins_file"])
        gmon_out = os.path.join(annotation_dir, "gmon.out")
        if os.path.exists(gmon_out):
            os.remove(gmon_out)

    def generate_blast_banks(self):
        """
        TODO
        Automatically generate blast banks for a species
        TODO: auto commit the files?

        :return:
        """

    # @commit_files
    def generate_blast_banks_and_commit(self):
        """
        TODO

        :return:
        """
        return None

    def commit_files(self):
        """
        TODO
        Commit files to a git repo
        Commits to the gga repo for phaeoexplorer
        TODO: add repo to config file

        :return:
        """
        return None

    def connect_to_instance(self):
        """
        TODO: move in init/access

        Test the connection to the galaxy instance for the current organism
        Exit if it cannot connect to the instance
        Make sure a history named after the organism exists once connected
        """

        # TODO: read the credentials from a config file instead of hard-coding them
        self.instance = galaxy.GalaxyInstance(url=self.instance_url, email="admin@galaxy.org", password="password",
                                              verify=False)

        logging.info("Connecting to the galaxy instance ...")
        try:
            self.instance.histories.get_histories()
            self.tool_panel = self.instance.tools.get_tool_panel()
        except bioblend.ConnectionError:
            logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit(1)

        logging.info("Successfully connected to galaxy instance @ " + self.instance_url)
        # Creating the history unconditionally would add a duplicate at every connection
        if not self.instance.histories.get_histories(name=str(self.full_name)):
            self.instance.histories.create_history(name=str(self.full_name))

    def setup_data_libraries(self):
        """
        - generate blast banks and docker-compose
        - load data into the galaxy container with the galaxy_data_libs_SI.py script

        :return:
        """

        # NOTE(review): "genus_species_galaxy" looks like a template placeholder that should probably be the
        # name of the galaxy service of the current species -- confirm before changing it
        load_command = ["../serexec", "genus_species_galaxy", "/tool_deps/_conda/bin/python",
                        "/opt/setup_data_libraries.py"]
        try:
            logging.info("Loading data into the galaxy container")
            # check=True is required for a failure to raise CalledProcessError
            subprocess.run(load_command, check=True)
        except (subprocess.CalledProcessError, OSError):
            logging.error("Cannot load data into the galaxy container for " + self.full_name)
        else:
            logging.info("Data successfully loaded into the galaxy container for " + self.full_name)

        self.get_species_history_id()
        # self.get_instance_attributes()
        #
        # # import all datasets into current history
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"])
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"])
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"])
        # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"])

    def get_species_history_id(self):
        """
        Set and return the current species history id in its galaxy instance

        :return: ID of the first history named after the organism
        :raises IndexError: when no such history exists
        """

        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]

        return self.history_id

    def create_species_history(self):
        """
        Create the history named after the organism, unless it already exists

        :return:
        """

        histories = self.instance.histories.get_histories(name=str(self.full_name))
        logging.debug("Histories matching " + self.full_name + ": " + str(histories))
        if not histories:
            self.instance.histories.create_history(name=str(self.full_name))
            logging.info("Created history!")

    def _iter_folder_datasets(self, folder_id):
        """
        Yield every dict entry found in the lists returned by the galaxy API for the content of a library folder
        """

        folder_content = self.instance.folders.show_folder(folder_id=folder_id, contents=True)
        for value in folder_content.values():
            if not isinstance(value, (list, tuple)):
                continue
            for entry in value:
                if isinstance(entry, dict):
                    yield entry

    def _register_dataset(self, dataset_key, entry):
        """
        Store the ldda ID of a library dataset under dataset_key in self.datasets
        """

        self.datasets[dataset_key] = entry["ldda_id"]
        logging.info("\t\t" + entry["name"] + ": " + entry["ldda_id"])

    def get_instance_attributes(self):
        """
        retrieves instance attributes:
        - working history ID
        - libraries ID (there should only be one library!)
        - datasets IDs

        :return:
        """

        histories = self.instance.histories.get_histories(name=str(self.full_name))
        self.history_id = histories[0]["id"]
        logging.debug("history ID: " + self.history_id)

        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        logging.debug("library ID: " + self.library_id)
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        # Folder name -> folder ID (0 when the API gives no ID)
        folders_ids = {}
        for folder in instance_source_data_folders:
            if "name" in folder:
                folders_ids[folder["name"]] = folder.get("id", 0)

        logging.info("Folders and datasets IDs: ")
        self.datasets = dict()
        for folder_name, folder_id in folders_ids.items():
            logging.info("\t" + folder_name + ": " + str(folder_id))
            if folder_name == "/genome":
                for entry in self._iter_folder_datasets(folder_id):
                    if entry["name"].endswith(".fa"):
                        self._register_dataset("genome_file", entry)
            elif folder_name == "/annotation/" + self.genus_species:
                for entry in self._iter_folder_datasets(folder_id):
                    # TODO: manage several files of the same type and manage versions
                    if entry["name"].endswith("transcripts-gff.fa"):
                        self._register_dataset("transcripts_file", entry)
                    elif entry["name"].endswith("proteins.fasta"):
                        self._register_dataset("proteins_file", entry)
                    elif entry["name"].endswith((".gff", "MALE")):
                        # NOTE(review): names ending with "MALE" are registered as gff files, reason unknown
                        self._register_dataset("gff_file", entry)

    def _first_chado_record(self, tool_id, tool_inputs):
        """
        Run a chado "get" tool in the current history and return the first record of its JSON output

        :param tool_id: galaxy ID of the tool to run
        :param tool_inputs: inputs given to the tool
        :return: first record of the output, None when the output holds no record
        """

        job = self.instance.tools.run_tool(tool_id=tool_id, history_id=self.history_id, tool_inputs=tool_inputs)
        output_dataset_id = job["outputs"][0]["id"]
        json_output = self.instance.datasets.download_dataset(dataset_id=output_dataset_id)
        try:
            return json.loads(json_output)[0]
        except IndexError:
            return None

    def init_instance(self):
        """
        Galaxy instance startup in preparation for running workflows
        - remove Homo sapiens from the chado database.
        - add organism and analyses into the chado database --> separate
        - get any other existing organisms IDs before updating the galaxy instance --> separate

        TODO: move the library and analysis/data stuff to a separate function

        :return:
        """

        self.connect_to_instance()

        # Create the first history if needed. The ID comes from the created history: the list fetched
        # beforehand is empty in that case
        histories = self.instance.histories.get_histories(name=str(self.full_name))
        if histories:
            self.history_id = histories[0]["id"]
        else:
            self.history_id = self.instance.histories.create_history(name=str(self.full_name))["id"]
        logging.debug("history ID: " + self.history_id)
        # The library ID is only set by get_instance_attributes, it may still be None here
        logging.debug("library ID: " + str(self.library_id))

        # Delete Homo sapiens from Chado database
        logging.debug("Getting 'Homo sapiens' ID in instance's chado database")
        sapiens = self._first_chado_record(tool_id=_TOOL_GET_ORGANISMS,
                                           tool_inputs={"genus": "Homo", "species": "sapiens"})
        if sapiens is None:
            logging.debug("Homo sapiens isn't in the instance's chado database")
        else:
            logging.debug("Deleting Homo 'sapiens' in the instance's chado database")
            try:
                self.instance.tools.run_tool(
                    tool_id=_TOOL_DELETE_ORGANISMS,
                    history_id=self.history_id,
                    # needs to be str to be recognized by the chado tool
                    tool_inputs={"organism": str(sapiens["organism_id"])})
            except bioblend.ConnectionError:
                logging.debug("Homo sapiens isn't in the instance's chado database")

        # TODO: the following actions should be done in a separate function (in case if the user wants to do
        #  everything him/herself -- for EOSC)
        # Add organism (species) to chado
        logging.info("Adding organism to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id=_TOOL_ADD_ORGANISM,
            history_id=self.history_id,
            tool_inputs={"abbr": self.abbreviation,
                         "genus": self.genus,
                         "species": self.species,
                         "common": self.common})

        # Add OGS analysis to chado
        logging.info("Adding OGS analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id=_TOOL_ADD_ANALYSIS,
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("OGS" + self.ogs_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Add genome analysis to chado
        logging.info("Adding genome analysis to the instance's chado database")
        self.instance.tools.run_tool(
            tool_id=_TOOL_ADD_ANALYSIS,
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("genome v" + self.genome_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        self.get_organism_and_analyses_ids()
        logging.info("Finished initializing instance")

    def run_workflow(self, workflow_name, workflow_parameters, datamap):
        """
        Run the "main" workflow in the galaxy instance
        - import data to library
        - load fasta and gff
        - sync with tripal
        - add jbrowse + organism
        - fill in the tripal views

        The workflow is read from "Galaxy-Workflow-<workflow_name>.ga" in the main directory, customized for the
        organism, imported in the instance, invoked in the current history, then deleted from the instance.

        TODO: map tool name to step id

        :param workflow_name: name of the workflow, as used in its file name and in the instance
        :param workflow_parameters: parameters of the workflow steps
        :param datamap: inputs of the workflow
        :return:
        """

        logging.debug("running workflow: " + str(workflow_name))
        workflow_ga_file = os.path.join(self.main_dir, "Galaxy-Workflow-" + workflow_name + ".ga")

        with open(workflow_ga_file, 'r') as ga_in_file:
            # The placeholders are replaced inside the string representation of the list of lines, which is
            # why every backslash of the file shows up doubled in the patterns below
            workflow = str(ga_in_file.readlines())

        # ugly fix for the jbrowse parameters
        # NOTE(review): the closing "}" of the pattern is missing from its replacement -- confirm it is intended
        workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
                                    str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
        workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
                                    str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
        # to restore the correct amount of backslashes in the workflow string before import
        workflow = workflow.replace("\\\\", "\\")

        # test
        # NOTE(review): the species is not part of the resulting feature URL, unlike in the placeholder -- confirm
        workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
                                    "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
        # production
        # workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
        #                             "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")

        # Strip the "['" and "']" wrapping the list representation
        workflow = workflow[2:-2]  # if the line under doesn't output a correct json
        # workflow = workflow[:-2]  # if the line above doesn't output a correct json

        workflow_dict = json.loads(workflow)

        self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict)
        self.workflow_name = workflow_name
        workflow_attributes = self.instance.workflows.get_workflows(name=self.workflow_name)
        workflow_id = workflow_attributes[0]["id"]
        show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id)
        logging.debug("Workflow ID: " + workflow_id)

        logging.debug("Inputs:")
        # Only used for logging: accept either spelling of the key rather than fail on a missing one
        logging.debug(show_workflow.get("inputs", show_workflow.get("Inputs")))
        self.instance.workflows.invoke_workflow(workflow_id=workflow_id,
                                                history_id=self.history_id,
                                                params=workflow_parameters,
                                                inputs=datamap,
                                                inputs_by="")
        self.instance.workflows.delete_workflow(workflow_id=workflow_id)

    def load_data_in_galaxy(self):
        """
        Function to load the src_data folder in galaxy

        :return:
        """

        logging.info("Loading data in galaxy")

        return None

    def get_organism_and_analyses_ids(self):
        """
        Retrieve current organism ID and OGS and genome chado analyses IDs (needed to run some tools as Tripal/Chado
        doesn't accept organism/analyses names as valid inputs)

        An ID is left untouched when the organism/analysis is not in the chado database.

        :return:
        """

        # Get the ID for the current organism in chado
        organism = self._first_chado_record(tool_id=_TOOL_GET_ORGANISMS,
                                            tool_inputs={"genus": self.genus, "species": self.species})
        if organism is None:
            logging.debug("no organism matching " + self.full_name + " exists in the instance's chado database")
        else:
            self.org_id = str(organism["organism_id"])  # id needs to be a str to be recognized by chado tools

        # Get the ID for the OGS analysis in chado
        ogs_analysis = self._first_chado_record(
            tool_id=_TOOL_GET_ANALYSES,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version})
        if ogs_analysis is None:
            logging.debug("no matching OGS analysis exists in the instance's chado database")
        else:
            self.ogs_analysis_id = str(ogs_analysis["analysis_id"])

        # Get the ID for the genome analysis in chado
        genome_analysis = self._first_chado_record(
            tool_id=_TOOL_GET_ANALYSES,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version})
        if genome_analysis is None:
            logging.debug("no matching genome analysis exists in the instance's chado database")
        else:
            self.genome_analysis_id = str(genome_analysis["analysis_id"])

    def clean_instance(self):
        """
        TODO: method to purge the instance from analyses and organisms

        :return:
        """
        return None


def main():
    """
    Command line entry point: parse the arguments, then process every species described in the input file
    """

    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")
    # Dev arguments, TODO: remove in production branch!
    # NOTE(review): --full, --run-main and --generate-docker-compose expect a value as they have no
    # action="store_true", although their help describes switches -- confirm before changing the CLI
    parser.add_argument("--full",
                        help="Run everything, from src_data dir tree creation, moving data files (abims) into src_data,"
                             "modify headers (abims), generate blast banks (doesn't commit them: TODO), initialize GGA instance, load the data and run,"
                             " the main workflow. To update/add data to container, use --update in conjunction to --full (TODO)")
    parser.add_argument("--init-instance",
                        help="Initialization of galaxy instance. Run first in an empty instance, DEV",
                        action="store_true")
    parser.add_argument("--load-data",
                        help="Create src_data directory tree, copy datasets to src_data, and load these datasets into the instance, DEV",
                        action="store_true")
    parser.add_argument("--run-main",
                        help="Run main workflow (load data into chado, sync all with tripal, "
                             "index tripal data, populate materialized view, "
                             "create a jbrowse for the current genus_species_strain_sex and add organism to jbrowse")
    parser.add_argument("--generate-docker-compose",
                        help="Generate docker-compose.yml for current species, DEV")
    parser.add_argument("--link-source",
                        help="Find source files in source data dir and copy them to src_data, DEV, OBSOLETE",
                        action="store_true")
    # Production arguments
    parser.add_argument("input", type=str, help="Input file (yml)")
    # store_true: both flags are off unless given on the command line
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")
    parser.add_argument("--update",
                        help="Update an already integrated organisms with new data from input file, docker-compose.yml will not be re-generated"
                             ", assuming the instances for the organisms are already generated and initialized",
                        action="store_true")
    parser.add_argument("--dir",
                        help="Path of the main directory, either absolute or relative, defaults to current directory",
                        default=os.getcwd())

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    logging.info("Start")
    sp_dict_list = parse_input(args.input)

    for sp_dict in sp_dict_list:
        al = Autoload(parameters_dictionary=sp_dict, args=args)
        al.main_dir = os.path.abspath(args.dir)
        al.do_update = args.update
        organism_label = al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex

        if args.load_data:
            al.generate_dir_tree()
            logging.info("Successfully generated the directory tree for " + organism_label)

            al.get_source_data_files()
            logging.info("Successfully retrieved source data files for " + organism_label)

            al.deploy_stack()
            logging.info("Successfully deployed containers stack for " + organism_label)

            # al.connect_to_instance()
            # logging.info("Connected to instance")
            #
            # al.create_species_history()
            # logging.info("Created a history")
            #
            # al.setup_data_libraries()
            # logging.info("Setting up data libraries")

            # al.init_instance()
            # logging.info("Successfully initialized instance for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex)

            # al.setup_data_libraries()
            # logging.info("Successfully set up data libraries in galaxy for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex)

        # if args.init_instance:
        #     logging.info(" Initializing the galaxy instance")
        #     al.init_instance()
        #     al.get_instance_attributes()
        #     # metadata[genus_species_strain_sex]["initialized"] = True
        # if args.load_data:
        #     logging.info("Loading data into galaxy")
        #     # al.load_data()
        #     # metadata[genus_species_strain_sex]["data_loaded_in_instance"] = True
        # if args.run_main:
        #     logging.info("Running main workflow")
        #     al.get_organism_and_analyses_ids()
        #     workflow_parameters = dict()
        #     workflow_parameters["0"] = {}
        #     workflow_parameters["1"] = {}
        #     workflow_parameters["2"] = {}
        #     workflow_parameters["3"] = {}
        #     workflow_parameters["4"] = {"organism": al.org_id,
        #                                 "analysis_id": al.genome_analysis_id,
        #                                 "do_update": "true"}
        #     workflow_parameters["5"] = {"organism": al.org_id,
        #                                 "analysis_id": al.ogs_analysis_id}
        #     workflow_parameters["6"] = {"organism_id": al.org_id}
        #     workflow_parameters["7"] = {"analysis_id": al.ogs_analysis_id}
        #     workflow_parameters["8"] = {"analysis_id": al.genome_analysis_id}
        #     workflow_parameters["9"] = {"organism_id": al.org_id}
        #     workflow_parameters["10"] = {}
        #     workflow_parameters["11"] = {}
        #
        #     al.datamap = dict()
        #     al.datamap["0"] = {"src": "hda", "id": al.datasets["genome_file"]}
        #     al.datamap["1"] = {"src": "hda", "id": al.datasets["gff_file"]}
        #     al.datamap["2"] = {"src": "hda", "id": al.datasets["proteins_file"]}
        #     al.datamap["3"] = {"src": "hda", "id": al.datasets["transcripts_file"]}
        #
        #     al.run_workflow(workflow_name="main", workflow_parameters=workflow_parameters, datamap=al.datamap)
        #     # metadata[genus_species_strain_sex]["workflows_run"] = metadata[genus_species_strain_sex]["workflows_run"].append("main")
        #
        # if args.link_source:
        #     print('DEV')
        #     al.generate_dir_tree()
        #     print(al.main_dir)
        #     print(al.species_dir)

    logging.info("Exit")


if __name__ == "__main__":
    main()