Source code for esm_runscripts.tidy

import filecmp
import os
import sys
import re
import time
import pathlib

import psutil
import shutil

from . import coupler, database_actions, helpers
from .filelists import copy_files, resolve_symlinks


def run_job(config):
    """
    Entry point of the tidy job: select which filetypes are copied back from
    the work directory and run the ``tidy_recipe`` via ``helpers.evaluate``.

    Parameters
    ----------
    config : dict
        The full experiment configuration; mutated in place.

    Returns
    -------
    dict
        The (possibly updated) configuration.
    """
    filetypes = ["log", "mon", "outdata", "restart_out"]
    # Deliberately excluded from tidying: bin, config, forcing, input, restart_in
    filetypes += ["ignore", "unknown"]
    config["general"]["relevant_filetypes"] = filetypes
    helpers.evaluate(config, "tidy", "tidy_recipe")
    return config
def init_monitor_file(config):
    """
    Write the tidy-job startup banner to the monitor file.

    Records that the tidy job started, which launcher PID it attaches to,
    and which jobtype triggered it.

    Parameters
    ----------
    config : dict
        Needs ``general.monitor_file`` (writable file-like object),
        ``general.launcher_pid`` and ``general.last_jobtype``.

    Returns
    -------
    dict
        The unchanged configuration.
    """
    general = config["general"]
    monitor_file = general["monitor_file"]
    monitor_file.write("tidy job initialized \n")
    monitor_file.write(
        "attaching to process " + str(general["launcher_pid"]) + " \n"
    )
    # NOTE: the missing space before "job" reproduces the original message.
    monitor_file.write("Called from a " + general["last_jobtype"] + "job \n")
    return config
def get_last_jobid(config):
    """
    Determine the job id of the compute job that preceded this tidy job.

    Parses the experiment log file for the most recent "compute ... start"
    entry; when the calling jobtype is not "compute" the id stays "UNKNOWN".
    The result is stored in ``config["general"]["last_jobid"]``.

    Parameters
    ----------
    config : dict
        Needs ``general.last_jobtype`` and ``general.experiment_log_file``.

    Returns
    -------
    dict
        The configuration with ``general.last_jobid`` set.
    """
    config["general"]["last_jobid"] = "UNKNOWN"
    if config["general"]["last_jobtype"] == "compute":
        with open(config["general"]["experiment_log_file"], "r") as logfile:
            compute_starts = [
                line for line in logfile if "compute" in line and "start" in line
            ]
        # Assumes at least one matching entry exists (as the original did).
        lastline = compute_starts[-1]
        config["general"]["last_jobid"] = lastline.split(" - ")[0].split()[-1]
    return config
def copy_stuff_back_from_work(config):
    """
    Copy all relevant filetypes from the work directory back into the
    run-specific ("thisrun") directory tree.

    Parameters
    ----------
    config : dict
        Needs ``general.relevant_filetypes`` (set by :func:`run_job`).

    Returns
    -------
    dict
        The configuration as returned by ``copy_files``.
    """
    filetypes = config["general"]["relevant_filetypes"]
    return copy_files(config, filetypes, "work", "thisrun")
def wait_and_observe(config):
    """
    Busy-wait for the attached compute job to finish, checking for error
    keywords in its log files every 10 seconds.

    Only active when ``general.submitted`` is true. After the job ends, one
    final error check is forced by pushing ``next_test_time`` far past any
    scheduled check.

    Parameters
    ----------
    config : dict
        Needs ``general.submitted``, ``general.monitor_file`` and the keys
        required by :func:`assemble_error_list` / :func:`check_for_errors`.

    Returns
    -------
    dict
        The updated configuration.
    """
    if config["general"]["submitted"]:
        monitor_file = config["general"]["monitor_file"]
        thistime = 0
        # BUGFIX(cleanup): assemble_error_list returns the config (storing the
        # list in config["general"]["error_list"]); the original bound the
        # result to an unused local named ``error_check_list``.
        config = assemble_error_list(config)
        while job_is_still_running(config):
            monitor_file.write("still running \n")
            config["general"]["next_test_time"] = thistime
            config = check_for_errors(config)
            thistime += 10
            time.sleep(10)
        # Force one last check past every scheduled next_check time.
        thistime += 100000000
        config["general"]["next_test_time"] = thistime
        config = check_for_errors(config)
    return config
