#!/usr/bin/python # -*- coding: utf-8 -*- import bioblend import bioblend.galaxy.objects from bioblend import galaxy import argparse import os import subprocess import logging import sys import json import yaml import re import metadata_generator import docker_compose_generator import table_parser import fnmatch import shutil from datetime import datetime import create_input_instance """ deploy_stacks.py TODO: - add config file (inside repo or outside with argument - update existing history - clean/delete instance? - delete stack? - commit the files for blast banks. TODO EOSC/Cloudification: - divide into 2 general-use scripts - create docker swarm, stacks, etc... (docker side) - load data into libraries (method to load it at init, and a method/script to load it separately (galaxy side) (alb: galaxy_data_libs_SI does this already?) STEPS: - read input (yml, maybe xlsx later) - create dir_tree -- DONE - find and copy data -- DONE - change file headers, etc.. (ext scripts for data manipulation) -- IN PROGRESS - generate blast banks and links -- NOT DONE - generate and edit nginx confs -- DONE - generate dc and start the containers -- IN PROGRESS - connect to instance and launch tools>workflows -- IN PROGRESS - generate and update metadata -- IN PROGRESS NOTES: - A master API key cannot be used, as some functions are tied to a user (like creating an history), so the access to the galaxy instance must be done using email and password (definable in yml_example_input.yml) """ def parse_input(input_file): """ Parse the yml, json or tabulated input in order to set attributes for the Autoload class :param input_file: :return: """ parsed_sp_dict_list = [] if str(input_file).endswith("yml") or str(input_file).endswith("yaml"): logging.debug("Input format used: YAML") else: logging.critical("Error, please input a YAML file") sys.exit() with open(input_file, 'r') as stream: try: yaml_dict = yaml.safe_load(stream) for k, v in yaml_dict.items(): parsed_sp_dict_list.append(v) except yaml.YAMLError as exc: logging.debug(exc) return parsed_sp_dict_list class DeploySpeciesStacks: """ The class DeploySpeciesStacks """ def __init__(self, parameters_dictionary, args): self.parameters_dictionary = parameters_dictionary self.args = args self.species = parameters_dictionary["description"]["species"] self.genus = parameters_dictionary["description"]["genus"] self.strain = parameters_dictionary["description"]["strain"] self.sex = parameters_dictionary["description"]["sex"] self.common = parameters_dictionary["description"]["common_name"] self.date = datetime.today().strftime("%Y-%m-%d") self.origin = parameters_dictionary["description"]["origin"] self.performed = parameters_dictionary["data"]["performed_by"] if parameters_dictionary["data"]["genome_version"] == "": self.genome_version = "1.0" else: self.genome_version = parameters_dictionary["data"]["genome_version"] if parameters_dictionary["data"]["ogs_version"] == "": self.ogs_version = "1.0" else: self.ogs_version = parameters_dictionary["data"]["ogs_version"] self.genus_lowercase = self.genus[0].lower() + self.genus[1:] self.genus_uppercase = self.genus[0].upper() + self.genus[1:] self.species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex]) self.full_name = " ".join([self.genus_uppercase, self.species, self.strain, self.sex]) self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex]) self.genus_species = self.genus_lowercase + "_" + self.species self.instance_url = "http://scratchgmodv1:8888/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/" # Testing with localhost/scratchgmodv1 self.instance = None self.history_id = None self.library_id = None self.script_dir = os.path.dirname(os.path.realpath(sys.argv[0])) self.main_dir = None self.species_dir = None self.org_id = None self.genome_analysis_id = None self.ogs_analysis_id = None self.tool_panel = None self.datasets = dict() self.source_files = dict() self.workflow_name = None self.docker_compose_generator = None self.metadata = dict() self.api_key = "dev" # TODO: set the key in config file --> saved for later (master api key access actions are limited) if parameters_dictionary["data"]["parent_directory"] == "" or parameters_dictionary["data"]["parent_directory"] == "/path/to/closest/parent/dir": self.source_data_dir = "/projet/sbr/phaeoexplorer/" # Testing path for phaeoexplorer data else: self.source_data_dir = parameters_dictionary["data"]["parent_directory"] # Directory/subdirectories where data files are located (fasta, gff, ...), point to a directory as close as possible to the source files self.do_update = False # Update the instance (in histories corresponding to the input) instead of creating a new one // TODO: move this variable inside methods self.api_key = "dev" # API key used to communicate with the galaxy instance. Set to "dev" for the moment // TODO: find a way to create, store then use the api key safely # def get_source_data(self, max_depth): # """ # TODO: saved for later just in case # # Find and copy source data files to src_data directory tree # - recursively search for the correct files (within a fixed max depth) # - requires the organism src_data directory tree to already be properly created for the organism (run generate_dir_tree) # - the source files must have "transcripts", "proteins"/"pep", "genome" in their name, and a gff extension # # """ # src_data_dir = os.path.join(self.species_dir, "/src_data") # sp_regex = "(?=\w*V)(?=\w*A)(?=\w*R)(?=\w*I)(?=\w*A)(?=\w*B)(?=\w*L)(?=\w*E)\w+" # example with VARIABLE # # # The regex works using the species attribute (unique) --> regex is probably not necessary # sp_regex = "" # for i in self.species: # sp_regex = sp_regex + "?=\w*" + i + ")" # sp_regex = sp_regex + ")\w+" # re_dict = dict() # re_dict["gff"] = None # re_dict["transcripts"] = None # re_dict["proteins"] = None # re_dict["genome"] = None # reg = None # # for dirpath, dirnames, files in os.walk(self.source_data_dir): # for f in files: # if self.species and self.sex in f: # logging.info("File found") def generate_dir_tree(self): """ Generate the directory tree for an organism and move datasets into src_data TODO: DOCKER -- this is the one the "docker" parts of the script :return: """ os.chdir(self.main_dir) self.main_dir = os.getcwd() + "/" self.species_dir = os.path.join(self.main_dir, self.genus_species) + "/" try: os.mkdir(self.species_dir) except FileExistsError: logging.debug("Directory " + self.species_dir + " already exists") try: os.chdir(self.species_dir) working_dir = os.getcwd() except OSError: logging.critical("Cannot access " + self.species_dir + ", run with higher privileges") sys.exit() try: os.mkdir("./nginx/") os.mkdir("./nginx/conf") with open(os.path.abspath("./nginx/conf/default.conf"), 'w') as conf: conf.write("server {\n\tlisten 80;\n\tserver_name ~.;\n\tlocation /download/ {\n\t\talias /project_data/; \n\t\tautoindex on;\n\t}\n}") # The species nginx conf except FileExistsError: logging.debug("NginX conf exists") # src_data_folders = ["annotation", "genome"] # The directories to generate not_empty_attributes = filter_empty_not_empty_items([self.genus_lowercase, self.species, self.strain, self.sex])["not_empty"] self.species_folder_name = "_".join(not_empty_attributes) # self.species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex]) organism_annotation_dir, organism_genome_dir = None, None # Create src_data dir tree try: os.mkdir("./src_data") os.mkdir("./src_data/annotation") os.mkdir("./src_data/genome") os.mkdir("./src_data/tracks") os.mkdir("./src_data/annotation/" + self.species_folder_name) os.mkdir("./src_data/genome/" + self.species_folder_name) os.mkdir("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.ogs_version) os.mkdir("./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) organism_genome_dir = os.path.abspath("./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version) except FileExistsError: if self.do_update: logging.info("Updating src_data directory tree") else: logging.debug("The src_data directory tree already exists") except PermissionError: logging.critical("Insufficient permission to create src_data directory tree") sys.exit() # Path to the templates used to generate the custom docker-compose files for an input species stack_template_path = self.script_dir + "/templates/stack-organism.yml" traefik_template_path = self.script_dir + "/templates/traefik.yml" authelia_config_path = self.script_dir + "/templates/authelia_config.yml" authelia_users_path = self.script_dir + "/templates/authelia_users.yml" if self.sex and self.strain: genus_species_strain_sex = self.genus.lower() + "_" + self.species + "_" + self.strain + "_" + self.sex else: genus_species_strain_sex = self.genus.lower() + "_" + self.species with open(stack_template_path, 'r') as infile: organism_content = list() for line in infile: # One-liner to replace placeholders by the genus and species organism_content.append( line.replace("genus_species", str(self.genus.lower() + "_" + self.species)).replace("Genus species", str(self.genus_uppercase + " " + self.species)).replace("Genus/species", str(self.genus_uppercase + "/" + self.species)).replace("gspecies", str( self.genus.lower()[0] + self.species)).replace("genus_species_strain_sex", genus_species_strain_sex)) with open("./docker-compose.yml", 'w') as outfile: for line in organism_content: outfile.write(line) subprocess.call(["python3", self.script_dir + "/create_mounts.py"], cwd=working_dir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Create mounts for the containers try: os.mkdir("../traefik") os.mkdir("../traefik/authelia") shutil.copy(authelia_config_path, "../traefik/authelia/configuration.yml") shutil.copy(authelia_users_path, "../traefik/authelia/users.yml") # TODO: custom users (add a config file?) subprocess.call(["python3", self.script_dir + "/create_mounts.py"], cwd=working_dir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Create mounts for the containers except FileExistsError: logging.debug("Traefik directory already exists") try: shutil.copy(traefik_template_path, "../traefik/docker-compose.yml") except FileExistsError: logging.debug("Traefik compose file already exists") subprocess.call(["python3", self.script_dir + "/create_mounts.py"], cwd=working_dir) def get_source_data_files_from_path(self): """ Find all files in source_data directory, to link the matching files in the src_data dir tree :return: """ try: os.chdir(self.species_dir) working_dir = os.getcwd() except OSError: logging.critical("Cannot access " + self.species_dir + ", run with higher privileges") sys.exit() organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) organism_genome_dir = os.path.abspath("./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version) for dirpath, dirnames, files in os.walk(self.source_data_dir): if "0" in str(dirpath): # Ensures to take the correct files (other dirs hold files with the correct names, but I don't know if they are the same) #alb for f in files: if "Contaminants" not in str(f): try: if fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + ".fa"): logging.info("Genome assembly file - " + str(f)) organism_genome_dir = organism_genome_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_genome_dir) organism_genome_dir = os.path.abspath("./src_data/genome/" + self.species_folder_name + "/v" + self.genome_version) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + ".gff"): logging.info("GFF file - " + str(f)) organism_annotation_dir = organism_annotation_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_annotation_dir) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + "_transcripts-gff.fa"): logging.info("Transcripts file - " + str(f)) organism_annotation_dir = organism_annotation_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_annotation_dir) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) elif fnmatch.fnmatch(f, "*" + self.species[1:] + "_" + self.sex.upper() + "_proteins.fa"): logging.info("Proteins file - " + str(f)) organism_annotation_dir = organism_annotation_dir + "/" + f os.symlink(os.path.join(dirpath, f), organism_annotation_dir) organism_annotation_dir = os.path.abspath("./src_data/annotation/" + self.species_folder_name + "/OGS" + self.genome_version) except FileExistsError: logging.warning("Error raised (FileExistsError)") except TypeError: logging.warning("Error raised (TypeError)") except NotADirectoryError: logging.warning("Error raised (NotADirectoryError)") def deploy_stack(self): """ Call the script "deploy.sh" used to initiliaze the swarm cluster if needed and launch/update the stack :return: """ # Launch and update docker stacks (cf docs) # TODO: add a fail condition? subprocess.call(["sh", self.script_dir + "/deploy.sh", self.genus_species, self.main_dir + "/traefik"]) def modify_fasta_headers(self): """ Change the fasta headers before integration. :return: """ try: os.chdir(self.species_dir) working_dir = os.getcwd() except OSError: logging.info("Cannot access " + self.species_dir + ", run with higher privileges") logging.info("Fatal error: exit") sys.exit() self.source_files = dict() annotation_dir, genome_dir = None, None for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]: if "annotation/" in d: annotation_dir = d for f in os.listdir(d): if f.endswith("proteins.fasta"): self.source_files["proteins_file"] = os.path.join(d, f) elif f.endswith("transcripts-gff.fa"): self.source_files["transcripts_file"] = os.path.join(d, f) elif f.endswith(".gff"): self.source_files["gff_file"] = os.path.join(d, f) elif "genome/" in d: genome_dir = d for f in os.listdir(d): if f.endswith(".fa"): self.source_files["genome_file"] = os.path.join(d, f) logging.debug("source files found:") for k, v in self.source_files.items(): logging.debug("\t" + k + "\t" + v) # Changing headers in the *proteins.fasta file from >mRNA* to >protein* # production version modify_pep_headers = [str(self.main_dir) + "/gga_load_data/ext_scripts/phaeoexplorer-change_pep_fasta_header.sh", self.source_files["proteins_file"]] # test version # modify_pep_headers = ["/home/alebars/gga/phaeoexplorer-change_pep_fasta_header.sh", # self.source_files["proteins_file"]] logging.info("Changing fasta headers: " + self.source_files["proteins_file"]) subprocess.run(modify_pep_headers, stdout=subprocess.PIPE, cwd=annotation_dir) # production version modify_pep_headers = [str(self.main_dir) + "/gga_load_data/ext_scripts/phaeoexplorer-change_transcript_fasta_header.sh", self.source_files["proteins_file"]] # test version # modify_pep_headers = ["/home/alebars/gga/phaeoexplorer-change_transcript_fasta_header.sh", # self.source_files["proteins_file"]] logging.info("Changing fasta headers: " + self.source_files["transcripts_file"]) subprocess.run(modify_pep_headers, stdout=subprocess.PIPE, cwd=annotation_dir) # src_data cleaning if os.path.exists(annotation_dir + "outfile"): subprocess.run(["mv", annotation_dir + "/outfile", self.source_files["proteins_file"]], stdout=subprocess.PIPE, cwd=annotation_dir) if os.path.exists(annotation_dir + "gmon.out"): subprocess.run(["rm", annotation_dir + "/gmon.out"], stdout=subprocess.PIPE, cwd=annotation_dir) def generate_blast_banks(self): """ TODO Automatically generate blast banks for a species TODO: auto commit the files? :return: """ def connect_to_instance(self): """ TODO: move in init/access TODO: password Test the connection to the galaxy instance for the current organism Exit if it cannot connect to the instance """ self.instance = galaxy.GalaxyInstance(url=self.instance_url, email="gga@sb-roscoff.fr", password="password", verify=False) logging.info("Connecting to the galaxy instance ...") try: self.instance.histories.get_histories() self.tool_panel = self.instance.tools.get_tool_panel() except bioblend.ConnectionError: logging.critical("Cannot connect to galaxy instance @ " + self.instance_url) sys.exit() else: logging.info("Successfully connected to galaxy instance @ " + self.instance_url) self.instance.histories.create_history(name="FOO") def setup_data_libraries(self): """ - generate blast banks and docker-compose - load data into the galaxy container with the galaxy_data_libs_SI.py script :return: """ try: logging.info("Loading data into the galaxy container") subprocess.run("../serexec genus_species_galaxy /tool_deps/_conda/bin/python /opt/setup_data_libraries.py", shell=True) except subprocess.CalledProcessError: logging.info("Cannot load data into the galaxy container for " + self.full_name) pass else: logging.info("Data successfully loaded into the galaxy container for " + self.full_name) self.get_species_history_id() # self.get_instance_attributes() # # # import all datasets into current history # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["genome_file"]) # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["gff_file"]) # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["transcripts_file"]) # self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=self.datasets["proteins_file"]) def get_species_history_id(self): """ Set and return the current species history id in its galaxy instance :return: """ histories = self.instance.histories.get_histories(name=str(self.full_name)) self.history_id = histories[0]["id"] self.instance.histories.show_history(history_id=self.history_id) return self.history_id def create_species_history(self): histories = self.instance.histories.get_histories(name=str(self.full_name)) print("\n" + str(histories) + "\n" + self.full_name + "\n") if not histories: self.instance.histories.create_history(name="FOO") print("Created history!") def get_instance_attributes(self): """ retrieves instance attributes: - working history ID - libraries ID (there should only be one library!) - datasets IDs :return: """ histories = self.instance.histories.get_histories(name=str(self.full_name)) self.history_id = histories[0]["id"] logging.debug("history ID: " + self.history_id) libraries = self.instance.libraries.get_libraries() # normally only one library self.library_id = self.instance.libraries.get_libraries()[0]["id"] # project data folder/library logging.debug("library ID: " + self.history_id) instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id) folders_ids = {} current_folder_name = "" for i in instance_source_data_folders: for k, v in i.items(): if k == "name": folders_ids[v] = 0 current_folder_name = v if k == "id": folders_ids[current_folder_name] = v logging.info("Folders and datasets IDs: ") self.datasets = dict() for k, v in folders_ids.items(): logging.info("\t" + k + ": " + v) if k == "/genome": sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True) for k2, v2 in sub_folder_content.items(): for e in v2: if type(e) == dict: if e["name"].endswith(".fa"): self.datasets["genome_file"] = e["ldda_id"] logging.info("\t\t" + e["name"] + ": " + e["ldda_id"]) elif k == "/annotation/" + self.genus_species: sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True) for k2, v2 in sub_folder_content.items(): for e in v2: if type(e) == dict: # TODO: manage several files of the same type and manage versions if e["name"].endswith("transcripts-gff.fa"): self.datasets["transcripts_file"] = e["ldda_id"] logging.info("\t\t" + e["name"] + ": " + e["ldda_id"]) elif e["name"].endswith("proteins.fasta"): self.datasets["proteins_file"] = e["ldda_id"] logging.info("\t\t" + e["name"] + ": " + e["ldda_id"]) elif e["name"].endswith(".gff"): self.datasets["gff_file"] = e["ldda_id"] logging.info("\t\t" + e["name"] + ": " + e["ldda_id"]) elif e["name"].endswith("MALE"): self.datasets["gff_file"] = e["ldda_id"] logging.info("\t\t" + e["name"] + ": " + e["ldda_id"]) def init_instance(self): """ Galaxy instance startup in preparation for running workflows - remove Homo sapiens from the chado database. - add organism and analyses into the chado database --> separate - get any other existing organisms IDs before updating the galaxy instance --> separate TODO: move the library and analysis/data stuff to a separate function :return: """ self.connect_to_instance() self.get_species_history_id() histories = self.instance.histories.get_histories(name=str(self.full_name)) # Create the first history if not histories: self.instance.histories.create_history(name=str(self.full_name)) self.history_id = histories[0]["id"] logging.debug("history ID: " + self.history_id) # libraries = self.instance.libraries.get_libraries() # routine check: one library # self.library_id = self.instance.libraries.get_libraries()[0]["id"] # project data folder/library logging.debug("library ID: " + self.history_id) instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id) # Delete Homo sapiens from Chado database logging.debug("Getting 'Homo sapiens' ID in instance's chado database") get_sapiens_id_job = self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2", history_id=self.history_id, tool_inputs={"genus": "Homo", "species": "sapiens"}) get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"] get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output) try: logging.debug("Deleting Homo 'sapiens' in the instance's chado database") get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0] sapiens_id = str( get_sapiens_id_final_output["organism_id"]) # needs to be str to be recognized by the chado tool self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2", history_id=self.history_id, tool_inputs={"organism": str(sapiens_id)}) except bioblend.ConnectionError: logging.debug("Homo sapiens isn't in the instance's chado database") except IndexError: logging.debug("Homo sapiens isn't in the instance's chado database") pass # TODO: the following actions should be done in a separate function (in case if the user wants to do everything him/herself -- for EOSC) # Add organism (species) to chado logging.info("Adding organism to the instance's chado database") self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2", history_id=self.history_id, tool_inputs={"abbr": self.abbreviation, "genus": self.genus, "species": self.species, "common": self.common}) # Add OGS analysis to chado logging.info("Adding OGS analysis to the instance's chado database") self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2", history_id=self.history_id, tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version, "program": "Performed by Genoscope", "programversion": str("OGS" + self.ogs_version), "sourcename": "Genoscope", "date_executed": self.date}) # Add genome analysis to chado logging.info("Adding genome analysis to the instance's chado database") self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2", history_id=self.history_id, tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version, "program": "Performed by Genoscope", "programversion": str("genome v" + self.genome_version), "sourcename": "Genoscope", "date_executed": self.date}) self.get_organism_and_analyses_ids() logging.info("Finished initializing instance") def run_workflow(self, workflow_name, workflow_parameters, datamap): """ Run the "main" workflow in the galaxy instance - import data to library - load fasta and gff - sync with tripal - add jbrowse + organism - fill in the tripal views TODO: map tool name to step id :param workflow_name: :param workflow_parameters: :param datamap: :return: """ logging.debug("running workflow: " + str(workflow_name)) workflow_ga_file = self.main_dir + "Galaxy-Workflow-" + workflow_name + ".ga" if self.strain != "": custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga" custom_ga_file_path = os.path.abspath(custom_ga_file) else: custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga" custom_ga_file_path = os.path.abspath(custom_ga_file) with open(workflow_ga_file, 'r') as ga_in_file: workflow = str(ga_in_file.readlines()) # ugly fix for the jbrowse parameters workflow = workflow.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}', str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"') workflow = workflow.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"', str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"') workflow = workflow.replace("\\\\", "\\") # to restore the correct amount of backslashes in the workflow string before import # test workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}', "http://localhost/sp/" + self.genus_lowercase+ "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}") # production # workflow = workflow.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}', # "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}") workflow = workflow[2:-2] # if the line under doesn't output a correct json # workflow = workflow[:-2] # if the line above doesn't output a correct json workflow_dict = json.loads(workflow) self.instance.workflows.import_workflow_dict(workflow_dict=workflow_dict) self.workflow_name = workflow_name workflow_attributes = self.instance.workflows.get_workflows(name=self.workflow_name) workflow_id = workflow_attributes[0]["id"] show_workflow = self.instance.workflows.show_workflow(workflow_id=workflow_id) logging.debug("Workflow ID: " + workflow_id) logging.debug("Inputs:") logging.debug(show_workflow["Inputs"]) self.instance.workflows.invoke_workflow(workflow_id=workflow_id, history_id=self.history_id, params=workflow_parameters, inputs=datamap, inputs_by="") self.instance.workflows.delete_workflow(workflow_id=workflow_id) def load_data_in_galaxy(self): """ Function to load the src_data folder in galaxy :return: """ logging.info("Loading data in galaxy") return None def get_organism_and_analyses_ids(self): """ Retrieve current organism ID and OGS and genome chado analyses IDs (needed to run some tools as Tripal/Chado doesn't accept organism/analyses names as valid inputs :return: """ # Get the ID for the current organism in chado org = self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2", history_id=self.history_id, tool_inputs={"genus": self.genus, "species": self.species}) org_job_out = org["outputs"][0]["id"] org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out) try: org_output = json.loads(org_json_output)[0] self.org_id = str(org_output["organism_id"]) # id needs to be a str to be recognized by chado tools except IndexError: logging.debug("no organism matching " + self.full_name + " exists in the instance's chado database") # Get the ID for the OGS analysis in chado ogs_analysis = self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2", history_id=self.history_id, tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version}) ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"] ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out) try: ogs_analysis_output = json.loads(ogs_analysis_json_output)[0] self.ogs_analysis_id = str(ogs_analysis_output["analysis_id"]) except IndexError: logging.debug("no matching OGS analysis exists in the instance's chado database") # Get the ID for the genome analysis in chado genome_analysis = self.instance.tools.run_tool( tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2", history_id=self.history_id, tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version}) genome_analysis_job_out = genome_analysis["outputs"][0]["id"] genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out) try: genome_analysis_output = json.loads(genome_analysis_json_output)[0] self.genome_analysis_id = str(genome_analysis_output["analysis_id"]) except IndexError: logging.debug("no matching genome analysis exists in the instance's chado database") def clean_instance(self): """ TODO: method to purge the instance from analyses and organisms :return: """ return None def filter_empty_not_empty_items(li): ret = {"empty": [], "not_empty": []} for i in li: if i is None or i == "": ret["empty"].append(i) else: ret["not_empty"].append(i) return ret if __name__ == "__main__": parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction with galaxy instances for GGA" ", following the protocol @ " "http://gitlab.sb-roscoff.fr/abims/e-infra/gga") # Dev arguments, TODO: remove in production branch! parser.add_argument("--full", help="Run everything, from src_data dir tree creation, moving data files (abims) into src_data," "modify headers (abims), generate blast banks (doesn't commit them: TODO), initialize GGA instance, load the data and run," " the main workflow. To update/add data to container, use --update in conjunction to --full (TODO)") parser.add_argument("--init-instance", help="Initialization of galaxy instance. Run first in an empty instance, DEV", action="store_true") parser.add_argument("--load-data", help="Create src_data directory tree, copy datasets to src_data, and load these datasets into the instance, DEV", action="store_true") parser.add_argument("--run-main", help="Run main workflow (load data into chado, sync all with tripal, " "index tripal data, populate materialized view, " "create a jbrowse for the current genus_species_strain_sex and add organism to jbrowse") parser.add_argument("--generate-docker-compose", help="Generate docker-compose.yml for current species, DEV") parser.add_argument("--link-source", help="Find source files in source data dir and copy them to src_data, DEV, OBSOLETE", action="store_true") # Production arguments parser.add_argument("input", type=str, help="Input file (yml)") parser.add_argument("-v", "--verbose", help="Increase output verbosity", action="store_false") parser.add_argument("--update", help="Update an already integrated organisms with new data from input file, docker-compose.yml will not be re-generated" ", assuming the instances for the organisms are already generated and initialized", action="store_false") parser.add_argument("--dir", help="Path of the main directory, either absolute or relative, defaults to current directory", default=os.getcwd()) args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) logging.info("Start") sp_dict_list = parse_input(args.input) for sp_dict in sp_dict_list: al = Autoload(parameters_dictionary=sp_dict, args=args) al.main_dir = os.path.abspath(args.dir) if args.load_data: """ Full workflow TODO: change later (docker side / load data side / galaxy side) """ # al.generate_dir_tree() # logging.info("Successfully generated the directory tree for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex) # # # al.get_source_data_files_from_path() # logging.info("Successfully retrieved source data files for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex) # # al.deploy_stack() # logging.info("Successfully deployed containers stack for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex) # al.connect_to_instance() logging.info("Connected to instance") # # al.create_species_history() # logging.info("Created a history") # # al.setup_data_libraries() # logging.info("Setting up data libraries") # al.init_instance() # logging.info("Successfully initialized instance for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex) # al.setup_data_libraries() # logging.info("Successfully set up data libraries in galaxy for " + al.genus[0].upper() + ". " + al.species + " " + al.strain + " " + al.sex) # if args.init_instance: # logging.info(" Initializing the galaxy instance") # al.init_instance() # al.get_instance_attributes() # # metadata[genus_species_strain_sex]["initialized"] = True # if args.load_data: # logging.info("Loading data into galaxy") # # al.load_data() # # metadata[genus_species_strain_sex]["data_loaded_in_instance"] = True # if args.run_main: # logging.info("Running main workflow") # al.get_organism_and_analyses_ids() # workflow_parameters = dict() # workflow_parameters["0"] = {} # workflow_parameters["1"] = {} # workflow_parameters["2"] = {} # workflow_parameters["3"] = {} # workflow_parameters["4"] = {"organism": al.org_id, # "analysis_id": al.genome_analysis_id, # "do_update": "true"} # workflow_parameters["5"] = {"organism": al.org_id, # "analysis_id": al.ogs_analysis_id} # workflow_parameters["6"] = {"organism_id": al.org_id} # workflow_parameters["7"] = {"analysis_id": al.ogs_analysis_id} # workflow_parameters["8"] = {"analysis_id": al.genome_analysis_id} # workflow_parameters["9"] = {"organism_id": al.org_id} # workflow_parameters["10"] = {} # workflow_parameters["11"] = {} # # al.datamap = dict() # al.datamap["0"] = {"src": "hda", "id": al.datasets["genome_file"]} # al.datamap["1"] = {"src": "hda", "id": al.datasets["gff_file"]} # al.datamap["2"] = {"src": "hda", "id": al.datasets["proteins_file"]} # al.datamap["3"] = {"src": "hda", "id": al.datasets["transcripts_file"]} # # al.run_workflow(workflow_name="main", workflow_parameters=workflow_parameters, datamap=al.datamap) # # metadata[genus_species_strain_sex]["workflows_run"] = metadata[genus_species_strain_sex]["workflows_run"].append("main") # # if args.link_source: # print('DEV') # al.generate_dir_tree() # print(al.main_dir) # print(al.species_dir) logging.info("Exit") def main(species_data): """ "Main" function :return: """ print("OK")