Newer
Older

Arthur Le Bars
committed
#!/usr/bin/env python3

Arthur Le Bars
committed
import argparse
import os
import subprocess
import logging
import sys

Arthur Le Bars
committed
import fnmatch
import time
import json
import re

Arthur Le Bars
committed
import stat
import shutil

Arthur Le Bars
committed
from bioblend.galaxy.objects import GalaxyInstance
from bioblend import galaxy

Arthur Le Bars
committed
"""

Arthur Le Bars
committed
Usage: $ python3 gga_init.py -i input_example.yml [OPTIONS]

Arthur Le Bars
committed

Arthur Le Bars
committed
Do not call this script before the galaxy container is ready
"""

Arthur Le Bars
committed

Arthur Le Bars
committed
"""

Arthur Le Bars
committed
Child of SpeciesData
Contains methods and attributes to copy data into the src_data subfolders of an organism and then into the
galaxy instance's history of this given organism
Optional fasta headers reformat

Arthur Le Bars
committed
"""

Arthur Le Bars
committed
def goto_species_dir(self):
    """
    Go to the species directory (starting from the main dir)

    The working directory is first changed to ``self.main_dir`` and then to
    the ``self.genus_species`` directory joined onto it.

    :return: 1 once the working directory has been changed
    :raises SystemExit: with exit status 1 when the species directory cannot be entered
    """
    os.chdir(self.main_dir)
    species_dir = os.path.join(self.main_dir, self.genus_species) + "/"
    try:
        os.chdir(species_dir)
    except OSError:
        logging.critical("Cannot access %s" % species_dir)
        # This is a fatal error: report it to the calling shell with a failure status
        sys.exit(1)
    return 1

Arthur Le Bars
committed
def batch_modify_fasta_headers(self):
    """
    Change the fasta headers before integration, so that the indexing tool in galaxy interprets the headers

    After moving to the species directory, every directory below ./src_data whose path contains
    "annotation", ``self.species_folder_name`` and ``self.ogs_version`` is searched for an entry whose
    name contains "proteins". The last such entry found has its "^>mRNA" header prefix replaced by
    ">protein": the result is written to a temporary "outfile_proteins.fa" next to it, which then
    replaces the original file.

    A warning is logged and nothing is changed when no proteins file is found.

    :return:
    """
    temp_name = "outfile_proteins.fa"
    proteins_file = None
    proteins_outfile = None

    self.goto_species_dir()

    for dirpath, _dirnames, _filenames in os.walk(os.path.join(os.getcwd(), "src_data")):
        is_annotation_dir = ("annotation" in dirpath
                             and self.species_folder_name in dirpath
                             and self.ogs_version in dirpath)
        if not is_annotation_dir:
            continue
        for entry in os.listdir(dirpath):
            # A temporary file left behind by an interrupted run also contains "proteins" in its
            # name: never pick it as the input, it would be opened for reading and writing at once
            if "proteins" in entry and entry != temp_name:
                proteins_file = os.path.join(dirpath, entry)
                proteins_outfile = os.path.join(dirpath, temp_name)

    if proteins_file is None:
        logging.warning("Skipping proteins fasta headers formatting (FileNotFound)")
        return

    # Formatting the headers
    self.format_fasta_headers(infile=proteins_file,
                              outfile=proteins_outfile,
                              pattern="^>mRNA",
                              repl=">protein")
    if os.path.exists(proteins_outfile):
        # Same directory, so the reformatted file takes the place of the original in one step
        os.replace(proteins_outfile, proteins_file)

Arthur Le Bars
committed

Arthur Le Bars
committed
def format_fasta_headers(self, infile, outfile, pattern, repl):
    """
    Format the fasta headers of a given file, given a matching pattern and a replacement string

    Every line of ``infile`` is passed through ``re.sub(pattern, repl, line)`` and written to
    ``outfile``; lines the pattern does not match are copied unchanged.

    :param infile: path of the fasta file to read
    :param outfile: path of the file to write (created or truncated)
    :param pattern: regular expression searched in each line
    :param repl: replacement for each match of ``pattern``
    :return:
    """
    header_regex = re.compile(pattern)
    # Context managers close both files even if reading or writing fails midway;
    # iterating the handle streams the file instead of loading it whole
    with open(infile, 'r') as source, open(outfile, 'w') as target:
        for line in source:
            target.write(header_regex.sub(repl, line))