def tidy_coupler(config):
    """
    Run the coupler's own tidy step for coupled (non-standalone) setups.

    Parameters
    ----------
    config : dict
        Needs ``general.standalone`` and, for coupled runs,
        ``general.coupler`` (an object providing ``tidy(config)``).

    Returns
    -------
    dict
        The unchanged configuration.
    """
    # Idiom fix: was ``== False``; standalone is a boolean flag.
    if not config["general"]["standalone"]:
        config["general"]["coupler"].tidy(config)
    return config
def wake_up_call(config):
    """
    React to the end of the monitored job: log its completion and announce
    the start of the tidy job itself in the experiment log.

    Parameters
    ----------
    config : dict
        Needs ``general.last_jobtype``, ``general.monitor_file``,
        ``general.last_jobid`` plus the usual jobtype/run/date/jobid keys.

    Returns
    -------
    dict
        The unchanged configuration.
    """
    general = config["general"]
    general["monitor_file"].write("job ended, starting to tidy up now \n")

    called_from = general["last_jobtype"]
    run_number = str(general["run_number"])
    current_date = str(general["current_date"])

    # Log completion of the job that triggered us (unless started by hand).
    if called_from != "command_line":
        helpers.write_to_log(
            config,
            [called_from, run_number, current_date, general["last_jobid"], "- done"],
        )

    # Tell the world you're cleaning up:
    helpers.write_to_log(
        config,
        [
            str(general["jobtype"]),
            run_number,
            current_date,
            str(general["jobid"]),
            "- start",
        ],
    )
    return config
def assemble_error_list(config):
    """
    Build the list of error triggers to watch for while the job runs.

    Every entry is a tuple ``(trigger, search_file, method, next_check,
    frequency, message)``. A default "error" trigger on the compute stdout
    log is always included; additional triggers come from each model's
    ``check_error`` section.

    Parameters
    ----------
    config : dict
        Needs ``general`` path/jobid keys; models may provide ``check_error``
        mappings of trigger -> str | dict(file/method/message/frequency).

    Returns
    -------
    dict
        The configuration with ``general.error_list`` set.
    """
    gconfig = config["general"]
    known_methods = ["warn", "kill"]
    stdout = (
        gconfig["thisrun_scripts_dir"]
        + "/"
        + gconfig["expid"]
        + "_compute_"
        + gconfig["run_datestamp"]
        + "_"
        + gconfig["jobid"]
        + ".log"
    )

    error_list = [
        ("error", stdout, "warn", 60, 60, "keyword error detected, watch out")
    ]

    for model in config:
        if "check_error" not in config[model]:
            continue
        for trigger, spec in config[model]["check_error"].items():
            # Defaults, possibly overridden by a dict-valued spec below.
            search_file = stdout
            method = "warn"
            frequency = 60
            message = "keyword " + trigger + " detected, watch out"
            if isinstance(spec, dict):
                if "file" in spec:
                    search_file = spec["file"]
                    if search_file in ("stdout", "stderr"):
                        search_file = stdout
                if "method" in spec:
                    method = spec["method"]
                    if method not in known_methods:
                        method = "warn"
                if "message" in spec:
                    message = spec["message"]
                if "frequency" in spec:
                    # BUGFIX(cleanup): was a bare ``except:``; only the
                    # conversion errors should fall back to the default.
                    try:
                        frequency = int(spec["frequency"])
                    except (TypeError, ValueError):
                        frequency = 60
            elif isinstance(spec, str):
                pass  # a plain string spec keeps all defaults
            else:
                continue  # anything else is ignored entirely
            error_list.append(
                (trigger, search_file, method, frequency, frequency, message)
            )

    config["general"]["error_list"] = error_list
    return config
