import os
import shutil
import subprocess
import esm_rcfile
import six
import yaml
from esm_calendar import Date
from colorama import Fore, Back, Style, init
import esm_tools
from .batch_system import batch_system
from .filelists import copy_files, log_used_files
from .helpers import end_it_all, evaluate, write_to_log
from .namelists import Namelist
from loguru import logger
#####################################################################
# compute jobs #
#####################################################################
def run_job(config):
    """
    Run the compute recipe on the given configuration.

    Sets ``config["general"]["relevant_filetypes"]`` to the file types
    listed below and then hands the configuration to ``evaluate`` together
    with the ``"compute"`` and ``"compute_recipe"`` arguments.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The configuration as returned by ``evaluate``.
    """
    relevant_filetypes = ["bin", "config", "forcing", "input", "restart_in"]
    config["general"]["relevant_filetypes"] = relevant_filetypes
    return evaluate(config, "compute", "compute_recipe")
def compile_model(config):
    """
    Compile the desired model before the run starts.

    Nothing happens unless a version can be found (under ``general`` or
    under the section named after ``general.setup_name``), the run number
    is 1, and ``general.use_compiled_model`` is set to a false value (it
    defaults to ``True``). In that case ``esm_master
    install-<model>-<version>`` is executed inside
    ``general.experiment_src_dir`` and ``general.model_dir`` is pointed to
    ``<experiment_src_dir>/<model>-<version>``.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The (possibly updated) experiment configuration.
    """
    general = config["general"]
    model = general["setup_name"]
    version = general.get("version") or config[model].get("version")
    if not version:
        return config

    if general.get("run_number") != 1:
        return config

    print("First year, checking if we need to compile...")
    if general.get("use_compiled_model", True):
        return config

    print(f"Huhu --> compiling {model}-{version}")
    source_dir = general["experiment_src_dir"]
    # NOTE(review): the command goes through the shell and its return code
    # is not inspected, so a failed compilation is not detected here.
    subprocess.run(
        f"esm_master install-{model}-{version}",
        shell=True,
        cwd=source_dir,
    )
    general["model_dir"] = general["experiment_src_dir"] + f"/{model}-{version}"
    return config
def all_files_to_copy_append(
    config, model, filetype, categ, file_source, file_interm, file_target
):
    """
    Register a file under the category ``categ`` in the file lists of ``model``.

    For each of ``file_source``, ``file_interm`` and ``file_target`` that is
    truthy, the value is stored in ``config[model]`` under
    ``<filetype>_sources``, ``<filetype>_intermediate`` or
    ``<filetype>_targets`` respectively (the dictionary is created when
    missing), keyed by ``categ``. If ``<filetype>_files`` already exists in
    ``config[model]``, ``categ`` is additionally mapped onto itself there.

    Parameters
    ----------
    config : dict
        The experiment configuration; modified in place.
    model : str
        Name of the section of ``config`` to update.
    filetype : str
        File type prefix used to build the dictionary names.
    categ : str
        Key under which the paths are stored.
    file_source, file_interm, file_target
        Values to register; falsy values are skipped.

    Returns
    -------
    dict
        The same ``config`` object.
    """
    section = config[model]
    entries = (
        ("_sources", file_source),
        ("_intermediate", file_interm),
        ("_targets", file_target),
    )
    for suffix, location in entries:
        if not location:
            continue
        key = filetype + suffix
        if key not in section:
            section[key] = {}
        section[key][categ] = location

    files_key = filetype + "_files"
    if files_key in section:
        section[files_key][categ] = categ
    return config
def add_batch_hostfile(config):
    """
    Register the batch system's host file as a ``config`` file of ``general``.

    Calls ``calc_requirements`` on ``config["general"]["batch"]`` and then
    adds ``config["general"]["batch"].bs.path`` as the source of the
    ``batchhostfile`` category via ``all_files_to_copy_append``.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The configuration returned by ``all_files_to_copy_append``.
    """
    general = config["general"]
    general["batch"].calc_requirements(config)
    # Read the path only after the requirements have been calculated.
    hostfile_path = general["batch"].bs.path
    return all_files_to_copy_append(
        config, "general", "config", "batchhostfile", hostfile_path, None, None
    )