def get_source_data_files_from_path(self):
    """
    Find source data files below ``self.source_data_dir`` and copy them into the species src_data tree

    Only directories whose path contains "0" are searched, and files whose name contains
    "Contaminants" are ignored. A file is copied when its name ends with
    ``<species without its first letter>_<SEX>`` followed by one of:

    - ".fa"                 (genome assembly) -> ./src_data/genome/<species_folder_name>/v<genome_version>
    - ".gff"                (annotation)      -> ./src_data/annotation/<species_folder_name>/OGS<genome_version>
    - "_transcripts-gff.fa" (transcripts)     -> same annotation directory
    - "_proteins.fa"        (proteins)        -> same annotation directory

    FileExistsError, TypeError and NotADirectoryError raised while handling a file are logged as
    warnings and the file is skipped. The working directory is ``self.main_dir`` on return.

    TODO: implement search/tests for individual file paths

    :return:
    :raises SystemExit: with exit status 1 when ``self.species_dir`` cannot be entered
    """
    try:
        os.chdir(self.species_dir)
    except OSError:
        logging.critical("Cannot access " + self.species_dir)
        # This is a fatal error: report it to the calling shell with a failure status
        sys.exit(1)

    # NOTE(review): the OGS directory is built from genome_version, not ogs_version - confirm intended
    organism_annotation_dir = os.path.abspath(
        "./src_data/annotation/{0}/OGS{1}".format(self.species_folder_name, self.genome_version))
    organism_genome_dir = os.path.abspath(
        "./src_data/genome/{0}/v{1}".format(self.species_folder_name, self.genome_version))

    # (file name suffix, log label, destination directory), tried in this order
    copy_rules = (
        (".fa", "Genome assembly file - ", organism_genome_dir),
        (".gff", "GFF file - ", organism_annotation_dir),
        ("_transcripts-gff.fa", "Transcripts file - ", organism_annotation_dir),
        ("_proteins.fa", "Proteins file - ", organism_annotation_dir),
    )

    for dirpath, _dirnames, files in os.walk(self.source_data_dir):
        # TODO: Ensures to take the correct files (other dirs hold files with valid names),
        # this is for Phaeoexplorer only!
        if "0" not in str(dirpath):
            continue
        for f in files:
            if "Contaminants" in str(f):
                continue
            try:
                # Built inside the try so that a malformed species/sex value is reported
                # as a warning for the file instead of aborting the walk
                name_stem = "*" + self.species[1:] + "_" + self.sex.upper()
                for suffix, label, destination in copy_rules:
                    if fnmatch.fnmatch(f, name_stem + suffix):
                        logging.info(label + str(f))
                        shutil.copyfile(os.path.join(dirpath, f), os.path.join(destination, f))
                        break
            except FileExistsError as exc:
                logging.warning("Error raised (FileExistsError)")
                logging.warning(exc)
            except TypeError as exc:
                logging.warning("Error raised (TypeError)")
                logging.warning(exc)
            except NotADirectoryError as exc:
                logging.warning("Error raised (NotADirectoryError)")
                logging.warning(exc)

    os.chdir(self.main_dir)
def set_get_history(self):
    """
    Create or set the working history to the current species one

    The history named after ``self.full_name`` is looked up; when none exists it is
    created and looked up again. Its id is stored in ``self.history_id``.

    TODO move to utilities

    :return: the id of the history
    """
    history_name = str(self.full_name)
    history_client = self.instance.histories

    def first_matching_id():
        # Raises IndexError when no history carries the name
        return history_client.get_histories(name=history_name)[0]["id"]

    try:
        self.history_id = first_matching_id()
    except IndexError:
        logging.info("Creating history for %s" % self.full_name)
        history_client.create_history(name=history_name)
        self.history_id = first_matching_id()

    logging.info("History for {0}: {1}".format(self.full_name, self.history_id))
    return self.history_id

Arthur Le Bars
committed
def remove_homo_sapiens_from_db(self):
    """
    Run the GMOD tools to remove the "Homo sapiens" default organism from the original database

    The organism is first looked up with the chado "get organisms" tool in the current history;
    the first record of the downloaded JSON output provides the id handed to the chado
    "delete organisms" tool. Nothing is deleted when the lookup output holds no record.
    """
    logging.debug("Getting 'Homo sapiens' ID in instance's chado database")
    tool_client = self.instance.tools
    lookup_job = tool_client.run_tool(
        tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_get_organisms/organism_get_organisms/2.3.2",
        history_id=self.history_id,
        tool_inputs={"genus": "Homo", "species": "sapiens"})
    lookup_dataset_id = lookup_job["outputs"][0]["id"]
    lookup_json = self.instance.datasets.download_dataset(dataset_id=lookup_dataset_id)

    try:
        logging.debug("Deleting Homo 'sapiens' in the instance's chado database")
        organism_record = json.loads(lookup_json)[0]
        # The chado tool only recognizes the id when it is passed as a string
        organism_id = str(organism_record["organism_id"])
        tool_client.run_tool(
            tool_id="toolshed.g2.bx.psu.edu/repos/gga/chado_organism_delete_organisms/organism_delete_organisms/2.3.2",
            history_id=self.history_id,
            tool_inputs={"organism": organism_id})
    except (bioblend.ConnectionError, IndexError):
        logging.debug("Homo sapiens isn't in the instance's chado database")