def check_for_errors(config):
    """
    Scan the watched files for error triggers that are due for a check.

    For each due trigger: ``warn`` writes a warning to the monitor file and
    removes the trigger from the list; ``kill`` cancels the running job via
    ``scancel``, records the crash in the database and exits with code 42.
    Triggers not yet due, or due but clean, are kept (with ``next_check``
    advanced by ``frequency`` if they were checked).

    Parameters
    ----------
    config : dict
        Needs ``general.error_list``, ``general.monitor_file`` and
        ``general.next_test_time`` (and ``general.jobid`` for ``kill``).

    Returns
    -------
    dict
        The configuration with the pruned/updated ``general.error_list``.
    """
    new_list = []
    error_check_list = config["general"]["error_list"]
    monitor_file = config["general"]["monitor_file"]
    # BUGFIX(cleanup): the original named this local ``time``, shadowing the
    # imported ``time`` module.
    now = config["general"]["next_test_time"]
    for (
        trigger,
        search_file,
        method,
        next_check,
        frequency,
        message,
    ) in error_check_list:
        warned = 0
        if next_check <= now:
            if os.path.isfile(search_file):
                with open(search_file) as origin_file:
                    for line in origin_file:
                        if trigger.upper() in line.upper():
                            if method == "warn":
                                warned = 1
                                monitor_file.write("WARNING: " + message + "\n")
                                break
                            elif method == "kill":
                                harakiri = "scancel " + config["general"]["jobid"]
                                monitor_file.write("ERROR: " + message + "\n")
                                monitor_file.write("Will kill the run now..." + "\n")
                                monitor_file.flush()
                                print("ERROR: " + message)
                                print("Will kill the run now...", flush=True)
                                database_actions.database_entry_crashed(config)
                                os.system(harakiri)
                                sys.exit(42)
            next_check += frequency
        if warned == 0:
            new_list.append(
                (trigger, search_file, method, next_check, frequency, message)
            )
    config["general"]["error_list"] = new_list
    return config
def job_is_still_running(config):
    """
    Return True while the launcher process is still alive.

    Parameters
    ----------
    config : dict
        Needs ``general.launcher_pid``.

    Returns
    -------
    bool
    """
    # Simplified from an ``if ...: return True / return False`` chain.
    return psutil.pid_exists(config["general"]["launcher_pid"])
def _increment_date_and_run_number(config):
    """
    Advance the simulation state to the next run: bump the run number and
    add ``delta_date`` to the current date.

    Returns the mutated config.
    """
    config["general"]["run_number"] += 1
    config["general"]["current_date"] += config["general"]["delta_date"]
    return config


def _write_date_file(config):
    """
    Write the experiment ``.date`` file holding the current date and run
    number, and note this in the monitor file.

    Returns the unchanged config.
    """
    monitor_file = config["general"]["monitor_file"]
    date_file_path = (
        config["general"]["experiment_scripts_dir"]
        + "/"
        + config["general"]["expid"]
        + "_"
        + config["general"]["setup_name"]
        + ".date"
    )
    # BUGFIX(cleanup): the original reused the name ``date_file`` for both the
    # path string and the open file handle, shadowing the former.
    with open(date_file_path, "w") as date_file:
        date_file.write(
            config["general"]["current_date"].output()
            + " "
            + str(config["general"]["run_number"])
        )
    monitor_file.write("writing date file \n")
    return config
def clean_run_dir(config):
    """
    This plugin allows you to clean up the ``run_${DATE}`` folders.
    To do that you can use the following variables under the ``general``
    section of your runscript (documentation follows order of code as it is
    executed):

    * ``clean_runs``: **This is the most important variable for most
      users**. It can take the following values:

      * ``True``: removes the ``run_`` directory after each run
        (**overrides every other** ``clean_`` **option**).

      * ``False``: does not remove any ``run_`` directory (default) if no
        ``clean_`` variable is defined.

      * ``<int>``: giving an integer as a value results in deleting the
        ``run_`` folders except for the last <int> runs (recommended option
        as it allows for debugging of crashed simulations).

      .. Note::
         ``clean_runs: (bool)`` is incompatible with ``clean_this_rundir``
         and ``clean_runs: (int)`` is incompatible with
         ``clean_old_rundirs_except`` (an error will be raised after the
         end of the first simulation). The functionality of ``clean_runs``
         variable **alone will suffice most of the standard user
         requirements**. If finer tuning for the removal of ``run_``
         directories is required you can use the following variables
         instead of ``clean_runs``.

    * ``clean_this_rundir``: (bool) Removes the entire run directory
      (equivalent to ``clean_runs: (bool)``). ``clean_this_rundir: True``
      **overrides every other** ``clean_`` **option**.

    * ``clean_old_rundirs_except``: (int) Removes the entire run directory
      except for the last <x> runs (equivalent to ``clean_runs: (int)``).

    * ``clean_old_rundirs_keep_every``: (int) Removes the entire run
      directory except every <x>th run. Compatible with
      ``clean_old_rundirs_except`` or ``clean_runs: (int)``.

    * ``clean_<filetype>_dir``: (bool) Erases the run directory for a
      specific filetype. Compatible with all the other options.

    * ``clean_size``: (int or float) Erases all files with size greater
      than ``clean_size``, must be specified in bytes! Compatible with all
      the other options.

    Example
    -------
    To delete all the ``run_`` directories in your experiment include this
    into your runscript:

    .. code-block:: yaml

       general:
           clean_runs: True

    To keep the last 2 ``run_`` directories:

    .. code-block:: yaml

       general:
           clean_runs: 2

    To keep the last 2 runs and every 5 runs:

    .. code-block:: yaml

       general:
           clean_old_rundirs_except: 2
           clean_old_rundirs_keep_every: 5
    """
    # Each helper inspects config["general"] and handles one cleanup aspect,
    # in the documented order of precedence.
    for cleanup_step in (
        _clean_run_determine_user_choice,
        _clean_this_rundir,
        _clean_old_rundirs_except,
        _clean_old_runs_filetypes,
        _clean_old_runs_size,
    ):
        cleanup_step(config)
    return config
