#!/usr/bin/env python
import argparse
import logging as log
import os
import subprocess
import time
from subprocess import CalledProcessError

import yaml

from bioblend.galaxy.objects import GalaxyInstance


class DataLibLoader:

    def __init__(self):
        self.existing_folders_cache = {}
        self.bam_metadata_cache = {}

    def create_deep_folder(self, prj_lib, path, parent_folder=None, deep_name=""):
        """
        Recursively create a folder hierarchy in the library mirroring 'path',
        reusing folders already created during this run via a cache keyed on
        the accumulated path.
        """
        segments = path.split(os.sep)
        deeper_name = os.sep.join([deep_name, segments[0]])
        if deeper_name in self.existing_folders_cache:
            new_folder = self.existing_folders_cache[deeper_name]
        else:
            new_folder = prj_lib.create_folder(segments[0], base_folder=parent_folder)
            self.existing_folders_cache[deeper_name] = new_folder

        if len(segments) > 1:
            new_folder = self.create_deep_folder(prj_lib, os.sep.join(segments[1:]), new_folder, deeper_name)

        return new_folder

    def get_bam_label(self, dirname, bam_file):
        """
        Return the display label for a BAM file, read from a 'metadata.yml'
        file in the same directory (keyed on the file name without the '.bam'
        extension), or None if no label is defined. Parsed metadata is cached
        per directory.
        """
        bam_id = bam_file
        if bam_id.endswith('.bam'):
            bam_id = bam_id[:-4]

        if dirname in self.bam_metadata_cache:
            meta = self.bam_metadata_cache[dirname]
            if bam_id in meta and 'label' in meta[bam_id] and meta[bam_id]['label']:
                return meta[bam_id]['label']
            return None

        meta_file = os.path.join(dirname, 'metadata.yml')
        if os.path.exists(meta_file):
            with open(meta_file) as f:
                # An empty YAML file parses to None, so fall back to {}
                self.bam_metadata_cache[dirname] = yaml.safe_load(f) or {}
            log.info("Found metadata in %s" % meta_file)
        else:
            self.bam_metadata_cache[dirname] = {}
            log.info("Did not find metadata in %s" % meta_file)
        return self.get_bam_label(dirname, bam_file)

    def main(self, args, data_dir_root='/project_data'):
        """
        Load files into a Galaxy data library.
        """
        log.info("Importing data libraries.")

        url = "http://localhost"
        # The environment variables are set by the parent container
        admin_email = os.environ.get('GALAXY_DEFAULT_ADMIN_USER', 'admin@galaxy.org')
        admin_pass = os.environ.get('GALAXY_DEFAULT_ADMIN_PASSWORD', 'password')

        # Establish a connection to the Galaxy instance
        gio = GalaxyInstance(url=url, email=admin_email, password=admin_pass)

        log.info("Looking for project data in %s" % data_dir_root)
        folders = {}
        for root, dirs, files in os.walk(data_dir_root, followlinks=True):
            folders[root] = [os.path.join(root, filename) for filename in files]

        post_renaming = {}
        if folders:
            # Delete any pre-existing library (probably created by a previous call)
            existing = gio.libraries.get_previews(name='Project Data')
            for lib in existing:
                if not lib.deleted:
                    log.info('Pre-existing "Project Data" library %s found, removing it.' % lib.id)
                    gio.libraries.delete(lib.id)

            log.info("Creating new 'Project Data' library.")
            prj_lib = gio.libraries.create('Project Data', 'Data for current genome annotation project')

            for fname, files in folders.items():
                if fname and files:
                    # Path of the folder relative to data_dir_root
                    folder_name = fname[len(data_dir_root) + 1:]
                    log.info("Creating folder: %s" % folder_name)
                    folder = None
                    if not args.dry_run:
                        folder = self.create_deep_folder(prj_lib, folder_name)

                    for single_file in files:
                        ftype = 'auto'
                        clean_name = os.path.basename(single_file).replace('_', ' ')

                        if single_file.endswith('.bam'):
                            ftype = 'bam'
                            bam_label = self.get_bam_label(fname, os.path.basename(single_file))
                            if bam_label:
                                clean_name = bam_label
                            else:
                                clean_name = os.path.splitext(clean_name)[0]
                            if clean_name.endswith("Aligned.sortedByCoord.out"):
                                # Strip the default STAR output suffix carried by many local bam files
                                clean_name = clean_name[:-len("Aligned.sortedByCoord.out")]
                        elif single_file.endswith(('.fasta', '.fa', '.faa', '.fna')):
                            ftype = 'fasta'
                        elif single_file.endswith(('.gff', '.gff3')):
                            ftype = 'gff3'
                            clean_name = os.path.splitext(clean_name)[0]
                        elif single_file.endswith('.xml'):
                            ftype = 'xml'
                        elif single_file.endswith('.bw'):
                            ftype = 'bigwig'
                        elif single_file.endswith('.gaf'):
                            ftype = 'tabular'
                        elif single_file.endswith('_tree.txt'):
                            # We don't want to pollute the logs with 20000 useless lines
                            log.debug("Skipping useless file '%s'." % single_file)
                            continue
                        elif single_file.endswith('.tar.gz') and 'newick' in fname:
                            ftype = 'tar'
                        elif single_file.endswith(('.bai', '.tar.gz', '.tar.bz2', '.raw', '.pdf')):
                            log.info("Skipping useless file '%s'." % single_file)
                            continue

                        log.info("Adding file '%s' with type '%s' and name '%s'." % (single_file, ftype, clean_name))
                        if not args.dry_run:
                            datasets = prj_lib.upload_from_galaxy_fs(
                                single_file,
                                folder=folder,
                                link_data_only='link_to_files',
                                file_type=ftype,
                                tag_using_filenames=False
                            )
                            # Rename the dataset. This needs to happen AFTER the
                            # dataset import is finished, otherwise the new names
                            # are not kept by Galaxy (erased by metadata generation,
                            # presumably).
                            post_renaming[datasets[0]] = clean_name
                            time.sleep(1)

        if args.dry_run:
            log.info("Finished in DRY RUN mode")
            return

        # Wait for the uploads to complete
        log.info("Waiting for import jobs to finish... please wait")
        while True:
            try:
                # A "C" state means the job is completed, no need to wait for it
                ret = subprocess.check_output('squeue | grep -v "C debug" | grep -v "JOBID" || true', shell=True)
                if not ret:
                    break
                time.sleep(3)
            except CalledProcessError as inst:
                if inst.returncode == 153:
                    # The queue is empty
                    break
                raise

        time.sleep(10)

        log.info("Import finished, now renaming datasets with pretty names")
        for dataset, new_name in post_renaming.items():
            dataset.update(name=new_name)

        log.info("Finished importing data.")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Populate the Galaxy data library with files.'
    )
    parser.add_argument("-v", "--verbose",
                        help="Increase output verbosity.",
                        action="store_true")
    parser.add_argument("-d", "--dry-run",
                        help="Don't update the data library, just show what it would do.",
                        action="store_true")
    args = parser.parse_args()

    log.basicConfig(level=log.DEBUG if args.verbose else log.INFO)

    dll = DataLibLoader()
    dll.main(args)
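
# Example usage (a sketch, not part of the script itself; the filename
# "load_data_libs.py" is hypothetical, and the invocations assume the parent
# container mounts the data under /project_data and exports
# GALAXY_DEFAULT_ADMIN_USER / GALAXY_DEFAULT_ADMIN_PASSWORD as above):
#
#   # Preview what would be imported, without touching the Galaxy library:
#   python load_data_libs.py --dry-run --verbose
#
#   # Perform the actual import:
#   python load_data_libs.py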