from bioblend import galaxy
import bioblend
import argparse
import os
import subprocess
import sys
import json
import yaml
import numpy
import pandas
import logging
import re


class Autoload:
    """
    Cleaner version of gga_auto_load (to use in production).

    This class exposes the most useful parameters for interacting with GGA as attributes
    (defined in __init__), so new methods can easily be implemented by copying existing ones
    (e.g. add a new analysis, run a workflow, ...).

    To run the workflows, place them in the same directory as this script, and add the method
    + the workflow parameters in the main invocation (at the end of the file).
    """

    def __init__(self, species_parameters_dictionary: dict):
        self.species_parameters_dictionary = species_parameters_dictionary
        self.species = species_parameters_dictionary["species"]
        self.genus = species_parameters_dictionary["genus"]
        self.strain = species_parameters_dictionary["strain"]
        self.sex = species_parameters_dictionary["sex"]
        self.common = species_parameters_dictionary["common"]
        self.date = species_parameters_dictionary["date"]
        self.performed = species_parameters_dictionary["performed by"]
        self.genome_version = species_parameters_dictionary["genome version"]
        self.ogs_version = species_parameters_dictionary["ogs version"]
        self.genus_lowercase = self.genus[0].lower() + self.genus[1:]
        self.full_name = " ".join([self.genus_lowercase, self.species, self.strain, self.sex])
        self.abbreviation = " ".join([self.genus_lowercase[0], self.species, self.strain, self.sex])
        self.genus_species = self.genus_lowercase + "_" + self.species
        self.instance_url = "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/galaxy/"
        self.instance: galaxy = None
        self.history_id = None
        self.library_id = None
        self.main_dir = None
        self.species_dir = None
        self.org_id = None
        self.genome_analysis_id = None
        self.ogs_analysis_id = None
        self.tool_panel = None

        # Test the connection to the galaxy instance for the current species
        # Additionally set some class attributes
        # TODO: auth issues with nginx
        self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                              key="3b36455cb16b4d0e4348e2c42f4bb934",
                                              email="alebars@sb-roscoff.fr",
                                              password="pouet",
                                              verify=True)
        logging.info("testing connection to the galaxy instance ...")
        try:
            self.instance.histories.get_histories()
            self.tool_panel = self.instance.tools.get_tool_panel()
        except bioblend.ConnectionError:
            logging.info("cannot connect to galaxy instance @ " + self.instance_url)
            sys.exit()
        else:
            logging.info("successfully connected to galaxy instance @ " + self.instance_url)

        self.main_dir = os.getcwd() + "/"
        self.species_dir = os.path.join(self.main_dir, self.genus_species) + "/"

    def load_data_in_galaxy(self, method):
        """
        - create the src_data directory tree for the species
        - change the headers of the pep file
        - load data into the galaxy container with the galaxy_data_libs_SI.py script

        :param method:
        :return:
        """
        os.chdir(self.main_dir)
        try:
            os.mkdir(self.species_dir)
        except FileExistsError:
            logging.debug("directory " + self.species_dir + " already exists")
        try:
            os.chdir(self.species_dir)
            working_dir = os.getcwd()
        except OSError:
            logging.info("cannot access " + self.species_dir + ", run with higher privileges")
            sys.exit()

        src_data_folders = ["annotation", "genome"]
        species_folder_name = "_".join([self.genus_lowercase, self.species, self.strain, self.sex])
        try:
            os.mkdir("./src_data")
            os.mkdir("./src_data/annotation")
            os.mkdir("./src_data/genome")
            os.mkdir("./src_data/annotation/" + species_folder_name)
            os.mkdir("./src_data/genome/" + species_folder_name)
        except FileExistsError:
            logging.debug("src_data directory tree already exists")
        except PermissionError:
            logging.debug("insufficient permission to create src_data directory tree")

        # Data import into galaxy
        source_files = dict()
        annotation_dir, genome_dir = None, None
        for d in [i[0] for i in os.walk(os.getcwd() + "/src_data")]:
            if "annotation/" in d:
                annotation_dir = d
                for f in os.listdir(d):
                    if f.endswith("proteins.fasta"):
                        source_files["proteins_file"] = os.path.join(d, f)
                    elif f.endswith("transcripts-gff.fa"):
                        source_files["transcripts_file"] = os.path.join(d, f)
                    elif f.endswith(".gff"):
                        source_files["gff_file"] = os.path.join(d, f)
            elif "genome/" in d:
                genome_dir = d
                for f in os.listdir(d):
                    if f.endswith(".fa"):
                        source_files["genome_file"] = os.path.join(d, f)
        logging.debug("source files found:")
        for k, v in source_files.items():
            logging.debug("\t" + k + "\t" + v)

        # Change headers in the *proteins.fasta file from >mRNA* to >protein*
        # production version
        modify_pep_headers = ["/usr/local/genome2/mmo/scripts/phaeoexplorer/phaeoexplorer-change_pep_fasta_header.sh",
                              source_files["proteins_file"]]
        # test version
        modify_pep_headers = ["/home/alebars/gga/phaeoexplorer-change_pep_fasta_header.sh",
                              source_files["proteins_file"]]
        logging.info("changing fasta headers in " + source_files["proteins_file"])
        subprocess.run(modify_pep_headers, stdout=subprocess.PIPE, cwd=annotation_dir)

        # src_data cleaning
        if os.path.exists(annotation_dir + "/outfile"):
            subprocess.run(["mv", annotation_dir + "/outfile", source_files["proteins_file"]],
                           stdout=subprocess.PIPE, cwd=annotation_dir)
        if os.path.exists(annotation_dir + "/gmon.out"):
            subprocess.run(["rm", annotation_dir + "/gmon.out"],
                           stdout=subprocess.PIPE, cwd=annotation_dir)

        setup_data_libraries = "docker-compose exec galaxy /tool_deps/_conda/bin/python /opt/setup_data_libraries.py"
        try:
            logging.info("loading data into the galaxy container")
            subprocess.run(setup_data_libraries, stdout=subprocess.PIPE, shell=True)
        except subprocess.CalledProcessError:
            logging.info("cannot load data into container for " + self.full_name)
            pass
        else:
            logging.info("data successfully loaded into docker container for " + self.full_name)

        # gi.histories.create_history(name=str(genus_species_strain + "_" + genome_version))
        histories = self.instance.histories.get_histories(name=str(self.full_name + "_" + self.genome_version))
        self.history_id = histories[0]["id"]
        libraries = self.instance.libraries.get_libraries()  # normally only one library
        self.library_id = libraries[0]["id"]  # project data folder/library
        instance_source_data_folders = self.instance.libraries.get_folders(library_id=self.library_id)

        # folders ids: access to data to run the first tools
        folders_ids = {}
        current_fo_name = ""
        for i in instance_source_data_folders:
            for k, v in i.items():
                if k == "name":
                    folders_ids[v] = 0
                    current_fo_name = v
                if k == "id":
                    folders_ids[current_fo_name] = v

        logging.info("folders and datasets IDs: ")
        datasets = dict()
        for k, v in folders_ids.items():
            logging.info("\t" + k + ": " + v)
            if k == "/genome":
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if type(e) == dict:
                            if e["name"].endswith(".fa"):
                                datasets["genome_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
            elif k == "/annotation/" + self.genus_species:
                sub_folder_content = self.instance.folders.show_folder(folder_id=v, contents=True)
                for k2, v2 in sub_folder_content.items():
                    for e in v2:
                        if type(e) == dict:
                            # TODO: manage several files of the same type and manage versions
                            if e["name"].endswith("transcripts-gff.fa"):
                                datasets["transcripts_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("proteins.fasta"):
                                datasets["proteins_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith(".gff"):
                                datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])
                            elif e["name"].endswith("MALE"):
                                datasets["gff_file"] = e["ldda_id"]
                                logging.info("\t\t" + e["name"] + ": " + e["ldda_id"])

        self.history_id = self.instance.histories.get_current_history()["id"]
        logging.debug("history ID: " + self.history_id)
        # import all datasets into current history
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=datasets["genome_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=datasets["gff_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=datasets["transcripts_file"])
        self.instance.histories.upload_dataset_from_library(history_id=self.history_id, lib_dataset_id=datasets["proteins_file"])

    def run_workflow(self, workflow_name, workflow_parameters):
        """
        Run the named workflow on the galaxy instance, after templating the .ga file for the current species.

        :param workflow_name:
        :param workflow_parameters:
        :return:
        """
        logging.debug("running workflow: " + str(workflow_name))
        workflow_ga_file = self.main_dir + "Galaxy-Workflow-" + workflow_name + ".ga"
        if self.strain != "":
            custom_ga_file = "_".join([self.genus, self.species, self.strain]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        else:
            custom_ga_file = "_".join([self.genus, self.species]) + "_workflow.ga"
            custom_ga_file_path = os.path.abspath(custom_ga_file)
        with open(workflow_ga_file, 'r') as ga_in_file:
            ga_in = str(ga_in_file.readlines())
            ga_in = ga_in.replace('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"UNIQUE_ID\\\\\\\\\\\\"}',
                                  str('{\\\\\\\\\\\\"unique_id\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus + " " + self.species) + '\\\\\\\\\\\\"')
            ga_in = ga_in.replace('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"NAME\\\\\\\\\\\\"',
                                  str('\\\\\\\\\\\\"name\\\\\\\\\\\\": \\\\\\\\\\\\"' + self.genus.lower()[0] + self.species) + '\\\\\\\\\\\\"')
            ga_in = ga_in.replace("\\\\", "\\")  # to restore the correct amount of backslashes in the workflow string before import
            # test
            ga_in = ga_in.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
                                  "http://localhost/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            # production
            # ga_in = ga_in.replace('http://localhost/sp/genus_species/feature/Genus/species/mRNA/{id}',
            #                       "http://abims--gga.sb-roscoff.fr/sp/" + self.genus_lowercase + "_" + self.species + "/feature/" + self.genus + "/mRNA/{id}")
            ga_in = ga_in[2:-2]  # if the line below doesn't output correct json
            # ga_in = ga_in[:-2]  # if the line above doesn't output correct json

    def init_instance(self):
        """
        Galaxy instance startup in preparation for running workflows
        - remove Homo sapiens from the chado database
        - add organism and analyses into the chado database
        - get any other existing organisms IDs (mainly used for testing)

        :return:
        """
        # Delete Homo sapiens from the Chado database
        get_sapiens_id_job = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": "Homo", "species": "sapiens"})
        get_sapiens_id_job_output = get_sapiens_id_job["outputs"][0]["id"]
        get_sapiens_id_json_output = self.instance.datasets.download_dataset(dataset_id=get_sapiens_id_job_output)
        try:
            get_sapiens_id_final_output = json.loads(get_sapiens_id_json_output)[0]
            sapiens_id = str(get_sapiens_id_final_output["organism_id"])  # needs to be str to be recognized by the chado tool
            self.instance.tools.run_tool(
                tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
                history_id=self.history_id,
                tool_inputs={"organism": str(sapiens_id)})
        except bioblend.ConnectionError:
            logging.debug("homo sapiens isn't in the database")
        except IndexError:
            pass

        # Add organism (species) to chado
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_add_organism/organism_add_organism/2.3.2",
            history_id=self.history_id,
            tool_inputs={"abbr": self.abbreviation,
                         "genus": self.genus,
                         "species": self.species,
                         "common": self.common})

        # Add OGS analysis to chado
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("OGS" + self.ogs_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Add genome analysis to chado
        self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_add_analysis/analysis_add_analysis/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version,
                         "program": "Performed by Genoscope",
                         "programversion": str("genome v" + self.genome_version),
                         "sourcename": "Genoscope",
                         "date_executed": self.date})

        # Get the organism ID from chado
        org = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"genus": self.genus, "species": self.species})
        org_job_out = org["outputs"][0]["id"]
        org_json_output = self.instance.datasets.download_dataset(dataset_id=org_job_out)
        try:
            org_output = json.loads(org_json_output)[0]
            self.org_id = str(org_output["organism_id"])  # needs to be str to be recognized by chado tools
        except IndexError:
            logging.debug("no organism matching " + self.full_name + " exists in the Chado database")

        # Get the ID of the OGS analysis from chado
        ogs_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " OGS" + self.ogs_version})
        ogs_analysis_job_out = ogs_analysis["outputs"][0]["id"]
        ogs_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=ogs_analysis_job_out)
        try:
            ogs_analysis_output = json.loads(ogs_analysis_json_output)[0]
            self.ogs_analysis_id = str(ogs_analysis_output["analysis_id"])  # needs to be str to be recognized by chado tools
        except IndexError:
            logging.debug("no matching OGS analysis exists in the Chado database")
        # Get the ID of the genome analysis from chado
        genome_analysis = self.instance.tools.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_analysis_get_analyses/analysis_get_analyses/2.3.2",
            history_id=self.history_id,
            tool_inputs={"name": self.genus + " " + self.species + " genome v" + self.genome_version})
        genome_analysis_job_out = genome_analysis["outputs"][0]["id"]
        genome_analysis_json_output = self.instance.datasets.download_dataset(dataset_id=genome_analysis_job_out)
        try:
            genome_analysis_output = json.loads(genome_analysis_json_output)[0]
            self.genome_analysis_id = str(genome_analysis_output["analysis_id"])  # needs to be str to be recognized by chado tools
        except IndexError:
            logging.debug("no matching genome analysis exists in the Chado database")

        logging.info("finished initializing instance")

    def clean_instance(self):
        """
        TODO: function to purge the instance of analyses and organisms
        :return:
        """
        return None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Input genus, species, strain, version")
    parser.add_argument("json", type=str, help="Input JSON file")
    parser.add_argument("-v", "--verbose", action="store_true", help="Increase output verbosity")
    parser.add_argument("--load-data", action="store_true",
                        help="Create src_data directory tree and load data into galaxy")
    parser.add_argument("--main-workflow", action="store_true",
                        help="Run main workflow (initialize galaxy instance, load data into chado, "
                             "sync with tripal, create jbrowse and add organism to jbrowse)")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    sp_dict_list = list()
    with open(args.json, 'r') as infile:
        json_sp_dict = json.load(infile)
        json_sp_dump = json.dumps(json_sp_dict, indent=4, sort_keys=True)
        for json_sp in json_sp_dict:
            sp_dict_list.append(json_sp)

    for sp_dict in sp_dict_list:
        al = Autoload(species_parameters_dictionary=sp_dict)
        if args.main_workflow:
            workflow_parameters = dict()
            workflow_parameters["0"] = {}
            workflow_parameters["1"] = {}
            workflow_parameters["2"] = {}
            workflow_parameters["3"] = {}
            # the do_update parameter prevents assertion errors when loading the file; it should always be set to "true"
            workflow_parameters["4"] = {"organism": al.org_id, "analysis_id": al.genome_analysis_id, "do_update": "true"}
            workflow_parameters["5"] = {"organism": al.org_id, "analysis_id": al.ogs_analysis_id}
            workflow_parameters["6"] = {"organism_id": al.org_id}
            workflow_parameters["7"] = {"analysis_id": al.ogs_analysis_id}
            workflow_parameters["8"] = {"analysis_id": al.genome_analysis_id}
            workflow_parameters["9"] = {"organism_id": al.org_id}
            al.run_workflow(workflow_name="main", workflow_parameters=workflow_parameters)
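
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the pipeline itself). The JSON keys
# below are the ones read in Autoload.__init__ ("genus", "species", "strain",
# "sex", "common", "date", "performed by", "genome version", "ogs version");
# all values, the input file name and the script name are hypothetical
# placeholders.
#
#   $ python <this_script>.py species.json --main-workflow --verbose
#
#   species.json is expected to hold a list with one dictionary per species:
#   [
#       {
#           "genus": "Genus",
#           "species": "species",
#           "strain": "strain1",
#           "sex": "male",
#           "common": "common name",
#           "date": "2020-01-01",
#           "performed by": "someone",
#           "genome version": "1.0",
#           "ogs version": "1.0"
#       }
#   ]
# ---------------------------------------------------------------------------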