def _clean_run_determine_user_choice(config):
    """
    Determine user choice from a simple switch.

    The user sets::

        general:
            clean_runs: <x>

    where ``x`` can be one of:

    * ``True`` Removes the current run dir
    * ``False`` Keeps run dir
    * ``int`` (must be >= 0) keep last ``x`` run dirs
    """
    user_clean = config["general"].get("clean_runs")
    # TODO(PG): It might be nice if these sorts of checks happened earlier
    # in the job, before it even gets to this function
    if user_clean is None:
        return  # Skip the rest of the function
    # NOTE: bool must be tested before int -- bool is a subclass of int.
    if isinstance(user_clean, bool):
        if "clean_this_rundir" not in config["general"]:
            config["general"]["clean_this_rundir"] = user_clean
        else:
            print("------------------------------------------")
            print("You have set both in your config:")
            print()
            print("general:")
            print(" clean_this_rundir: ", config["general"]["clean_this_rundir"])
            print(" clean_runs: ", user_clean)
            print()
            print("Please only use one of these!")
            print("------------------------------------------")
            sys.exit(1)
    elif isinstance(user_clean, int):
        if "clean_old_rundirs_except" not in config["general"]:
            config["general"]["clean_old_rundirs_except"] = user_clean
        else:
            print("------------------------------------------")
            print("You have set both in your config:")
            print()
            print("general:")
            print(
                " clean_old_rundirs_except: ",
                config["general"]["clean_old_rundirs_except"],
            )
            print(" clean_runs: ", user_clean)
            print()
            print("Please only use one of these!")
            print("------------------------------------------")
            sys.exit(1)
    else:
        print("------------------------------------------")
        print("Type Error!")
        print("You have set this in your config:")
        print("general:")
        print(" clean_runs: ", user_clean)
        print()
        print("This is of type: ", type(user_clean))
        print("However, only the following types are valid:")
        print(" * boolean")
        print(" * integer (greater or equal to 0!)")
        print("Please correct that")
        print("------------------------------------------")
        sys.exit(1)


def _clean_this_rundir(config):
    # Remove the entire current run directory if requested.
    if config["general"].get("clean_this_rundir", False):
        rm_r(config["general"]["thisrun_dir"])


def _clean_old_rundirs_except(config):
    """
    Remove old ``run_`` folders, keeping every <keep_every>th run and/or the
    last <except> runs, depending on the user's configuration.
    """
    all_run_folders_in_experiment = RunFolders(config)

    number_rundirs_keep_every = config["general"].get("clean_old_rundirs_keep_every")
    runs_to_keep_via_keepevery = []
    if number_rundirs_keep_every:
        # BUGFIX(robustness): validate explicitly instead of with ``assert``,
        # which is stripped when Python runs with ``-O``.
        if not (
            isinstance(number_rundirs_keep_every, int)
            and number_rundirs_keep_every >= 1
        ):
            print("Please ensure that you use an integer in your configuration:")
            print("-------------------------------------------------------------")
            print()
            print("general:")
            print(" clean_old_rundirs_keep_every: <x>")
            print()
            print("-------------------------------------------------------------")
            print("<x> **MUST** be an integer greater or equal than 1!")
            sys.exit(1)
        runs_to_keep_via_keepevery = all_run_folders_in_experiment[
            ::number_rundirs_keep_every
        ]

    number_rundirs_to_keep = config["general"].get("clean_old_rundirs_except")
    runs_to_keep_via_end_select = []
    if number_rundirs_to_keep:
        if not (isinstance(number_rundirs_to_keep, int) and number_rundirs_to_keep > 1):
            print("Please ensure that you use an integer in your configuration:")
            print("-------------------------------------------------------------")
            print()
            print("general:")
            print(" clean_old_rundirs_except: <x>")
            print()
            print("-------------------------------------------------------------")
            print("<x> **MUST** be an integer greater than 1!")
            sys.exit(1)
        runs_to_keep_via_end_select = all_run_folders_in_experiment[
            -number_rundirs_to_keep:
        ]

    if number_rundirs_keep_every or number_rundirs_to_keep:
        runs_to_keep = set(runs_to_keep_via_keepevery + runs_to_keep_via_end_select)
    else:
        # No selection configured: keep everything.
        runs_to_keep = set(all_run_folders_in_experiment)
    runs_to_clean = set(all_run_folders_in_experiment) - runs_to_keep
    for run in list(runs_to_clean):
        rm_r(run)


