-
Arthur Le Bars authored
jinja templating, more workflow changes, trying fixes for the jbrowse gmod tool using API (in progress)
dc2e8f3a
utilities.py 3.74 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-
import yaml
import logging
import sys
import os
import subprocess
def parse_config(config_file):
"""
Parse a config file containing users and password used by the different services (tripal, galaxy, chado, ...)
:param config_file:
:return:
"""
config_variables = {}
logging.debug("Using config: %s" % os.path.abspath(config_file))
try:
with open(config_file, 'r') as stream:
yaml_dict = yaml.safe_load(stream)
for k, v in yaml_dict.items():
for k2, v2 in v.items():
config_variables[k2] = v2 # Add a key:value pair to variables for replacement in the compose template file
except FileNotFoundError:
logging.critical("The config file specified doesn't exist (%s)" % config_file)
sys.exit()
except OSError:
logging.critical("The config file specified cannot be read (%s)" % config_file)
sys.exit()
return config_variables
def parse_input(input_file):
"""
Parse the yml input file to extract data to create the SpeciesData objects
Return a list of dictionaries. Each dictionary contains data tied to a species
:param input_file:
:return:
"""
parsed_sp_dict_list = []
try:
with open(input_file, 'r') as stream:
try:
yaml_dict = yaml.safe_load(stream)
for k, v in yaml_dict.items():
parsed_sp_dict_list.append(v)
except yaml.YAMLError as err:
logging.critical("Input file is not in YAML format")
sys.exit(err)
except FileNotFoundError:
logging.critical("The specified input file doesn't exist (%s)" % input_file)
sys.exit()
except OSError:
logging.critical("The specified input file cannot be read (%s)" % input_file)
sys.exit()
return parsed_sp_dict_list
def filter_empty_not_empty_items(li):
"""
Separate a list between empty items and non empty items.
Return a dict with 2 keys: empty values (items) and non empty values (items)
:param li:
:return:
"""
filtered_dict = {"empty": [], "not_empty": []}
for i in li:
if i is None or i == "":
filtered_dict["empty"].append(i)
else:
filtered_dict["not_empty"].append(i)
return filtered_dict
def check_galaxy_state(genus_lowercase, species, script_dir):
"""
Read the logs of the galaxy container for the current species to check if the service is "ready"
:param genus_lowercase:
:param species:
:return:
"""
# Run supervisorctl status in the galaxy container via serexec
galaxy_logs = subprocess.run(["%s/serexec" % script_dir, "{0}_{1}_galaxy".format(genus_lowercase, species),
"supervisorctl", "status", "galaxy:"], capture_output=True)
if "galaxy:galaxy_web RUNNING" in str(galaxy_logs.stdout) \
and "galaxy:handler0 RUNNING" in str(galaxy_logs.stdout) \
and "galaxy:handler1 RUNNING" in str(galaxy_logs.stdout):
return 1
else:
return 0
def get_species_history_id(instance, full_name):
"""
Set and return the current species history id in its galaxy instance
:param instance:
:param full_name:
:return:
"""
histories = instance.histories.get_histories(name=str(full_name))
history_id = histories[0]["id"]
show_history = instance.histories.show_history(history_id=history_id)
return [history_id, show_history]
def write_metadata(metadata_file, metadata_dict):
"""
:param metadata_file:
:param metadata_dict:
:return:
"""
ret = 0
return metadata_file, metadata_dict, ret