Arthur Le Bars
committed

Arthur Le Bars
committed
def purge_histories(self):
    """
    Delete all histories in the instance

    For testing purposes

    :return: the list of histories that were submitted for deletion
    """
    # A single listing is enough: the former second get_histories(deleted=False) call
    # threw its result away and only cost an extra request
    histories = self.instance.histories.get_histories()
    for history in histories:
        self.instance.histories.delete_history(history_id=history["id"])
    return histories
def setup_library(self):
    """
    Create a "Project Data" library in galaxy, mirroring the "src_data" folder of the current organism
    directory tree

    Every non-deleted library already named "Project Data" is removed, a new one is created (its id
    is stored in ``self.library_id``) and each file found below ./src_data is uploaded into the
    library folder matching its directory. The upload datatype is chosen from the file extension;
    "_tree.txt", ".bai", ".tar.bz2", ".raw", ".pdf" files and ".tar.gz" files outside a "newick"
    directory are skipped.

    Credits go to Anthony Bretaudeau for the initial code

    :return:
    """
    fasta_suffixes = ('.fasta', '.fa', '.faa', '.fna')
    gff_suffixes = ('.gff', '.gff3')
    skipped_suffixes = ('.bai', '.tar.gz', '.tar.bz2', '.raw', '.pdf')

    self.goto_species_dir()

    gio = GalaxyInstance(url=self.instance_url,
                         email=self.config["custom_galaxy_default_admin_email"],
                         password=self.config["custom_galaxy_default_admin_password"])

    # Map each directory below ./src_data to the paths of the files it directly holds
    folders = {}
    for root, _subdirs, filenames in os.walk("./src_data", followlinks=True):
        folders[root] = [os.path.join(root, filename) for filename in filenames]

    if folders:
        # Delete pre-existing lib (probably created by a previous call)
        for lib in gio.libraries.get_previews(name='Project Data'):
            if not lib.deleted:
                logging.info('Pre-existing "Project Data" library %s found, removing it' % lib.id)
                gio.libraries.delete(lib.id)

        logging.info("Creating new 'Project Data' library")
        prj_lib = gio.libraries.create('Project Data', 'Data for current genome annotation project')
        self.library_id = prj_lib.id  # project data folder/library
        logging.info("Library for {0}: {1}".format(self.full_name, self.library_id))

        for dir_path, dir_files in folders.items():
            if not (dir_path and dir_files):
                continue

            folder_name = dir_path[len("./src_data") + 1:]
            logging.info("Creating folder: %s" % folder_name)
            folder = self.create_deep_folder(prj_lib, folder_name)

            for single_file in dir_files:
                base_name = os.path.basename(single_file)
                ftype = 'auto'
                # clean_name is only reported in the log: renaming datasets after the
                # upload is disabled (LibraryDataset objects are not subscriptable)
                clean_name = base_name.replace('_', ' ')

                if single_file.endswith('.bam'):
                    ftype = 'bam'
                    bam_label = self.get_bam_label(dir_path, base_name)
                    clean_name = bam_label if bam_label else os.path.splitext(clean_name)[0]
                    if clean_name.endswith("Aligned.sortedByCoord.out"):
                        # Drop the 25-character suffix carried by many local bam files
                        clean_name = clean_name[:-25]
                elif single_file.endswith(fasta_suffixes):
                    ftype = 'fasta'
                elif single_file.endswith(gff_suffixes):
                    ftype = 'gff3'
                    clean_name = os.path.splitext(clean_name)[0]
                elif single_file.endswith('.xml'):
                    ftype = 'xml'
                elif single_file.endswith('.bw'):
                    ftype = 'bigwig'
                elif single_file.endswith('.gaf'):
                    ftype = 'tabular'
                elif single_file.endswith('_tree.txt'):
                    # We don't want to pollute the logs with 20000 useless lines
                    logging.debug("Skipping useless file '%s'" % single_file)
                    continue
                elif single_file.endswith('.tar.gz') and 'newick' in dir_path:
                    ftype = 'tar'
                elif single_file.endswith(skipped_suffixes):
                    logging.info("Skipping useless file '%s'" % single_file)
                    continue

                logging.info("Adding file '%s' with type '%s' and name '%s'" % (single_file, ftype, clean_name))
                prj_lib.upload_from_local(
                    path=single_file,
                    folder=folder,
                    file_type=ftype
                )
                time.sleep(1)

    # NOTE(review): indentation was lost in the reviewed copy; the wait below is assumed
    # to sit at method level (it also runs when nothing was found) - confirm
    # Wait for uploads to complete
    logging.info("Waiting for import jobs to finish... please wait")

    # Polling the SLURM queue for job state used to happen here; a fixed pause is used instead
    time.sleep(10)

    # Batch renaming of the uploaded datasets is disabled (it throws a critical error at the moment)
    logging.info("Finished importing data")