def prepare_coupler_files(config):
    """
    Prepare the coupler configuration file and register it for copying.

    Only acts when ``config["general"]["standalone"]`` is exactly ``False``.
    The coupler object stored in ``general.coupler`` is asked to ``prepare``
    its file in ``general.coupler_config_dir``; the resulting file is then
    registered as a ``config`` file of the coupler under the category
    ``coupling.xml`` (for the ``yac`` coupler) or ``namcouple`` (otherwise).

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The experiment configuration.
    """
    general = config["general"]
    if general["standalone"] is not False:
        return config

    coupler_filename = general["coupler"].prepare(
        config, general["coupler_config_dir"]
    )
    coupler_name = general["coupler"].name
    couplingfile = "coupling.xml" if coupler_name == "yac" else "namcouple"
    prepared_path = general["coupler_config_dir"] + "/" + coupler_filename
    all_files_to_copy_append(
        config, coupler_name, "config", couplingfile, prepared_path, None, None
    )
    return config
def create_new_files(config):
    """
    Create the files requested through ``create_<filetype>`` entries.

    For every section of ``config`` and every file type in
    ``general.all_filetypes``, each file named in ``create_<filetype>`` is
    (re)written inside ``thisrun_<filetype>_dir``. Every action containing
    the ``<--append--`` marker contributes one line (the marker removed and
    surrounding whitespace stripped); other actions are ignored. The new
    file is then registered as a source via ``all_files_to_copy_append``.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The experiment configuration.
    """
    append_marker = "<--append--"
    for model in list(config):
        section = config[model]
        for filetype in config["general"]["all_filetypes"]:
            create_key = "create_" + filetype
            if create_key not in section:
                continue
            for filename in section[create_key].keys():
                new_file = section["thisrun_" + filetype + "_dir"] + "/" + filename
                with open(new_file, "w") as createfile:
                    for action in section[create_key][filename]:
                        if append_marker in action:
                            text = action.replace(append_marker, "")
                            createfile.write(text.strip() + "\n")
                # Make sure the freshly written file ends up in the file lists.
                all_files_to_copy_append(
                    config, model, filetype, filename, new_file, None, None
                )
    return config
def modify_files(config):
    """
    Placeholder recipe step for modifying files; currently does nothing.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The configuration, unchanged.
    """
    # Not implemented: only a sketch of a loop over the models and
    # ``general.all_model_filetypes`` (singling out "restart") existed here.
    return config
def modify_namelists(config):
    """
    Load, modify and finalize the namelists of every valid model.

    For each model in ``general.valid_model_names`` the model section is
    passed through ``Namelist.nmls_load``, ``Namelist.nmls_remove``,
    ``Namelist.nmls_modify`` and ``Namelist.nmls_finalize`` (in that order).
    For the model ``echam``, ``Namelist.apply_echam_disturbance`` is applied
    to the whole configuration between the remove and modify steps. When
    ``general.verbose`` is set, progress information is printed.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The experiment configuration with updated model sections.
    """
    # The built-in ``print`` is used throughout: this module is Python 3 only
    # (it uses f-strings), so the ``six.print_`` shim adds nothing.
    if config["general"]["verbose"]:
        print("\n" "- Setting up namelists for this run...")
        for model in config["general"]["valid_model_names"]:
            print("-" * 80)
            print("* %s" % config[model]["model"], "\n")

    for model in config["general"]["valid_model_names"]:
        config[model] = Namelist.nmls_load(config[model])
        config[model] = Namelist.nmls_remove(config[model])
        if model == "echam":
            # This step works on (and may replace) the whole configuration,
            # so ``config`` is re-read afterwards instead of being cached.
            config = Namelist.apply_echam_disturbance(config)
        config[model] = Namelist.nmls_modify(config[model])
        config[model] = Namelist.nmls_finalize(
            config[model], config["general"]["verbose"]
        )

    if config["general"]["verbose"]:
        print("end of namelist section")
    return config
