Skip to content
Snippets Groups Projects
utilities.py 9.33 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-

import yaml
import logging
import sys
import constants

def load_yaml(yaml_file):
    """
    Read a YAML file and return its parsed content

    The process is terminated (SystemExit) with a non-zero status when the file
    does not exist, cannot be read, or does not contain valid YAML.

    :param yaml_file: path to the YAML file to load
    :return: the object produced by yaml.safe_load for the file content
    """

    try:
        with open(yaml_file, 'r') as stream:
            try:
                data = yaml.safe_load(stream)
            except yaml.YAMLError as err:
                logging.critical("Input file %s is not in YAML format" % yaml_file)
                # Passing the exception prints it on stderr and exits with status 1
                sys.exit(err)
    except FileNotFoundError:
        logging.critical("Input file doesn't exist (%s)" % yaml_file)
        # Explicit non-zero status: a bare sys.exit() would report success
        sys.exit(1)
    except OSError:
        # Covers the remaining open/read failures (permissions, directory, ...)
        logging.critical("Input file cannot be read (%s)" % yaml_file)
        sys.exit(1)

    return data

def parse_config(config_file):
    """
    Parse a config file containing users and password used by the different services (tripal, galaxy, chado, ...)

    The process is terminated (SystemExit) with a non-zero status when the file
    content is not a dictionary, or when one of constants.REQUIRED_PARAMETERS is
    missing from it or has an empty/falsy value.

    :param config_file: path to the YAML config file
    :return: the config dictionary loaded from the file
    """

    config_dict = load_yaml(config_file)
    if not isinstance(config_dict, dict):
        logging.critical("Config yaml file is not a dictionary (%s)" % config_file)
        sys.exit(1)

    for required_parameter in constants.REQUIRED_PARAMETERS:
        # .get() so that an absent key is reported like an empty one instead of
        # escaping as a raw KeyError
        if not config_dict.get(required_parameter):
            logging.critical("{0} parameter improperly configured in config file {1}".format(required_parameter, config_file))
            sys.exit(1)

    return config_dict

def parse_input(input_file):
    """
    Parse the yml input file to extract data to create the SpeciesData objects
    Return a list of dictionaries. Each dictionary contains data tied to a species

    The process is terminated (SystemExit) with a non-zero status when the file
    content is not a list.

    :param input_file: path to the YAML input file describing the organisms
    :return: the list loaded from the file
    """

    sp_dict_list = load_yaml(input_file)
    if isinstance(sp_dict_list, list):
        return sp_dict_list

    # The message needs a placeholder: "%"-formatting a string without one
    # raises TypeError instead of logging the error
    logging.critical("Input organisms yaml file is not a list (%s)" % input_file)
    sys.exit(1)

def filter_empty_not_empty_items(li):
    """
    Split the items of a list in two groups: the empty ones (None or "") and all the others.
    The relative order of the items is kept inside each group

    :param li: iterable of items to sort out
    :return: dict with the keys "empty" and "not_empty", each mapped to a list of items
    """
    blanks = []
    filled = []
    for item in li:
        # Only None and the empty string count as empty (0 or [] do not)
        target = blanks if (item is None or item == "") else filled
        target.append(item)
    return {"empty": blanks, "not_empty": filled}

def no_empty_items(li):
    """
    Tell whether a list is free of empty items (None or "")

    :param li: iterable of items to check
    :return: True when no item is None or "" (also for an empty list), False otherwise
    """

    return not any(item is None or item == "" for item in li)

def get_gspecies_string_from_sp_dict(sp_dict):
    """
    Build the lowercase "genus_species" string of a species dictionary

    :param sp_dict: species dictionary holding the genus and species in its description section
    :return: genus and species, both lowercased, joined by an underscore
    """

    description = sp_dict[constants.ORG_PARAM_DESC]
    genus = description[constants.ORG_PARAM_DESC_GENUS]
    species = description[constants.ORG_PARAM_DESC_SPECIES]
    return "_".join((genus.lower(), species.lower()))

def get_unique_species_str_list(sp_dict_list):
    """
    List every genus_species string found in the input dictionaries exactly once,
    in order of first appearance.
    Gives the species (directories) that only need to be handled once (i.e deploying a stack
    or loading the library), so that a species is not deployed/loaded several times

    :param sp_dict_list: list of species dictionaries
    :return: list of distinct, non empty genus_species strings
    """

    gspecies_names = (get_gspecies_string_from_sp_dict(entry) for entry in sp_dict_list)
    # dict.fromkeys drops repeats while keeping the first-seen order
    return list(dict.fromkeys(name for name in gspecies_names if name != ""))