def _clean_old_runs_filetypes(config):
    # Remove the run-specific directory of each filetype for which the user
    # set ``clean_<filetype>_dir: True``.
    all_filetypes = config["general"]["all_filetypes"]
    for filetype in all_filetypes:
        if config["general"].get("clean_" + filetype + "_dir", False):
            rm_r(config["general"]["thisrun_" + filetype + "_dir"])


def _clean_old_runs_size(config):
    # Remove every file under the run dir whose size (in bytes) is at least
    # ``general.clean_size``.
    rmsize = config["general"].get("clean_size", False)
    if not rmsize:
        return
    flist = []
    for root, _, files in os.walk(config["general"]["thisrun_dir"]):
        for file_ in files:
            path = root + "/" + file_
            if os.path.getsize(path) >= rmsize:
                flist.append(path)
    for file_ in flist:
        os.remove(file_)
def start_various_jobtypes_after_compute(config):
    """
    Launch follow-up jobs (currently only "post") directly from the compute
    job when any model enables them via ``do_<jobtype>`` or ``<jobtype>``.

    Parameters
    ----------
    config : dict
        Needs ``general.monitor_file`` and ``general.command_line_config``
        (the latter only when a jobtype is actually triggered).

    Returns
    -------
    dict
        The configuration, possibly updated by the started jobtype.
    """
    monitor_file = config["general"]["monitor_file"]
    # Jobs that should be started directly from the compute job:
    next_jobs = ["post"]  # Later also: "viz", "couple", ("analysis"...?)
    for jobtype in next_jobs:
        do_jobtype = False
        for model in config:
            # Allows for both "do_post: True" or "post: True" in config:
            if config[model].get(f"do_{jobtype}", False) or config[model].get(
                jobtype, False
            ):
                do_jobtype = True
        if do_jobtype:
            monitor_file.write(f"{jobtype} for this run:\n")
            command_line_config = config["general"]["command_line_config"]
            command_line_config["jobtype"] = jobtype
            command_line_config["original_command"] = command_line_config[
                "original_command"
            ].replace("compute", jobtype)
            monitor_file.write(f"Initializing {jobtype} object with:\n")
            monitor_file.write(str(command_line_config))
            # NOTE(PG) Non top level import to avoid circular dependency:
            from .sim_objects import SimulationSetup

            jobtype_obj = SimulationSetup(command_line_config)
            # BUGFIX: the original wrote the literal string "f{jobtype} object
            # built....\n" -- the f-prefix was missing.
            monitor_file.write(f"{jobtype} object built....\n")
            if f"{jobtype}_update_compute_config_before_resubmit" in jobtype_obj.config:
                monitor_file.write(
                    f"{jobtype} object needs to update the calling job config:\n"
                )
                # BUGFIX: the original indexed ``jobtype`` (a str) instead of
                # the SimulationSetup instance ``jobtype_obj`` -- that branch
                # always raised AttributeError.
                # FIXME(PG): This might need to be a deep update...?
                config.update(
                    jobtype_obj.config[
                        f"{jobtype}_update_compute_config_before_resubmit"
                    ]
                )
            monitor_file.write(f"Calling {jobtype} job:\n")
            jobtype_obj()
    return config