def copy_files_to_thisrun(config):
    """
    Copy the input file types from their initial location to this run's folders.

    Logs the used files with ``log_used_files`` and then calls
    ``copy_files`` for ``general.in_filetypes`` with ``source="init"`` and
    ``target="thisrun"``. When ``general.verbose`` is set, a banner is
    printed first.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The configuration as returned by ``copy_files``.
    """
    # Built-in ``print`` replaces the ``six.print_`` shim: the module is
    # Python 3 only and already uses ``print`` elsewhere.
    if config["general"]["verbose"]:
        print("=" * 80, "\n")
        print("PREPARING EXPERIMENT")
        # Copy files:
        print("\n" "- File lists populated, proceeding with copy...")
        print("- Note that you can see your file lists in the config folder")
        print("- You will be informed about missing files")

    log_used_files(config)

    config = copy_files(
        config, config["general"]["in_filetypes"], source="init", target="thisrun"
    )
    return config
def copy_files_to_work(config):
    """
    Copy the input file types from this run's folders to the work folder.

    Calls ``copy_files`` for ``general.in_filetypes`` with
    ``source="thisrun"`` and ``target="work"``. When ``general.verbose`` is
    set, a banner is printed first.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The configuration as returned by ``copy_files``.
    """
    # Built-in ``print`` replaces the ``six.print_`` shim: the module is
    # Python 3 only and already uses ``print`` elsewhere.
    if config["general"]["verbose"]:
        print("=" * 80, "\n")
        print("PREPARING WORK FOLDER")

    config = copy_files(
        config, config["general"]["in_filetypes"], source="thisrun", target="work"
    )
    return config
def _create_folders(config, filetypes):
"""
Generates the experiment file tree. Foldres are created for every filetype
except for "ignore".
"""
for filetype in filetypes:
if not filetype == "ignore":
if not filetype == "work":
if not os.path.exists(config["experiment_" + filetype + "_dir"]):
os.makedirs(config["experiment_" + filetype + "_dir"])
if not os.path.exists(config["thisrun_" + filetype + "_dir"]):
os.makedirs(config["thisrun_" + filetype + "_dir"])
def _create_setup_folders(config):
    """
    Creates all folders for an experiment.

    This is the plugin function called during the recipe; the folders
    themselves are made by ``_create_folders`` for ``general.all_filetypes``.
    A small marker file ``.top_of_exp_tree`` is also written into
    ``general.experiment_dir`` so that the "root" of the experiment can be
    found from inside.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The experiment configuration.
    """
    general = config["general"]
    _create_folders(general, general["all_filetypes"])

    marker_path = general["experiment_dir"] + "/.top_of_exp_tree"
    with open(marker_path, "w") as top_marker:
        top_marker.write(f"Top of experiment {general['expid']}")
    return config
