import copy
import glob
import logging
import os
import pdb
import shutil
import sys
from datetime import datetime
import esm_rcfile
import six
import tqdm
import esm_plugin_manager
[docs]class jobclass:
relevant_files = []
filetype_specific_dict = {}
def __init__(self, job_type, recipe_steps=None):
self.recipefile = esm_rcfile.FUNCTION_PATH + "/esm_software/esm_runscripts/esm_runscripts.yaml"
self.pluginsfile = esm_rcfile.FUNCTION_PATH + "/esm_software/esm_runscripts/esm_plugins.yaml"
self.framework_recipe = esm_plugin_manager.read_recipe(self.recipefile, {"job_type": job_type})
if recipe_steps:
self.framework_recipe["recipe"] = recipe_steps
self.framework_plugins = esm_plugin_manager.read_plugin_information(self.pluginsfile, self.framework_recipe)
esm_plugin_manager.check_plugin_availability(self.framework_plugins)
[docs] def evaluate(self, config):
config = esm_plugin_manager.work_through_recipe(self.framework_recipe, self.framework_plugins, config)
return config
#########################################################################################
# general stuff #
#########################################################################################
[docs] def assemble_file_lists(self, config, filetypes):
all_files_to_copy = []
six.print_("\n" "- Generating file lists for this run...")
for model in config["general"]["valid_model_names"]:
six.print_("-" * 80)
six.print_("* %s" % config[model]["model"], "\n")
all_component_files, filetype_specific_dict = (
self.really_assemble_file_list(config, model, filetypes)
)
self.filetype_specific_dict[model] = filetype_specific_dict
all_files_to_copy += all_component_files
return all_files_to_copy
[docs] @staticmethod
def print_used_files(config):
self = config["general"]["jobclass"]
for model in self.filetype_specific_dict:
with open(
config[model]["thisrun_config_dir"]
+ "/"
+ config["general"]["expid"]
+ "_filelist_"
+ config["general"]["run_datestamp"],
"w",
) as flist:
flist.write(
"These files are used for \nexperiment %s\ncomponent %s\ndate %s"
% (
config["general"]["expid"],
config[model]["model"],
config["general"]["run_datestamp"],
)
)
flist.write("\n")
flist.write(80 * "-")
for filetype in self.filetype_specific_dict[model]:
flist.write("\n" + filetype.upper() + ":\n")
for source, exp_tree, exp_name, work_dir_name, subfolder in self.filetype_specific_dict[model][filetype]:
flist.write("\nSource: " + source)
flist.write("\nExp Tree: " + exp_tree + subfolder + exp_name)
flist.write("\nWork Dir: " + subfolder + work_dir_name)
flist.write("\n")
print ("- " + subfolder + work_dir_name +": " + source)
flist.write("\n")
flist.write(80 * "-")
[docs] def really_assemble_file_list(self, config, model, filetypes):
modelconfig = config[model]
general_config = config["general"]
#print (model)
#import esm_parser
#esm_parser.pprint_config(modelconfig)
#sys.exit(0)
all_files_to_process = []
filetype_files_for_list = {}
for filetype in filetypes:
filetype_files = []
if filetype == "restart_in" and not modelconfig["lresume"]:
six.print_("- restart files do not make sense for a cold start, skipping...")
continue
if filetype + "_sources" not in modelconfig:
continue
####### start globbing here
inverted_dict = {}
if filetype + "_files" in modelconfig:
for k, v in six.iteritems(modelconfig[filetype + "_files"]):
inverted_dict[v] = k
sources_dict = copy.deepcopy(modelconfig[filetype + "_sources"])
for file_descriptor, file_source in six.iteritems(
sources_dict
):
if "*" in file_source:
#esm_parser.pprint_config(self.config)
# restart_out* and outdata* entries in yaml files are provided without their path
# as the path generated automagically. We need to add the path here so files can
# be found with glob.glob(file_source)
if filetype == "restart_in" and not file_source.startswith("/"):
# don't use basename on restart_in as restarts can be in subfolders,
# relative to parent_restart_dir, example: oifs.yaml
file_source = modelconfig["parent_restart_dir"] + "/" + file_source
elif filetype == "restart_out" or filetype == "outdata" or filetype == 'log':
file_source = modelconfig["thisrun_work_dir"] + "/" + os.path.basename(file_source)
if glob.glob(file_source):
file_category = None
subfolder = None
if filetype + "_files" in modelconfig:
if file_descriptor in modelconfig[filetype + "_files"]:
file_category = inverted_dict[file_descriptor]
if filetype + "_in_work" in modelconfig:
if file_descriptor in modelconfig[filetype + "_in_work"]:
subfolder = modelconfig[filetype + "_in_work"][file_descriptor].replace("*", "")
if not subfolder.endswith("/"):
subfolder = subfolder + "/"
all_file_sources = glob.glob(file_source)
running_index = 0
# loop through files found with glob.glob(file_source) and add
# each of them to config dict with and index added to the file descriptor
for new_source in all_file_sources:
running_index += 1
new_descriptor = file_descriptor + "_" + str(running_index)
modelconfig[filetype + "_sources"][new_descriptor] = new_source
if file_category:
new_category = file_category + "_" + str(running_index)
modelconfig[filetype + "_files"][new_category] = new_descriptor
if subfolder:
new_in_work = subfolder + new_source.rsplit("/", 1)[-1]
modelconfig[filetype + "_in_work"][new_descriptor] = new_in_work
del modelconfig[filetype + "_sources"][file_descriptor]
if file_category:
del modelconfig[filetype + "_files"][file_category]
if subfolder:
del modelconfig[filetype + "_in_work"][file_descriptor]
else:
print("jobclass.py: globbing failed for FILE SOURCE: ",file_source)
######## end globbing stuff
filedir_intermediate = modelconfig["thisrun_" + filetype + "_dir"]
for file_descriptor, file_source in six.iteritems(
modelconfig[filetype + "_sources"]
):
if filetype == "restart_in" and not file_source.startswith("/"):
# don't use basename on restart_in as restarts can be in subfolders,
# relative to parent_restart_dir, example: oifs.yaml
file_source = modelconfig["parent_restart_dir"] + "/" + file_source
logging.debug(
"file_descriptor=%s, file_source=%s", file_descriptor, file_source
)
if filetype + "_files" in modelconfig:
if file_descriptor not in modelconfig[filetype + "_files"].values():
continue
else:
inverted_dict = {}
for k, v in six.iteritems(modelconfig[filetype + "_files"]):
inverted_dict[v] = k
file_category = inverted_dict[file_descriptor]
else:
file_category = file_descriptor
logging.debug(type(file_source))
# should be generalized to all sorts of dates on day
all_years = [general_config["current_date"].year]
if (
filetype + "_additional_information" in modelconfig
and file_category in modelconfig[filetype + "_additional_information"]
):
if (
"need_timestep_before" in modelconfig[filetype + "_additional_information"][file_category]
):
all_years.append(general_config["prev_date"].year)
if (
"need_timestep_after" in modelconfig[filetype + "_additional_information"][file_category]
):
all_years.append(general_config["next_date"].year)
if (
"need_year_before" in modelconfig[filetype + "_additional_information"][file_category]
):
all_years.append(general_config["current_date"].year - 1)
if (
"need_year_after" in modelconfig[filetype + "_additional_information"][file_category]
):
all_years.append(general_config["current_date"].year + 1 )
all_years = list(dict.fromkeys(all_years)) # removes duplicates
if (
filetype + "_in_work" in modelconfig
and file_category in modelconfig[filetype + "_in_work"].keys()
):
target_name = modelconfig[filetype + "_in_work"][file_category]
else:
target_name = os.path.basename(file_source)
for year in all_years:
this_target_name=target_name.replace("@YEAR@", str(year))
# deniz: fix the bug due to @YEAR@ substitution. This fix
# also performs the substitution on the dictionary keys.
if isinstance(file_source, dict):
file_source_new = {}
for key, value in six.iteritems(file_source):
key_new = key.replace("@YEAR@", str(year))
file_source_new[key_new] = value
del file_source
file_source = copy.deepcopy(file_source_new)
# deniz: end fix
source_name=self.find_correct_source(file_source, year)
file_target = (
filedir_intermediate + "/" + this_target_name
)
if "/" in this_target_name:
subfolder = this_target_name.rsplit("/", 1)[0] + "/"
else:
subfolder = ""
filetype_files.append(
(
source_name,
filedir_intermediate, os.path.basename(source_name),
this_target_name.rsplit("/", 1)[-1],
subfolder
)
)
filetype_files_for_list[filetype] = filetype_files
all_files_to_process += filetype_files
return all_files_to_process, filetype_files_for_list
[docs] def find_correct_source(self, file_source, year):
if isinstance(file_source, dict):
logging.debug(
"Checking which file to use for this year: %s",
year,
)
for fname, valid_years in six.iteritems(file_source):
logging.debug("Checking %s", fname)
min_year = float(valid_years.get("from", "-inf"))
max_year = float(valid_years.get("to", "inf"))
logging.debug("Valid from: %s", min_year)
logging.debug("Valid to: %s", max_year)
logging.debug(
"%s <= %s --> %s",
min_year,
year,
min_year <= year,
)
logging.debug(
"%s <= %s --> %s",
year,
max_year,
year <= max_year,
)
if (
min_year <= year
and year <= max_year
):
return fname
else:
continue
return file_source
[docs] @staticmethod
def end_it_all(config, silent=False):
if config["general"]["profile"]:
for line in timing_info:
print(line)
if not silent:
print("Exiting entire Python process!")
sys.exit()
[docs] @staticmethod
def write_to_log(config, message, message_sep=None):
"""
Puts a message into the experiment log file
Parameters
----------
message : list
A list of the message elements; which is joined by either (highest
to lowest): 1) the message_sep argument passed to the method, 2)
The user's chosen seperator, as written in
``self.config["general"]["experiment_log_file_message_sep"]``, 3)
An empty space ``" "``.
message_sep : None
The hard-coded message seperator to use; which ignores user choices.
Note
----
The user can control two things regarding the logfile format:
1) The datestamp formatting, whjich is taken from the config
section ``general.experiment_log_file_dateformat``.
2) The message seperators; taken from
``general.experiment_log_file_message_sep``. Note that if the
programmer passes a ``message_sep`` argument; this one wins over
the user choice.
"""
try:
with open(config["general"]["experiment_log_file"], "a+") as logfile:
line = jobclass.assemble_log_message(config, message, message_sep)
logfile.write(line + "\n")
except KeyError:
print("Sorry; couldn't find 'experiment_log_file' in config['general']...")
esm_parser.pprint_config(self.config["general"])
raise
[docs] @staticmethod
def assemble_log_message(config, message, message_sep=None, timestampStr_from_Unix=False):
"""Assembles message for log file. See doc for write_to_log"""
message = [str(i) for i in message]
dateTimeObj = datetime.now()
strftime_str = config["general"].get("experiment_log_file_dateformat", "%c")
if message_sep is None:
message_sep = config["general"].get("experiment_log_file_message_sep", " ")
if timestampStr_from_Unix:
timestampStr = "$(date +"+strftime_str+")"
else:
timestampStr = dateTimeObj.strftime(strftime_str)
# TODO: Do we want to be able to specify a timestamp seperator as well?
line = timestampStr + " : " + message_sep.join(message)
return line
[docs] @staticmethod
def copy_files_from_work_to_thisrun(config, target = "thisrun", source = "work"):
# idea is to unify all the copy routines by giving a parameter that tells from where to where stuff is to be copied
# source = "init", "thisrun", "work"
# target = "thisrun", "work", "experiment"
six.print_("=" * 80, "\n")
six.print_("COPYING STUFF FROM " + source.upper() + " TO " + target.upper() + " FOLDERS")
successful_files = []
missing_files = {}
# TODO: Check if we are on login node or elsewhere for the progress
# bar, it doesn't make sense on the compute nodes:
relevant_filetypes = config["general"]["all_model_filetypes"]
if target == "work" or source == "init":
relevant_filetypes = config["general"]["in_filetypes"]
else:
relevant_filetypes = config["general"]["out_filetypes"]
for filetype in relevant_filetypes:
for model in config["general"]["valid_model_names"] + ["general"]:
if filetype + "_sources" in config[model] and not filetype == "ignore":
for categ in config[model][filetype + "_sources"]:
file_source = config[model][filetype + "_sources"][categ]
if target == "thisrun":
file_target = config[model][filetype + "_intermediate"][categ]
else:
file_target = config[model][filetype + "_targets"][categ]
dest_dir = file_target.rsplit("/", 1)[0]
try:
if not os.path.isdir(dest_dir):
os.makedirs(dest_dir)
shutil.copy2(file_source, file_target)
print ("Copying " + file_source)
print (" ---> " + file_target)
successful_files.append(file_source)
except IOError:
missing_files.update({file_target: file_source})
if missing_files:
if not "files_missing_when_preparing_run" in config["general"]:
config["general"]["files_missing_when_preparing_run"] = {}
six.print_("--- WARNING: These files were missing:")
for missing_file in missing_files:
print( " - " + missing_file + ": " + missing_files[missing_file])
config["general"]["files_missing_when_preparing_run"].update(missing_files)
return config
[docs] @staticmethod
def copy_files(config, flist, source, target):
self = config["general"]["jobclass"]
# idea is to unify all the copy routines by giving a parameter that tells from where to where stuff is to be copied
# source = "init", "thisrun", "work"
# target = "thisrun", "work", "experiment"
successful_files = []
missing_files = {}
# TODO: Check if we are on login node or elsewhere for the progress
# bar, it doesn't make sense on the compute nodes:
for ftuple in tqdm.tqdm(
flist,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
):
logging.debug(ftuple)
(file_init, filedir_intermediate, filename_intermediate, filename_work, subfolder) = ftuple
if source == "init":
file_source = file_init
elif source == "thisrun":
file_source = filedir_intermediate + "/" + subfolder + filename_work
if target == "thisrun":
file_target = filedir_intermediate + "/" + subfolder + filename_work
dest_dir = filedir_intermediate + "/" + subfolder
elif target == "work":
file_target = config["general"]["thisrun_work_dir"] + "/" + subfolder + filename_work
dest_dir = config["general"]["thisrun_work_dir"] + "/" + subfolder
if not os.path.isdir(file_source):
try:
if not os.path.isdir(dest_dir):
os.mkdir(dest_dir)
shutil.copy2(file_source, file_target)
successful_files.append(file_source)
except IOError:
missing_files.update({file_target: file_source})
if missing_files:
if not "files_missing_when_preparing_run" in config["general"]:
config["general"]["files_missing_when_preparing_run"] = {}
six.print_("--- WARNING: These files were missing:")
for missing_file in missing_files:
print( " - " + missing_file + ": " + missing_files[missing_file])
config["general"]["files_missing_when_preparing_run"].update(missing_files)
return config
[docs] @staticmethod
def report_missing_files(config):
if "files_missing_when_preparing_run" in config["general"]:
if not config["general"]["files_missing_when_preparing_run"] == {}:
print ()
print ("========================================================")
print ("MISSING FILES:")
for missing_file in config["general"]["files_missing_when_preparing_run"]:
print ("-- " + missing_file +": ")
print (" --> " + config["general"]["files_missing_when_preparing_run"][missing_file] )
if not config["general"]["files_missing_when_preparing_run"] == {}:
print ("========================================================")
return config