Skip to content
Snippets Groups Projects
utilities.py 5.62 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-

import yaml
import logging
import sys
import os
import subprocess


def parse_config(config_file):
    """
    Parse a config file containing users and password used by the different services (tripal, galaxy, chado, ...)

    The file is expected to be a YAML mapping of sections, each section being itself a mapping of
    key: value pairs. All sections are flattened into one dictionary: section names are dropped and, when
    the same key appears in several sections, the last one read wins.

    An empty file (or empty sections) yields no variables instead of crashing.
    If the file is missing, unreadable or not valid YAML, a critical message is logged and the program
    exits with a non-zero status.

    :param config_file: path to the YAML config file
    :return: flat dictionary of the key: value pairs found in every section
    """

    config_variables = {}
    logging.debug("Using config: %s" % os.path.abspath(config_file))
    try:
        with open(config_file, 'r') as stream:
            try:
                yaml_dict = yaml.safe_load(stream)
            except yaml.YAMLError as err:
                logging.critical("Config file is not in YAML format (%s)" % config_file)
                sys.exit(err)
    except FileNotFoundError:
        logging.critical("The config file specified doesn't exist (%s)" % config_file)
        sys.exit(1)  # A bare sys.exit() would report success (status 0) to the calling shell
    except OSError:
        logging.critical("The config file specified cannot be read (%s)" % config_file)
        sys.exit(1)

    # safe_load returns None for an empty document, and an empty section is loaded as None too
    for section in (yaml_dict or {}).values():
        if isinstance(section, dict):
            # Add the key:value pairs to the variables used for replacement in the compose template file
            config_variables.update(section)

    return config_variables


def parse_input(input_file):
    """
    Parse the yml input file to extract data to create the SpeciesData objects
    Return a list of dictionaries. Each dictionary contains data tied to a species

    An empty input file yields an empty list.
    If the file is missing, unreadable or not valid YAML, a critical message is logged and the program
    exits with a non-zero status.

    :param input_file: path to the YAML input file
    :return: the content loaded from the file (expected to be a list of species dictionaries)
    """

    try:
        with open(input_file, 'r') as stream:
            try:
                sp_dict_list = yaml.safe_load(stream)
            except yaml.YAMLError as err:
                logging.critical("Input file is not in YAML format")
                sys.exit(err)
    except FileNotFoundError:
        logging.critical("The specified input file doesn't exist (%s)" % input_file)
        sys.exit(1)  # A bare sys.exit() would report success (status 0) to the calling shell
    except OSError:
        logging.critical("The specified input file cannot be read (%s)" % input_file)
        sys.exit(1)

    # safe_load returns None for an empty document: keep the "list" contract for callers that iterate
    if sp_dict_list is None:
        return []

    return sp_dict_list


def filter_empty_not_empty_items(li):
    """
    Split the items of an iterable into two groups, keeping their original order.

    An item is considered empty when it is None or equal to the empty string; anything else
    (including 0, False or an empty list) is considered non empty.

    :param li: iterable of items to sort out
    :return: dict with the keys "empty" and "not_empty", each mapped to the list of matching items
    """
    empty_items, filled_items = [], []
    for item in li:
        target = empty_items if (item is None or item == "") else filled_items
        target.append(item)
    return {"empty": empty_items, "not_empty": filled_items}


def check_galaxy_state(genus_lowercase, species, script_dir):
    """
    Check whether the galaxy service of the current species is "ready"

    Runs "<script_dir>/serexec <genus_lowercase>_<species>_galaxy supervisorctl status galaxy:" and looks
    in its standard output for the galaxy_web, handler0 and handler1 processes reported as RUNNING.

    :param genus_lowercase: genus part of the galaxy container name
    :param species: species part of the galaxy container name
    :param script_dir: directory containing the serexec script
    :return: 1 if the three processes are reported as RUNNING, 0 otherwise
    """

    serexec_path = "%s/serexec" % script_dir

    # serexec must be executable: try to fix its permissions, only warn when we are not allowed to
    try:
        os.chmod(serexec_path, 0o0755)
    except PermissionError:
        logging.warning("serexec permissions incorrect in %s" % script_dir)

    # Run supervisorctl status in the galaxy container via serexec
    container_name = "{0}_{1}_galaxy".format(genus_lowercase, species)
    command = [serexec_path, container_name, "supervisorctl", "status", "galaxy:"]
    galaxy_logs = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # stdout is bytes: its str() representation keeps the printable text searched below
    status_output = str(galaxy_logs.stdout)
    expected_lines = (
        "galaxy:galaxy_web                RUNNING",
        "galaxy:handler0                  RUNNING",
        "galaxy:handler1                  RUNNING",
    )
    if all(expected in status_output for expected in expected_lines):
        return 1
    return 0


