From 4114dc008eb9ed35c22719e10acb4a9e16baba3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loraine=20Gu=C3=A9guen?= <loraine-gueguen@users.noreply.github.com> Date: Fri, 28 May 2021 15:18:15 +0200 Subject: [PATCH] run_wf (WIP) --- phaoexplorer_constants.py | 2 + run_workflow_phaeoexplorer.py | 218 ++++++++++++++++++---------------- 2 files changed, 120 insertions(+), 100 deletions(-) diff --git a/phaoexplorer_constants.py b/phaoexplorer_constants.py index 68d6c88..1cecdb1 100644 --- a/phaoexplorer_constants.py +++ b/phaoexplorer_constants.py @@ -5,7 +5,9 @@ import constants ### Workflows +WORKFLOWS_PATH = "workflows_phaeoexplorer/" WORKFLOW_LOAD_FASTA_GFF_JBROWSE = "load_fasta_gff_jbrowse" +WORKFLOW_LOAD_FASTA_GFF_JBROWSE_FILE_1_ORG = "Galaxy-Workflow-chado_load_tripal_synchronize_jbrowse_1org_v4.ga" WORKFLOW_BLAST = "blast" WORKFLOW_INTERPRO = "interpro" WORKFLOW_VALID_TYPES = [WORKFLOW_LOAD_FASTA_GFF_JBROWSE, WORKFLOW_BLAST, WORKFLOW_INTERPRO] diff --git a/run_workflow_phaeoexplorer.py b/run_workflow_phaeoexplorer.py index 1193b95..2c863e1 100755 --- a/run_workflow_phaeoexplorer.py +++ b/run_workflow_phaeoexplorer.py @@ -24,12 +24,39 @@ gga_init.py Usage: $ python3 gga_init.py -i input_example.yml --config [config file] [OPTIONS] """ -class RunWorkflowParam: +class StrainWorkflowParam: - def __init__(self, genus_species, strain_sex, attributes_dict): + def __init__(self, genus_species, strain_sex, genus, species, sex, strain, full_name, species_folder_name, org_id, + genome_analysis_id, ogs_analysis_id, blastp_analysis_id, interpro_analysis_id, hda_ids, history_id, + instance, instance_url, email, password): self.genus_species = genus_species self.strain_sex = strain_sex - self.param_dict = attributes_dict + self.genus = genus + self.species = species + self.full_name = full_name + self.species_folder_name = species_folder_name + self.sex = sex + self.strain = strain + self.org_id = org_id + self.genome_analysis_id = genome_analysis_id + self.ogs_analysis_id = ogs_analysis_id + self.blastp_analysis_id = blastp_analysis_id + self.interpro_analysis_id = interpro_analysis_id + self.hda_ids = hda_ids + self.history_id = history_id + self.instance = instance + self.instance_url = instance_url + self.email = email + self.password = password + + def check_param_for_workflow_load_fasta_gff_jbrowse(self, param): + # Look for empty parameters values, throw a critical error if a parameter value is invalid + if param is None or param == "": + logging.critical( + "Empty parameter value found for organism {0} (parameter: {1}, parameter value: {2})".format( + org_full_name, param_name, param_value)) + sys.exit() + class RunWorkflow(speciesData.SpeciesData): """ @@ -749,24 +776,26 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type): hda_ids = run_workflow_for_current_organism.import_datasets_into_history() - # Create the dictionary holding all attributes needed to connect to the galaxy instance - param = {"genus": run_workflow_for_current_organism.genus, - "species": run_workflow_for_current_organism.species, - "genus_species": run_workflow_for_current_organism.genus_species, - "full_name": run_workflow_for_current_organism.full_name, - "species_folder_name": run_workflow_for_current_organism.species_folder_name, - "sex": run_workflow_for_current_organism.sex, - "strain": run_workflow_for_current_organism.strain, - "org_id": org_id, - "genome_analysis_id": genome_analysis_id, - "ogs_analysis_id": ogs_analysis_id, - "hda_ids": hda_ids, - "history_id": history_id, - "instance": run_workflow_for_current_organism.instance, - "instance_url": run_workflow_for_current_organism.instance_url, - "email": config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], - "password": config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]} - + # Create the StrainWorkflowParam object holding all attributes needed for the workflow + sp_wf_param = StrainWorkflowParam( + genus_species=run_workflow_for_current_organism.genus_species, + strain_sex=run_workflow_for_current_organism.strain_sex, + genus=run_workflow_for_current_organism.genus, + species=species, + full_name=run_workflow_for_current_organism.full_name, + species_folder_name=run_workflow_for_current_organism.species_folder_name, + sex=run_workflow_for_current_organism.sex, + strain=run_workflow_for_current_organism.strain, + org_id=org_id, + genome_analysis_id=genome_analysis_id, + ogs_analysis_id=ogs_analysis_id, + hda_ids=hda_ids, + history_id=history_id, + instance=run_workflow_for_current_organism.instance, + instance_url=run_workflow_for_current_organism.instance_url, + email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], + password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD] + ) if workflow_type == "blast": @@ -776,22 +805,25 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type): blastp_analysis_id = ids["blastp_analysis_id"] hda_ids = run_workflow_for_current_organism.import_datasets_into_history() - # Create the dictionary holding all attributes needed to connect to the galaxy instance - param = {"genus": run_workflow_for_current_organism.genus, - "species": run_workflow_for_current_organism.species, - "genus_species": run_workflow_for_current_organism.genus_species, - "full_name": run_workflow_for_current_organism.full_name, - "species_folder_name": run_workflow_for_current_organism.species_folder_name, - "sex": run_workflow_for_current_organism.sex, - "strain": run_workflow_for_current_organism.strain, - "org_id": org_id, - "blastp_analysis_id": blastp_analysis_id, - "hda_ids": hda_ids, - "history_id": history_id, - "instance": run_workflow_for_current_organism.instance, - "instance_url": run_workflow_for_current_organism.instance_url, - "email": config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], - "password": config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]} + # Create the StrainWorkflowParam object holding all attributes needed for the workflow + sp_wf_param = StrainWorkflowParam( + genus_species=run_workflow_for_current_organism.genus_species, + strain_sex=run_workflow_for_current_organism.strain_sex, + genus=run_workflow_for_current_organism.genus, + species=species, + full_name=run_workflow_for_current_organism.full_name, + species_folder_name=run_workflow_for_current_organism.species_folder_name, + sex=run_workflow_for_current_organism.sex, + strain=run_workflow_for_current_organism.strain, + org_id=org_id, + blastp_analysis_id=genome_analysis_id, + hda_ids=hda_ids, + history_id=history_id, + instance=run_workflow_for_current_organism.instance, + instance_url=run_workflow_for_current_organism.instance_url, + email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], + password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD] + ) if workflow_type == "interpro": @@ -802,29 +834,27 @@ def get_sp_workflow_param(sp_dict, main_dir, config, workflow_type): interpro_analysis_id = ids["interpro_analysis_id"] hda_ids = run_workflow_for_current_organism.import_datasets_into_history() - # Create the dictionary holding all attributes needed to connect to the galaxy instance - param = {"genus": run_workflow_for_current_organism.genus, - "species": run_workflow_for_current_organism.species, - "genus_species": run_workflow_for_current_organism.genus_species, - "full_name": run_workflow_for_current_organism.full_name, - "species_folder_name": run_workflow_for_current_organism.species_folder_name, - "sex": run_workflow_for_current_organism.sex, - "strain": run_workflow_for_current_organism.strain, - "org_id": org_id, - "interpro_analysis_id": interpro_analysis_id, - "hda_ids": hda_ids, - "history_id": history_id, - "instance": run_workflow_for_current_organism.instance, - "instance_url": run_workflow_for_current_organism.instance_url, - "email": config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], - "password": config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD]} - - - sp_wf_param = RunWorkflowParam( - genus_species=run_workflow_for_current_organism.genus_species, - strain_sex=run_workflow_for_current_organism.strain_sex, - param_dict=param - ) + # Create the StrainWorkflowParam object holding all attributes needed for the workflow + sp_wf_param = StrainWorkflowParam( + genus_species=run_workflow_for_current_organism.genus_species, + strain_sex=run_workflow_for_current_organism.strain_sex, + genus=run_workflow_for_current_organism.genus, + species=species, + full_name=run_workflow_for_current_organism.full_name, + species_folder_name=run_workflow_for_current_organism.species_folder_name, + sex=run_workflow_for_current_organism.sex, + strain=run_workflow_for_current_organism.strain, + org_id=org_id, + interpro_analysis_id=interpro_analysis_id, + hda_ids=hda_ids, + history_id=history_id, + instance=run_workflow_for_current_organism.instance, + instance_url=run_workflow_for_current_organism.instance_url, + email=config[constants.CONF_GALAXY_DEFAULT_ADMIN_EMAIL], + password=config[constants.CONF_GALAXY_DEFAULT_ADMIN_PASSWORD] + ) + + return sp_wf_param @@ -942,65 +972,53 @@ if __name__ == "__main__": for sp_dict in sp_dict_list: # Add and retrieve all analyses/organisms for the current input species and add their IDs to the input dictionary - sp_workflow_attributes = get_sp_workflow_param( + sp_workflow_param = get_sp_workflow_param( sp_dict, main_dir=main_dir, config=config, workflow_type=phaoexplorer_constants.WORKFLOW_LOAD_FASTA_GFF_JBROWSE) - current_sp_genus_species = sp_workflow_attributes.genus_species - current_sp_strain_sex = sp_workflow_attributes.strain_sex - current_sp_strain_sex_attributes_dict = sp_workflow_attributes.param_dict + current_sp_genus_species = sp_workflow_param.genus_species + current_sp_strain_sex = sp_workflow_param.strain_sex # Add the species dictionary to the complete dictionary # This dictionary contains every organism present in the input file # Its structure is the following: # {genus species: {strain1_sex1: {variables_key: variables_values}, strain1_sex2: {variables_key: variables_values}}} if not current_sp_genus_species in all_sp_workflow_dict.keys(): - all_sp_workflow_dict[current_sp_genus_species] = {current_sp_strain_sex: current_sp_strain_sex_attributes_dict} + all_sp_workflow_dict[current_sp_genus_species] = {current_sp_strain_sex: sp_workflow_param} else: if not current_sp_strain_sex in all_sp_workflow_dict[current_sp_genus_species].keys(): - all_sp_workflow_dict[current_sp_genus_species][current_sp_strain_sex] = current_sp_strain_sex_attributes_dict + all_sp_workflow_dict[current_sp_genus_species][current_sp_strain_sex] = sp_workflow_param else: logging.error("Duplicate organism with 'genus_species' = '{0}' and 'strain_sex' = '{1}'".format(current_sp_genus_species, current_sp_strain_sex)) for species, strains in all_sp_workflow_dict.items(): - if len(list(strains.keys())) == 1: + strains_count = len(list(strains.keys())) + if strains_count == 1: logging.info("Input species %s: 1 strain detected in input dictionary" % species) + strain_sex = strains.keys()[0] + sp_workflow_param = strains[strain_sex] # Set workflow path (1 organism) - workflow_path = os.path.join(os.path.abspath(script_dir), "workflows_phaeoexplorer/Galaxy-Workflow-chado_load_tripal_synchronize_jbrowse_1org_v4.ga") - - # Instance object required variables - instance_url, email, password = None, None, None - - # Set the galaxy instance variables - for k2, v2 in strains.values(): - instance_url = v2["instance_url"] - email = v2["email"] - password = v2["password"] - - instance = galaxy.GalaxyInstance(url=instance_url, email=email, password=password) + workflow_path = os.path.join(os.path.abspath(script_dir), phaoexplorer_constants.WORKFLOWS_PATH, phaoexplorer_constants.WORKFLOW_LOAD_FASTA_GFF_JBROWSE_FILE_1_ORG) # Check if the versions of tools specified in the workflow are installed in galaxy - install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=instance) - - organisms_key_name = list(strains.keys()) - org_dict = strains[organisms_key_name[0]] + install_changesets_revisions_from_workflow(workflow_path=workflow_path, instance=sp_workflow_param.instance) - history_id = org_dict["history_id"] + history_id = sp_workflow_param.history_id # Organism 1 attributes - org_genus = org_dict["genus"] - org_species = org_dict["species"] - org_genus_species = org_dict["genus_species"] - org_species_folder_name = org_dict["species_folder_name"] - org_full_name = org_dict["full_name"] - org_strain = org_dict["sex"] - org_sex = org_dict["strain"] - org_org_id = org_dict["org_id"] - org_genome_analysis_id = org_dict["genome_analysis_id"] - org_ogs_analysis_id = org_dict["ogs_analysis_id"] + org_genus = sp_workflow_param.genus + org_species = sp_workflow_param.species + org_genus_species = sp_workflow_param.genus_species + org_species_folder_name = sp_workflow_param.species_folder_name + org_full_name = sp_workflow_param.full_name + org_strain = sp_workflow_param.strain + org_sex = sp_workflow_param.strain + org_org_id = sp_workflow_param.org_id + org_genome_analysis_id = sp_workflow_param.genome_analysis_id + org_ogs_analysis_id = sp_workflow_param.ogs_analysis_id org_genome_hda_id = org_dict["hda_ids"]["genome_hda_id"] org_transcripts_hda_id = org_dict["hda_ids"]["transcripts_hda_id"] org_proteins_hda_id = org_dict["hda_ids"]["proteins_hda_id"] @@ -1344,10 +1362,10 @@ if __name__ == "__main__": for sp_dict in sp_dict_list: # Add and retrieve all analyses/organisms for the current input species and add their IDs to the input dictionary - sp_workflow_attributes = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workfow_type="blast") + sp_workflow_param = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workfow_type="blast") - current_sp_genus_species = list(sp_workflow_attributes.keys())[0] - current_sp_genus_species_dict = list(sp_workflow_attributes.values())[0] + current_sp_genus_species = list(sp_workflow_param.keys())[0] + current_sp_genus_species_dict = list(sp_workflow_param.values())[0] current_sp_strain_sex = list(current_sp_genus_species_dict.keys())[0] current_sp_strain_sex_attributes_dict = list(current_sp_genus_species_dict.values())[0] @@ -1614,10 +1632,10 @@ if __name__ == "__main__": for sp_dict in sp_dict_list: # Add and retrieve all analyses/organisms for the current input species and add their IDs to the input dictionary - sp_workflow_attributes = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workfow_type="blast") + sp_workflow_param = get_sp_workflow_param(sp_dict, main_dir=args.main_directory, config=config, workfow_type="blast") - current_sp_genus_species = list(sp_workflow_attributes.keys())[0] - current_sp_genus_species_dict = list(sp_workflow_attributes.values())[0] + current_sp_genus_species = list(sp_workflow_param.keys())[0] + current_sp_genus_species_dict = list(sp_workflow_param.values())[0] current_sp_strain_sex = list(current_sp_genus_species_dict.keys())[0] current_sp_strain_sex_attributes_dict = list(current_sp_genus_species_dict.values())[0] -- GitLab