def start_post_job(config):
    """
    Start the legacy post-processing job when any model section sets a
    truthy ``post_processing`` flag.

    Parameters
    ----------
    config : dict
        Needs ``general.monitor_file`` and, when post-processing is enabled,
        ``general.command_line_config``.

    Returns
    -------
    dict
        The unchanged configuration.
    """
    monitor_file = config["general"]["monitor_file"]

    do_post = False
    for model in config:
        model_config = config[model]
        if "post_processing" in model_config and model_config["post_processing"]:
            do_post = True

    if do_post:
        monitor_file.write("Post processing for this run:\n")
        command_line_config = config["general"]["command_line_config"]
        command_line_config["jobtype"] = "post"
        command_line_config["original_command"] = command_line_config[
            "original_command"
        ].replace("compute", "post")
        monitor_file.write("Initializing post object with:\n")
        monitor_file.write(str(command_line_config))
        # NOTE(PG) Non top level import to avoid circular dependency:
        from .sim_objects import SimulationSetup

        this_post = SimulationSetup(command_line_config)
        monitor_file.write("Post object built; calling post job:\n")
        this_post()
    return config
def all_done(config):
    """
    Log successful completion of this job in the experiment log and record
    the success in the database.

    Returns the unchanged config.
    """
    general = config["general"]
    log_entry = [
        str(general["jobtype"]),
        str(general["run_number"]),
        str(general["current_date"]),
        str(general["jobid"]),
        "- done",
    ]
    helpers.write_to_log(config, log_entry)
    database_actions.database_entry_success(config)
    return config
def signal_tidy_completion(config):
    """
    Write a "- done" entry for the tidy job to the experiment log.

    Returns the unchanged config.
    """
    general = config["general"]
    log_entry = [
        str(general["jobtype"]),
        str(general["run_number"]),
        str(general["current_date"]),
        str(general["jobid"]),
        "- done",
    ]
    helpers.write_to_log(config, log_entry)
    return config
def maybe_resubmit(config):
    """
    Submit the next compute job, unless the simulation has reached its
    final date.

    Parameters
    ----------
    config : dict
        Needs ``general.monitor_file``, ``general.command_line_config``,
        ``general.next_date`` and ``general.final_date``.

    Returns
    -------
    dict
        The unchanged configuration.
    """
    monitor_file = config["general"]["monitor_file"]
    monitor_file.write("resubmitting \n")
    command_line_config = config["general"]["command_line_config"]
    command_line_config["jobtype"] = "compute"
    command_line_config["original_command"] = command_line_config[
        "original_command"
    ].replace("tidy_and_resubmit", "compute")

    # seb-wahl: end_date is by definition (search for 'end_date') smaller than
    # final_date hence we have to use next_date = current_date + increment
    if config["general"]["next_date"] >= config["general"]["final_date"]:
        monitor_file.write("Reached the end of the simulation, quitting...\n")
        helpers.write_to_log(config, ["# Experiment over"], message_sep="")
        return config

    monitor_file.write("Init for next run:\n")
    # NOTE(PG) Non top level import to avoid circular dependency:
    from .sim_objects import SimulationSetup

    next_compute = SimulationSetup(command_line_config)
    next_compute(kill_after_submit=False)
    return config
# NOTE(DB): I don't like the following part... I wish it were closer to the
# copy_files routine in filelists, but as it is really a different thing --
# moving everything found, compared to copying everything in filelists -- a
# second implementation might be OK...
def throw_away_some_infiles(config):
    """
    Delete the files in each model's run-specific ``restart_in`` directory.

    Skipped on the very first run (``run_number == 1``), where the restart
    files were provided by the user rather than produced by a previous run.

    Parameters
    ----------
    config : dict
        Needs ``general.run_number``, ``general.monitor_file`` and
        ``general.valid_model_names``; each model may provide
        ``thisrun_restart_in_dir``.

    Returns
    -------
    dict
        The unchanged configuration.
    """
    if config["general"]["run_number"] == 1:
        return config
    monitor_file = config["general"]["monitor_file"]
    monitor_file.write("throwing away restart_in files \n")
    for model in config["general"]["valid_model_names"]:
        # BUGFIX(cleanup): removed a leftover debug ``print(f"{model}")``.
        restart_in_dir = config[model].get("thisrun_restart_in_dir")
        if restart_in_dir and os.path.isdir(restart_in_dir):
            for root, _dirs, files in os.walk(restart_in_dir):
                for name in files:
                    source = os.path.join(root, name)
                    # Announce before removing (the original printed after).
                    print(f"Removing {source}")
                    os.remove(source)
    return config