def _create_component_folders(config):
for component in config["general"]["valid_model_names"]:
_create_folders(config[component], config["general"]["all_model_filetypes"])
return config
def initialize_experiment_logfile(config):
    """
    Initializes the log file for the entire experiment.

    Creates a file ``${BASE_DIR}/${EXPID}/log/${EXPID}_${setup_name}.log``
    to keep track of start/stop times, job id numbers, and so on. Use the
    function ``write_to_log`` to put information in this file afterwards.

    The user can specify ``experiment_log_file`` under the ``general``
    section of the configuration to override the default name. Timestamps
    for each message are given by the section
    ``experiment_log_file_dateformat``, or defaults to ``Tue Mar 17
    09:36:38 2020``, i.e. ``"%c"``. Please use ``strftime`` compatible
    formats, as described here: https://strftime.org

    If the logger has a ``trace_sink`` attribute, its path is set to
    ``<experiment_dir>/log/<expid>_esm_runscripts_<run_datestamp>.log``.

    Parameters
    ----------
    config : dict
        The experiment configuration

    Returns
    -------
    dict
        As per convention for the plug-in system; this gives back the
        entire config.

    Attention
    ---------
    Calling this has some filesystem side effects. If the run number in
    the general configuration is set to 1, and a file exists for
    ``general.experiment_log_file``; this file is removed; and
    re-initialized.
    """
    general = config["general"]

    if general["run_number"] == 1:
        # First run: start from a clean log and write the header line.
        log_file = general["experiment_log_file"]
        if os.path.isfile(log_file):
            os.remove(log_file)
        header = "# Beginning of Experiment " + general["expid"]
        write_to_log(config, [header], message_sep="")

    submission_entry = [
        str(general[key])
        for key in ("jobtype", "run_number", "current_date", "jobid")
    ]
    submission_entry.append("- submitted")
    write_to_log(config, submission_entry)

    # Write trace-log file now that we know where to do that
    if "trace_sink" in dir(logger):
        trace_log_path = "".join(
            [
                general["experiment_dir"],
                "/log/",
                general["expid"],
                "_esm_runscripts_",
                general["run_datestamp"],
                ".log",
            ]
        )
        logger.trace_sink.def_path(trace_log_path)
    return config
def _write_finalized_config(config):
    """
    Dump the finished configuration as YAML into this run's config folder.

    The file is written to
    ``<general.thisrun_config_dir>/<expid>_finished_config.yaml``. ``Date``
    objects are represented by the string form of their ``output()`` and
    ``batch_system`` objects by their ``name``. Any ``!!python/`` tag left
    in the dump is removed together with the rest of its line.

    Parameters
    ----------
    config : dict
        The experiment configuration.

    Returns
    -------
    dict
        The experiment configuration.
    """

    def date_representer(dumper, date):
        return dumper.represent_str("%s" % date.output())

    def batch_system_representer(dumper, batch):
        return dumper.represent_str(f"{batch.name}")

    def strip_python_tags(text):
        # Keep only what precedes the first "!!python/" tag on each line.
        kept = [line.partition("!!python/")[0] for line in text.splitlines()]
        return "\n".join(kept)

    yaml.add_representer(Date, date_representer)
    yaml.add_representer(batch_system, batch_system_representer)

    general = config["general"]
    target_path = (
        general["thisrun_config_dir"] + "/" + general["expid"] + "_finished_config.yaml"
    )
    with open(target_path, "w") as config_file:
        config_file.write(strip_python_tags(yaml.dump(config)))
    return config
def color_diff(diff):
    """
    Colorize the lines of a diff.

    Lines starting with ``+`` are wrapped in green, lines starting with
    ``-`` in red and lines starting with ``^`` in blue (each followed by a
    colour reset); all other lines are passed through untouched.

    Parameters
    ----------
    diff : iterable of str
        The diff lines.

    Yields
    ------
    str
        The (possibly colorized) line.
    """
    for entry in diff:
        marker = entry[:1]
        if marker == "+":
            tint = Fore.GREEN
        elif marker == "-":
            tint = Fore.RED
        elif marker == "^":
            tint = Fore.BLUE
        else:
            yield entry
            continue
        yield tint + entry + Fore.RESET