def create_deep_folder(self, prj_lib, path, parent_folder=None, deep_name=""):
    """
    Create a (possibly nested) folder in a galaxy library, one path segment at a time

    Folders already present in ``self.existing_folders_cache`` are reused; new ones are
    created below the previous segment's folder and added to the cache, keyed by their
    ``os.sep``-joined path (prefixed by ``deep_name``).

    :param prj_lib: library object providing ``create_folder(name, base_folder=...)``
    :param path: folder path, segments separated by ``os.sep``
    :param parent_folder: folder under which the first segment is created
    :param deep_name: cache key of ``parent_folder``
    :return: the folder of the last path segment
    """
    current_folder = parent_folder
    cache_key = deep_name
    for segment in path.split(os.sep):
        cache_key = os.sep.join([cache_key, segment])
        if cache_key in self.existing_folders_cache:
            current_folder = self.existing_folders_cache[cache_key]
        else:
            current_folder = prj_lib.create_folder(segment, base_folder=current_folder)
            self.existing_folders_cache[cache_key] = current_folder
    return current_folder

Arthur Le Bars
committed
def setup_data_libraries(self):
    """
    Load data into the galaxy container with the galaxy_data_libs_SI.py script written by A. Bretaudeau

    DEPRECATED

    Runs ``../serexec <genus_lowercase>_<species>_galaxy`` from the species directory to execute
    /opt/galaxy_data_libs_SI.py with the container's conda python, and logs whether the command
    succeeded.

    :return:
    """
    self.goto_species_dir()

    logging.info("Loading data into the galaxy container")
    command = ["../serexec", "{0}_{1}_galaxy".format(self.genus_lowercase, self.species),
               "/tool_deps/_conda/bin/python",
               "/opt/galaxy_data_libs_SI.py"]
    try:
        # check_call raises CalledProcessError on a non-zero exit status; plain call()
        # never does, which made the failure branch unreachable
        subprocess.check_call(command)
    except subprocess.CalledProcessError:
        logging.info("Cannot load data into the galaxy container for " + self.full_name)
    else:
        logging.info("Data successfully loaded into the galaxy container for " + self.full_name)
def generate_blast_banks(self):
    """
    Placeholder, not implemented yet: the method has no body besides this docstring

    TODO: Automatically generate blast banks for a species and commit

    :return:
    """

Arthur Le Bars
committed

Arthur Le Bars
committed
def connect_to_instance(self):
    """
    Test the connection to the galaxy instance for the current organism

    A bioblend ``GalaxyInstance`` is built from ``self.instance_url`` and the admin credentials
    found in ``self.config`` and stored in ``self.instance``; listing the histories is used as
    the connection check.

    :raises SystemExit: with exit status 1 if we cannot connect to the instance
    """
    self.instance = galaxy.GalaxyInstance(url=self.instance_url,
                                          email=self.config["custom_galaxy_default_admin_email"],
                                          password=self.config["custom_galaxy_default_admin_password"]
                                          )

    logging.info("Connecting to the galaxy instance...")
    try:
        self.instance.histories.get_histories()
    except bioblend.ConnectionError:
        logging.critical("Cannot connect to galaxy instance @ " + self.instance_url)
        # A bare sys.exit() reports success (status 0); a failed connection must not
        sys.exit(1)
    else:
        logging.info("Successfully connected to galaxy instance @ " + self.instance_url)