def get_species_history_id(instance, full_name):
    """
    Find the history of the current species in its galaxy instance

    The histories named like the species are requested from the instance and the first one returned is used
    (indexing fails if no history matches - presumably an IndexError, to be confirmed against the client).

    :param instance: galaxy instance object exposing histories.get_histories() and histories.show_history()
    :param full_name: name of the history to look for (converted to a string)
    :return: list of 2 items: the history id and the result of show_history() for that id
    """

    history_client = instance.histories
    first_match = history_client.get_histories(name=str(full_name))[0]
    species_history_id = first_match["id"]

    return [species_history_id, history_client.show_history(history_id=species_history_id)]

def get_gspecies_string_from_sp_dict(sp_dict):
    """
    Build the "genus_species" string of a species dictionary (genus in lowercase, species left as is)

    The "genus" and "species" keys are searched in every section (nested dictionary) of sp_dict. The result
    does not depend on the order of the keys in the input. Sections that are not dictionaries (e.g. an
    empty section loaded as None) and keys without value are ignored.

    :param sp_dict: dictionary of sections describing one species
    :return: "<genus in lowercase>_<species>", the "_<species>" part being omitted when no species is found
             and the string being empty when neither a genus nor a species is found
    """

    genus = ""
    species = None
    for section in sp_dict.values():
        if not isinstance(section, dict):
            continue
        if section.get("genus") is not None:
            genus = str(section["genus"])
        if section.get("species") is not None:
            species = str(section["species"])

    # Assemble once both values are known, so that the key order of the input cannot alter the result
    gspecies = genus.lower()
    if species is not None:
        gspecies += "_" + species
    return gspecies

def get_unique_species_str_list(sp_dict_list):
    """
    List each species (i.e genus species) of the input dictionaries used by gga scripts only once
    The returned species (directories) are those for which to interact only once (i.e deploying a stack or
    loading the library), to reduce the number of deployments/loading for a single species

    :param sp_dict_list: list of species dictionaries
    :return: list of the distinct, non empty genus_species strings, in order of first appearance
    """

    distinct_names = []
    all_names = (get_gspecies_string_from_sp_dict(sp) for sp in sp_dict_list)

    for name in all_names:
        # Skip species without usable name and those already listed
        if name == "" or name in distinct_names:
            continue
        distinct_names.append(name)

    return distinct_names


def get_unique_species_dict_list(sp_dict_list):
    """
    Filter the species dictionary list to return only unique genus_species combinations
    For a given genus_species, the dictionary kept is the first one encountered in the input list, unless a
    later one is flagged as main species ("description" -> "main_species" set to true), in which case the
    flagged one replaces it (the last flagged one wins)
    Used in gga_init.py to write the docker-compose files for the input organisms

    :param sp_dict_list: list of species dictionaries
    :return: list with one species dictionary per genus_species, in order of first appearance
    """

    unique_species_dict = {}

    for sp in sp_dict_list:
        gspecies = get_gspecies_string_from_sp_dict(sp)
        # A missing "description" section or "main_species" key simply means "not the main species"
        description = sp.get("description")
        is_main_species = isinstance(description, dict) and description.get("main_species") == True  # noqa: E712
        if gspecies not in unique_species_dict or is_main_species:
            unique_species_dict[gspecies] = sp

    return list(unique_species_dict.values())