def update_runscript(fromdir, scriptsdir, tfile, gconfig, file_type):
    """
    Updates the script ``tfile`` in the directory ``scriptsdir`` with the file in
    the directory ``fromdir``.

    If the target does not exist yet, the source is simply copied. If both
    exist and are identical, nothing happens. If they differ, the
    differences are shown and the target is overwritten when the update
    flag (``-U``) was used during the call of ``esm_runscripts``
    (``gconfig["update"]``). Without that flag the user is asked
    interactively whether the target should be updated; answering ``n``
    raises a user-friendly error recommending the ``-U`` flag, with the
    warning that the files will be overwritten.

    Parameters
    ----------
    fromdir : str
        Path of the source.
    scriptsdir : str
        Path of the target.
    tfile : str
        Name of the script to be updated.
    gconfig : dict
        Dictionary containing the general information about the compute task.
    file_type : str
        String specifying the nature of the file, only necessary for printing information
        and for the error description.

    Exceptions
    ----------
    UserError
        If the target and source are different, the ``-U`` flag is not used when calling
        ``esm_runscripts`` and the user declines the update, reports an error
        through ``esm_parser.user_error``.
    """
    source_path = fromdir + "/" + tfile
    target_path = scriptsdir + "/" + tfile

    def _install_source():
        """Echo the source path and copy the source into ``scriptsdir``."""
        print(source_path)
        shutil.copy2(source_path, scriptsdir)

    # If the target file in ``scriptsdir`` does not exist, then copy the file
    # to the target.
    if not os.path.isfile(target_path):
        _install_source()
        return

    # The target exists: compare the two scripts. Context managers make sure
    # both handles are closed again.
    with open(source_path) as source_file:
        script_o = source_file.readlines()
    with open(target_path) as target_file:
        script_t = target_file.readlines()

    # A plain comparison is enough to decide whether the files are identical.
    if script_t == script_o:
        return

    # Only needed (and therefore only imported) once there is a diff to show.
    import difflib
    import esm_parser

    differences = (
        f"{source_path} differs from "
        + f"{target_path}:\n"
        + "".join(color_diff(difflib.unified_diff(script_t, script_o)))
    )
    title = f"Original {file_type} different from target"

    # If the --update flag is used, notify that the target script will
    # be updated and do update it
    if gconfig["update"]:
        esm_parser.user_note(
            title,
            differences + "\n" + f"{target_path} will be updated!",
        )
        _install_source()
        return

    # If the --update flag is not called, show the user how to proceed and
    # ask what to do
    esm_parser.user_note(
        title,
        differences
        + "\n"
        + "Note: You can choose to use -U flag in the esm_runscripts call "
        + "to automatically update the runscript (WARNING: This "
        + f"will overwrite your {file_type} in the experiment folder!)\n",
    )
    while True:
        update_choice = input(
            f"Do you want that {target_path} is "
            + "updated with the above changes? (y/n): "
        )
        if update_choice == "y":
            _install_source()
            print(f"{target_path} updated!")
            return
        if update_choice == "n":
            esm_parser.user_error(
                title,
                differences
                + "\n"
                + "You can choose to -U flag in the esm_runscripts call "
                + "to update the runscript without asking (WARNING: This "
                + f"will overwrite your {file_type} in the experiment folder!)\n\n",
            )
            return
        print(f"'{update_choice}' is not a valid answer.")
def _copy_preliminary_files_from_experiment_to_thisrun(config):
# I don't like this one bit. DB
filelist = [
(
"scripts",
config["general"]["expid"]
+ "_"
+ config["general"]["setup_name"]
+ ".date",
"copy",
)
]
for filetype, filename, copy_or_link in filelist:
source = config["general"]["experiment_" + filetype + "_dir"]
dest = config["general"]["thisrun_" + filetype + "_dir"]
if copy_or_link == "copy":
method = shutil.copy2
elif copy_or_link == "link":
method = os.symlink
if os.path.isfile(source + "/" + filename):
method(source + "/" + filename, dest + "/" + filename)
this_script = config["general"]["scriptname"]
shutil.copy2("./" + this_script, config["general"]["thisrun_scripts_dir"])
for additional_file in config["general"]["additional_files"]:
shutil.copy2(additional_file, config["general"]["thisrun_scripts_dir"])
return config
def _show_simulation_info(config):
six.print_()
six.print_(80 * "=")
six.print_("STARTING SIMULATION JOB!")
six.print_("Experiment ID = %s" % config["general"]["expid"])
six.print_("Setup = %s" % config["general"]["setup_name"])
if "coupled_setup" in config["general"]:
six.print_("This setup consists of:")
for model in config["general"]["valid_model_names"]:
six.print_("- %s" % model)
six.print_("Experiment is installed in:")
six.print_(
" %s" % config["general"]["base_dir"] + "/" + config["general"]["expid"]
)
six.print_(80 * "=")
six.print_()
return config