def copy_all_results_to_exp(config):
    """
    Move every non-empty file from the run-specific directory tree into the
    main experiment directory, skipping the work directory.

    Handling per file:

    * regular file, destination absent: moved (hard-linked as a fallback if
      the rename fails, e.g. because the file is still open)
    * regular file, destination exists and is identical: skipped
    * regular file, destination exists and differs: the old destination is
      renamed away with the last run's datestamp, the new file is stored
      with this run's datestamp, and a symlink is left at the plain name
    * symlink: re-created at the destination, pointing to the resolved
      target; an existing destination file is renamed away first

    Parameters
    ----------
    config : dict
        Needs ``general.monitor_file``, ``general.thisrun_dir``,
        ``general.thisrun_work_dir``, ``general.experiment_dir``,
        ``general.verbose``, ``general.run_datestamp`` and
        ``general.last_run_datestamp``.

    Returns
    -------
    dict
        The unchanged configuration.
    """
    monitor_file = config["general"]["monitor_file"]
    monitor_file.write("Copying stuff to main experiment folder \n")
    # topdown=False walks leaves first, so files are handled before their
    # parent directories are visited.
    for root, dirs, files in os.walk(config["general"]["thisrun_dir"], topdown=False):
        if config["general"]["verbose"]:
            print("Working on folder: " + root)
        # The work directory is handled elsewhere (copy_stuff_back_from_work).
        if root.startswith(config["general"]["thisrun_work_dir"]) or root.endswith(
            "/work"
        ):
            if config["general"]["verbose"]:
                print("Skipping files in work.")
            continue
        for name in files:
            source = os.path.join(root, name)
            if not os.stat(source).st_size > 0:
                # skip empty files
                continue
            if config["general"]["verbose"]:
                print("File: " + source)
            # Mirror the run-dir layout under the experiment dir.
            destination = source.replace(
                config["general"]["thisrun_dir"], config["general"]["experiment_dir"]
            )
            destination_path = destination.rsplit("/", 1)[0]
            if not os.path.exists(destination_path):
                os.makedirs(destination_path)
            if not os.path.islink(source):
                if os.path.isfile(destination):
                    if filecmp.cmp(source, destination):
                        if config["general"]["verbose"]:
                            print("File " + source + " has not changed, skipping.")
                        continue
                    else:
                        # Destination differs: archive it under a datestamped
                        # name, unless that name is already taken.
                        if os.path.isfile(
                            destination + "_" + config["general"]["run_datestamp"]
                        ):
                            print(
                                "Don't know where to move "
                                + destination
                                + ", file exists"
                            )
                            continue
                        else:
                            if os.path.islink(destination):
                                os.remove(destination)
                            else:
                                os.rename(
                                    destination,
                                    destination
                                    + "_"
                                    + config["general"]["last_run_datestamp"],
                                )
                            newdestination = (
                                destination + "_" + config["general"]["run_datestamp"]
                            )
                            if config["general"]["verbose"]:
                                print(
                                    "Moving file " + source + " to " + newdestination
                                )
                            os.rename(source, newdestination)
                            # Leave a plain-named symlink to the datestamped file.
                            os.symlink(newdestination, destination)
                            continue
                try:
                    if config["general"]["verbose"]:
                        print("Moving file " + source + " to " + destination)
                    try:
                        os.rename(source, destination)
                    # NOTE(review): bare excepts below swallow all errors; they
                    # presumably target cross-device renames / still-open files
                    # -- narrowing to OSError should be confirmed with the
                    # maintainers before changing.
                    except:
                        # File is still open... create a hard (!) link instead
                        os.link(source, destination)
                except:
                    print(
                        ">>>>>>>>> Something went wrong moving "
                        + source
                        + " to "
                        + destination
                    )
            else:
                # The source itself is a symlink: re-create it at destination.
                linkdest = resolve_symlinks(source)
                # newlinkdest = (
                #    destination.rsplit("/", 1)[0] + "/" + linkdest.rsplit("/", 1)[-1]
                # )
                if os.path.islink(destination):
                    # NOTE(review): this resolves ``source`` again, so the
                    # comparison is always true -- it looks like the intent was
                    # resolve_symlinks(destination). Confirm before fixing.
                    destdest = resolve_symlinks(source)
                    if linkdest == destdest:
                        # both links are identical, skip
                        continue
                    # os.remove(destination)
                if os.path.isfile(destination):
                    os.rename(
                        destination,
                        destination + "_" + config["general"]["last_run_datestamp"],
                    )
                os.symlink(linkdest, destination)
    return config