def parse_command_line(argv=None):
    """
    Parse the command line of the script

    :param argv: list of arguments to parse, defaults to ``sys.argv[1:]`` when None
    :return: the argparse namespace (input, verbose, config, main_directory, load_history)
    """
    parser = argparse.ArgumentParser(description="Automatic data loading in containers and interaction "
                                                 "with galaxy instances for GGA"
                                                 ", following the protocol @ "
                                                 "http://gitlab.sb-roscoff.fr/abims/e-infra/gga")

    parser.add_argument("input",
                        type=str,
                        help="Input file (yml)")

    # Flags are opt-in: "store_false" made them enabled by default and disabled when given
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity",
                        action="store_true")

    parser.add_argument("--config",
                        type=str,
                        help="Config path, default to the 'config' file inside the script repository")

    parser.add_argument("--main-directory",
                        type=str,
                        help="Where the stack containers will be located, defaults to working directory")

    parser.add_argument("--load-history",
                        help="Load the found files into history, will ask for confirmation if a file with the same name is already found in an history",
                        action="store_true")

    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_command_line()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

    # Parsing the config file if provided, using the default config otherwise
    if not args.config:
        args.config = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "config")
    else:
        args.config = os.path.abspath(args.config)

    if not args.main_directory:
        args.main_directory = os.getcwd()
    else:
        args.main_directory = os.path.abspath(args.main_directory)

    sp_dict_list = utilities.parse_input(args.input)

    for sp_dict in sp_dict_list:

        # Creating an instance of load_data_for_current_species object
        load_data_for_current_species = LoadData(parameters_dictionary=sp_dict)

        # Starting
        logging.info("gga_load_data.py called for %s" % load_data_for_current_species.full_name)

        # Setting some of the instance attributes
        load_data_for_current_species.main_dir = args.main_directory
        load_data_for_current_species.species_dir = os.path.join(load_data_for_current_species.main_dir,
                                                                 load_data_for_current_species.genus_species +
                                                                 "/")

        # Parse the config yaml file (once: it is reused for the connection below)
        load_data_for_current_species.config = utilities.parse_config(args.config)

        # Set the instance url attribute, falling back to localhost when no custom host is configured
        if "custom_host" in load_data_for_current_species.config:
            galaxy_host = load_data_for_current_species.config["custom_host"]
        else:
            galaxy_host = "localhost"
        load_data_for_current_species.instance_url = "http://{0}:8888/sp/{1}_{2}/galaxy/".format(
            galaxy_host,
            load_data_for_current_species.genus_lowercase,
            load_data_for_current_species.species)

        # Change serexec permissions in repo
        try:
            os.chmod("%s/serexec" % load_data_for_current_species.script_dir, 0o0777)
        except PermissionError:
            # NOTE(review): the message announces an exit but execution continues - confirm intended
            logging.critical("Cannot access %s, exiting" % load_data_for_current_species.script_dir)

        # Check the galaxy container state and proceed if the galaxy services are up and running
        if utilities.check_galaxy_state(genus_lowercase=load_data_for_current_species.genus_lowercase,
                                        species=load_data_for_current_species.species,
                                        script_dir=load_data_for_current_species.script_dir):

            # Testing connection to the instance
            load_data_for_current_species.connect_to_instance()

            # Retrieve datasets
            logging.info("Finding and copying datasets for %s" % load_data_for_current_species.full_name)
            load_data_for_current_species.get_source_data_files_from_path()
            logging.info("Sucessfully copied datasets for %s" % load_data_for_current_species.full_name)

            # Format fasta headers (proteins)
            logging.info("Formatting fasta files headers %s " % load_data_for_current_species.full_name)
            load_data_for_current_species.batch_modify_fasta_headers()
            logging.info("Successfully formatted files headers %s " % load_data_for_current_species.full_name)

            # Load the datasets into a galaxy library
            logging.info("Setting up library for %s" % load_data_for_current_species.full_name)
            load_data_for_current_species.setup_library()
            logging.info("Successfully set up library in galaxy for %s" % load_data_for_current_species.full_name)

            # Set or get the history for the current organism
            load_data_for_current_species.set_get_history()

            # Remove H. sapiens from database if here
            # TODO: set a dedicated history for removing H. sapiens (instead of doing it into a species history)
            load_data_for_current_species.remove_homo_sapiens_from_db()

            logging.info("Data successfully loaded and imported for %s" % load_data_for_current_species.full_name)

        else:
            logging.critical("The galaxy container for %s is not ready yet!" % load_data_for_current_species.full_name)