def get_unique_species_dict_list(sp_dict_list):
    """
    Reduce the list of species dictionaries to one dictionary per genus_species combination.
    The first dictionary met for a genus_species is kept, unless a later one is flagged as
    main species, in which case the flagged one replaces it.
    The output follows the order in which each genus_species first appears.
    Used in gga_init.py to write the docker-compose files for the input organisms

    :param sp_dict_list: list of species dictionaries
    :return: list with one species dictionary per genus_species
    """

    selected_by_gspecies = {}

    for entry in sp_dict_list:
        key = get_gspecies_string_from_sp_dict(entry)
        if key in selected_by_gspecies:
            description = entry[constants.ORG_PARAM_DESC]
            # "== True" is kept on purpose so that the accepted values stay the same
            flagged_as_main = (constants.ORG_PARAM_DESC_MAIN_SPECIES in description
                               and description[constants.ORG_PARAM_DESC_MAIN_SPECIES] == True)
            if not flagged_as_main:
                # Already have an entry for this species and this one is not the main one
                continue
        selected_by_gspecies[key] = entry

    return list(selected_by_gspecies.values())

def get_sp_picture(sp_dict_list):
    """
    Choose one picture path per genus_species: the picture of the main strain when it has one,
    otherwise the first picture found among the other strains (None when no strain has one)

    :param sp_dict_list: list of species dictionaries
    :return: dict mapping each genus_species string to a picture path or None
    """

    pictures = {}

    for entry in sp_dict_list:
        key = get_gspecies_string_from_sp_dict(entry)
        description = entry[constants.ORG_PARAM_DESC]

        # A missing key, None and "" all mean "no picture for this strain"
        path = description.get(constants.ORG_PARAM_DESC_PICTURE_PATH)
        usable_path = None if path in (None, "") else path

        # Nothing stored yet for this genus_species: take whatever this strain offers
        if pictures.get(key) is None:
            pictures[key] = usable_path

        # The picture of the main strain always wins over the one already stored
        is_main = (constants.ORG_PARAM_DESC_MAIN_SPECIES in description
                   and description[constants.ORG_PARAM_DESC_MAIN_SPECIES] == True)
        if is_main and usable_path is not None:
            pictures[key] = usable_path

    return pictures

def get_sp_jbrowse_links(org_list):
    """
    Get the jbrowse links from all strains for each species

    Each strain with a contig prefix contributes a "contig_prefix>species_folder_name" link;
    the links of a same species are joined with ";".
    A species for which no strain has a contig prefix is mapped to None.

    :param org_list: objects exposing genus_species, contig_prefix and species_folder_name
    :return: dict mapping each genus_species to its links string, or None
    """

    jbrowse_links_dict = {}

    for org in org_list:
        gspecies = org.genus_species

        if org.contig_prefix is None or org.contig_prefix == "":
            # Register the species without discarding links already collected
            # from its other strains
            jbrowse_links_dict.setdefault(gspecies, None)
            continue

        link = org.contig_prefix + ">" + org.species_folder_name
        current = jbrowse_links_dict.get(gspecies)
        # "current" is None both for a new species and for one only seen through
        # strains without prefix: start a new string instead of appending to None
        jbrowse_links_dict[gspecies] = link if current is None else current + ";" + link

    return jbrowse_links_dict

def create_org_param_dict_from_constants():
    """
    Build the dictionary of variables used to render the organisms.yml.j2 template.
    Each entry maps the lowercased name of a constant to the value of that constant,
    i.e. the dictionary provides the parameter keys of the organisms file, NOT their values

    :return: dict mapping "org_param_..." variable names to the matching constants
    """

    constant_names = (
        "ORG_PARAM_NAME",
        "ORG_PARAM_DESC",
        "ORG_PARAM_DESC_GENUS",
        "ORG_PARAM_DESC_SPECIES",
        "ORG_PARAM_DESC_SEX",
        "ORG_PARAM_DESC_STRAIN",
        "ORG_PARAM_DESC_COMMON_NAME",
        "ORG_PARAM_DESC_ORIGIN",
        "ORG_PARAM_DESC_PICTURE_PATH",
        "ORG_PARAM_DESC_MAIN_SPECIES",
        "ORG_PARAM_DESC_DATABASE",
        "ORG_PARAM_DATA",
        "ORG_PARAM_DATA_GENOME_PATH",
        "ORG_PARAM_DATA_GENOME_CONTIG_PREFIX",
        "ORG_PARAM_DATA_TRANSCRIPTS_PATH",
        "ORG_PARAM_DATA_PROTEINS_PATH",
        "ORG_PARAM_DATA_GFF_PATH",
        "ORG_PARAM_DATA_INTERPRO_PATH",
        "ORG_PARAM_DATA_ORTHOFINDER_PATH",
        "ORG_PARAM_DATA_BLASTP_PATH",
        "ORG_PARAM_DATA_BLASTX_PATH",
        "ORG_PARAM_DATA_GENOME_VERSION",
        "ORG_PARAM_DATA_OGS_VERSION",
        "ORG_PARAM_SERVICES",
        "ORG_PARAM_SERVICES_BLAST",
        "ORG_PARAM_SERVICES_GO",
    )

    # The template variable is always the constant name in lowercase
    return {name.lower(): getattr(constants, name) for name in constant_names}