# Utility functions:
def rm_r(path):
    """
    Python equivalent of ``rm -r``.

    Files and symlinks are unlinked; directories are removed recursively.
    Nonexistent paths are ignored.

    Parameters
    ----------
    path : str
        Path or directory to remove
    """
    # A broken symlink also fails this existence check (exists() follows
    # links), so it is left alone -- matching the original behaviour.
    if not os.path.exists(path):
        return
    is_leaf = os.path.isfile(path) or os.path.islink(path)
    remover = os.unlink if is_leaf else shutil.rmtree
    remover(path)
def size_bytes_to_human(num, suffix="B"):
    """
    Format a byte count as a human-readable string with binary prefixes,
    e.g. ``3072 -> "3.0KiB"``.

    Parameters
    ----------
    num : int or float
        Size in bytes.
    suffix : str
        Unit suffix appended after the binary prefix (default "B").

    Returns
    -------
    str
    """
    value = num
    for prefix in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(value) < 1024.0:
            return f"{value:3.1f}{prefix}{suffix}"
        value /= 1024.0
    return f"{value:.1f}Yi{suffix}"
# PG: BROKEN!!!
def size_human_to_bytes(s, suffix="B"):
    """
    Parse a human-readable size string back into bytes -- the inverse of
    :func:`size_bytes_to_human`, e.g. ``"3.0KiB" -> 3072.0``.

    The original implementation (flagged "PG: BROKEN!!!" above) crashed on
    any suffixed input, because ``s.replace("", "")`` leaves the string
    untouched and ``float("3.0KiB")`` raises ValueError.

    Parameters
    ----------
    s : str
        Size string such as "512B", "3.0KiB" or "1.0MiB". A plain number
        without prefix/suffix is accepted as bytes.
    suffix : str
        Unit suffix to strip (default "B").

    Returns
    -------
    float
        The size in bytes.

    Raises
    ------
    ValueError
        If the numeric part of the string cannot be parsed.
    """
    units = ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"]
    value = s.strip()
    if suffix and value.endswith(suffix):
        value = value[: -len(suffix)]
    for exponent, unit in enumerate(units):
        if unit and value.endswith(unit):
            return float(value[: -len(unit)]) * 1024.0 ** exponent
    return float(value)
class RunFolders(list):
    """
    Logs the ``run_`` directories in ``<experiment_id>/log/run_folders.log``,
    updating it with new folders. The resulting object is a list of ``run_``
    paths that exist or existed during the run time (even if they got
    deleted). This is useful for indexing operations such as
    ``<object_name>[::<interval>]`` used when removing ``run_`` folders.

    Notes
    -----
    It keeps the folder names sorted so there is no need of sorting out of
    the object, and it also prevents the existence of duplicates.
    """

    def __init__(self, config):
        """
        The initialization of the object:

        * Loads the existing paths of the ``run_`` folders
        * Loads previous ``run_`` folder names from the logging file
        * Adds the current folder names to the logging file
        * Exposes the result as a list of ``pathlib.Path`` folder paths
        """
        # Paths taken from ``config``
        self.exp_dir = config["general"]["experiment_dir"]
        self.log_path = self.exp_dir + "/log/run_folders.log"

        # Absolute paths of the run_ folders currently on disk
        self.current_folders = [
            self.exp_dir + "/" + entry
            for entry in os.listdir(self.exp_dir)
            if entry.startswith("run_")
        ]

        # Make sure ``run_folders.log`` exists before reading it
        if not os.path.exists(self.log_path):
            open(self.log_path, "w").close()

        # Previously logged folders, merged with the current ones
        self.folders = []
        self.load()
        self.update()

        # Populate the list interface with pathlib.Path objects
        self.extend(pathlib.Path(folder) for folder in self.folders)

    def load(self):
        """Read previously logged folder paths into ``self.folders``."""
        with open(self.log_path, "r") as log_file:
            self.folders.extend(line.strip() for line in log_file)

    def save(self):
        """Write all folder paths to the log file, one per line."""
        with open(self.log_path, "w") as log_file:
            log_file.writelines(folder + "\n" for folder in self.folders)

    def update(self):
        """
        Merge the currently existing folders into the logged ones, remove
        duplicates, sort, and save the result back to the log file.
        """
        self.folders.extend(self.current_folders)
        # dict.fromkeys drops duplicates while keeping hashable strings
        self.folders = sorted(dict.fromkeys(self.folders))
        self.save()