Source code for esm_runscripts.esm_sim_objects

"""
Documentation goes here
"""
from datetime import datetime
from io import StringIO
import collections
import logging
import os
import pdb
import shutil
import sys

import f90nml
import six
import tqdm
import yaml
import time


from esm_calendar import Date, Calendar
import esm_parser
from . import esm_coupler
from . import esm_methods
#import .esm_coupler
from esm_profile import *

import pprint

pp = pprint.PrettyPrinter(indent=4)


[docs]def date_representer(dumper, date): return dumper.represent_str("%s" % date.output())
yaml.add_representer(Date, date_representer)
[docs]class SimulationSetup(object): def __init__(self, command_line_config = None, user_config = None): if not command_line_config and not user_config: raise ValueError("SimulationSetup needs to be initialized with either command_line_config or user_config.") if command_line_config: self.command_line_config = command_line_config if not user_config: user_config = self.get_user_config_from_command_line(command_line_config) if user_config["general"].get("debug_obj_init", False): import pdb; pdb.set_trace() self.get_total_config_from_user_config(user_config) def __call__(self, *args, **kwargs): if self.config["general"]["jobtype"] == "compute": self.compute(*args, **kwargs) elif self.config["general"]["jobtype"] == "tidy_and_resubmit": self.tidy(*args, **kwargs) elif self.config["general"]["jobtype"] == "post": self.postprocess(*args, **kwargs) else: print("Unknown jobtype specified! Goodbye...") self.end_it_all() ################################### COMPUTE #############################################################
[docs] def compute(self, kill_after_submit=True): # supposed to be reduced to a stump """ All steps needed for a model computation. Parameters ---------- kill_after_submit : bool Default ``True``. If set, the entire Python instance is killed with a ``sys.exit()`` as the very last after job submission. """ from . import compute Compute = compute(self.config) self.config = Compute.evaluate(self.config) if kill_after_submit: self.end_it_all()
# NOTE(PG): No longer needed...? Defined also in jobclass...?
[docs] def end_it_all(self): import sys if self.config["general"]["profile"]: for line in timing_info: print(line) print("Exiting entire Python process!") sys.exit()
############################################### POSTPROCESS ######################################
[docs] def postprocess(self): from . import esm_batch_system """ Calls post processing routines for this run. """ with open( self.config["general"]["thisrun_scripts_dir"] + "/" + self.config["general"]["expid"] + "_post_" + self.run_datestamp + "_" + str(self.config['general']['jobid']) + ".log", "w", buffering=1, ) as post_file: post_task_list = self._assemble_postprocess_tasks(post_file) self.config["general"]["post_task_list"] = post_task_list esm_batch_system.write_simple_runscript(self.config) self.config = esm_batch_system.submit(self.config)
def _assemble_postprocess_tasks(self, post_file): """ Generates all tasks for post processing which will be written to the sad file. Parameters ---------- post_file File handle to which information should be written. Returns ------- post_task_list : list The list of post commands which will be executed. These are written to the sad file. """ post_task_list = [] for component in self.config["general"]["valid_model_names"]: post_file.write(40*"+ "+"\n") post_file.write("Generating post-processing tasks for: %s \n" % component) post_task_list.append("\n#Postprocessing %s\n" % component) post_task_list.append("cd "+ self.config[component]["experiment_outdata_dir"]+"\n") pconfig_tasks = self.config[component].get('postprocess_tasks', {}) post_file.write("Configuration for post processing: %s \n" % pconfig_tasks) for outfile in pconfig_tasks: post_file.write("Generating task to create: %s \n" % outfile) ofile_config = pconfig_tasks[outfile] # TODO(PG): This can be cleaned up. I probably actually want a # ChainMap here for more than just the bottom... # # Run CDO tasks (default) task_definition = self.config[component].get("postprocess_task_definitions", {}).get(ofile_config['post_process']) method_definition = self.config[component].get("postprocess_method_definitions", {}).get(task_definition['method']) program = method_definition.get("program", task_definition["method"]) possible_args = method_definition.get("possible_args", []) required_args = method_definition.get("required_args", []) possible_flags = method_definition.get("possible_flags", []) required_flags = method_definition.get("required_flags", []) outfile_flags = ofile_config.get("flags") outfile_args = ofile_config.get("args") task_def_flags = task_definition.get("flags") task_def_args = task_definition.get("args") args = collections.ChainMap(outfile_args, task_def_args) flags = outfile_flags + task_def_flags flags = ["-"+flag for flag in flags] # See here: https://stackoverflow.com/questions/21773866/how-to-sort-a-dictionary-based-on-a-list-in-python all_call_things = {"program": program, "outfile": outfile, **args, "flags": flags} print(all_call_things) index_map = {v: i for i, v in enumerate(method_definition["call_order"])} call_list = sorted(all_call_things.items(), key=lambda pair: index_map[pair[0]]) call = [] for call_id, call_part in call_list: if isinstance(call_part, str): call.append(call_part) elif isinstance(call_part, list): call.append(" ".join(call_part)) else: raise TypeError("Something straaaange happened. Consider starting the debugger.") post_file.write(" ".join(call)+"\n") post_task_list.append(" ".join(call)) post_task_list.append("cd -\n") return post_task_list ########################## ASSEMBLE ALL THE INFORMATION ##############################
[docs] def get_user_config_from_command_line(self, command_line_config): try: user_config = esm_parser.initialize_from_yaml(command_line_config["scriptname"]) if not "additional_files" in user_config["general"]: user_config["general"]["additional_files"] = [] except esm_parser.EsmConfigFileError as error: raise error except: user_config = esm_parser.initialize_from_shell_script(command_line_config["scriptname"]) user_config["general"].update(command_line_config) return user_config
[docs] def get_total_config_from_user_config(self, user_config): if "version" in user_config["general"]: version = str(user_config["general"]["version"]) else: setup_name = user_config["general"]["setup_name"] if "version" in user_config[setup_name.replace("_standalone","")]: version = str(user_config[setup_name.replace("_standalone","")]["version"]) else: version = "DEFAULT" self.config = esm_parser.ConfigSetup(user_config["general"]["setup_name"].replace("_standalone",""), version, user_config) self.config["computer"]["jobtype"] = self.config["general"]["jobtype"] self.config["general"]["experiment_dir"] = self.config["general"]["base_dir"] + "/" + self.config["general"]["expid"] self._read_date_file(self.config) esm_parser.choose_blocks(self.config, blackdict=self.config._blackdict) self._initialize_calendar(self.config) esm_parser.choose_blocks(self.config, blackdict=self.config._blackdict) self._add_all_folders() self.set_prev_date() self.config.finalize() self.add_submission_info() self.initialize_batch_system() #esm_parser.pprint_config(self.config) #sys.exit(0) if self.config["general"]["standalone"] == False: self.init_coupler() # Write where the experiment log file should be in the config self.config["general"]["experiment_log_file"] = self.config["general"].get("experiment_log_file", self.config["general"]["experiment_log_dir"] + "/" + self.config["general"]["expid"] + "_" + self.config["general"]["setup_name"] + ".log" )
def _add_all_folders(self): self.all_filetypes = ["analysis", "config", "log", "mon", "scripts", "ignore", "unknown"] self.config["general"]["out_filetypes"] = ["analysis", "log", "mon", "scripts", "ignore", "unknown", "outdata", "restart_out"] self.config["general"]["in_filetypes"] = ["scripts", "input", "forcing", "bin", "config", "restart_in"] self.all_filetypes.append("work") self.config["general"]["thisrun_dir"] = self.config["general"]["experiment_dir"] + "/run_" + self.run_datestamp for filetype in self.all_filetypes: self.config["general"][ "experiment_" + filetype + "_dir" ] = self.config["general"]["experiment_dir"] + "/" + filetype + "/" for filetype in self.all_filetypes: self.config["general"][ "thisrun_" + filetype + "_dir" ] = self.config["general"]["thisrun_dir"] + "/" + filetype + "/" self.config["general"]["work_dir"] = self.config["general"]["thisrun_work_dir"] self.all_model_filetypes = [ "analysis", "bin", "config", "couple", "forcing", "input", "log", "mon", "outdata", "restart_in", "restart_out", "viz", "ignore" ] self.config["general"]["all_model_filetypes"] = self.all_model_filetypes self.config["general"]["all_filetypes"] = self.all_filetypes for model in self.config["general"]["valid_model_names"]: for filetype in self.all_model_filetypes: if "restart" in filetype: filedir = "restart" else: filedir = filetype self.config[model][ "experiment_" + filetype + "_dir" ] = self.config["general"]["experiment_dir"] + "/" + filedir + "/" + model + "/" self.config[model][ "thisrun_" + filetype + "_dir" ] = self.config["general"]["thisrun_dir"] + "/" + filedir + "/" + model + "/" self.config[model]["all_filetypes"] = self.all_model_filetypes @timing def _read_date_file(self, config, date_file=None): if not date_file: date_file = ( config["general"]["experiment_dir"] + "/scripts/" + config["general"]["expid"] + "_" + config["general"]["setup_name"] + ".date" ) if os.path.isfile(date_file): logging.info("Date file read from %s", date_file) with open(date_file) as date_file: date, self.run_number = date_file.readline().strip().split() self.run_number = int(self.run_number) write_file = False else: logging.info("No date file found %s", date_file) logging.info("Initializing run_number=1 and date=18500101") date = config["general"].get("initial_date", "18500101") self.run_number = 1 write_file = True config["general"]["run_number"] = self.run_number self.current_date = date if config["general"]["run_number"] != 1: for model in config["general"]["valid_model_names"]: config[model]["lresume"] = True else: # Did the user give a value? If yes, keep it, if not, first run: for model in config["general"]["valid_model_names"]: if "lresume" in config[model]: user_lresume = config[model]["lresume"] else: user_lresume = False if isinstance(user_lresume, str) and "${" in user_lresume: user_lresume = esm_parser.find_variable(model, user_lresume, self.config, [], []) if type(user_lresume) == str: if user_lresume == "0" or user_lresume.upper() == "FALSE": user_lresume = False elif user_lresume == "1" or user_lresume.upper() == "TRUE": user_lresume = True elif isinstance(user_lresume, int): if user_lresume == 0: user_lresume = False elif user_lresume == 1: user_lresume = True config[model]["lresume"] = user_lresume # needs to happen AFTER a run! # if write_file: # self._write_date_file() logging.info("current_date = %s", self.current_date) logging.info("run_number = %s", self.run_number) ######################### PREPARE EXPERIMENT / WORK ############################# def _create_toplevel_marker_file(self): if not os.path.isfile(self.config['thisrun_']): with open(".top_of_exp_tree") as f: f.write("Top of experiment: "+self.config['general']['expid']) def _dump_final_yaml(self): with open( self.experiment_config_dir + "/" + self.config["general"]["expid"] + "_preconfig.yaml", "w", ) as config_file: yaml.dump(self.config, config_file) def _initialize_calendar(self, config): nyear, nmonth, nday, nhour, nminute, nsecond = 0, 0, 0, 0, 0, 0 nyear = int(config["general"].get("nyear", nyear)) if not nyear: nmonth = int(config["general"].get("nmonth", nmonth)) if not nyear and not nmonth: nday = int(config["general"].get("nday", nday)) if not nyear and not nmonth and not nday: nhour = int(config["general"].get("nhour", nhour)) if not nyear and not nmonth and not nday and not nhour: nminute = int(config["general"].get("nminute", nminute)) if not nyear and not nmonth and not nday and not nhour and not nminute: nsecond = int(config["general"].get("nsecond", nsecond)) if ( not nyear and not nmonth and not nday and not nhour and not nminute and not nsecond ): nyear = 1 # make sure all models agree on leapyear if "leapyear" in self.config["general"]: for model in self.config["general"]["valid_model_names"]: self.config[model]["leapyear"] = self.config["general"]["leapyear"] else: for model in self.config["general"]["valid_model_names"]: if "leapyear" in self.config[model]: for other_model in self.config["general"]["valid_model_names"]: if "leapyear" in self.config[other_model]: if not self.config[other_model]["leapyear"] == self.config[model]["leapyear"]: print ("Models " + model + " and " + other_model + " do not agree on leapyear. Stopping.") sys.exit(43) else: self.config[other_model]["leapyear"] = self.config[model]["leapyear"] self.config["general"]["leapyear"] = self.config[model]["leapyear"] break if not "leapyear" in self.config["general"]: for model in self.config["general"]["valid_model_names"]: self.config[model]["leapyear"] = True self.config["general"]["leapyear"] = True # set the overall calendar if self.config["general"]["leapyear"]: self.calendar = Calendar(1) self.config["general"]["calendar"] = Calendar(1) else: self.calendar = Calendar(0) self.config["general"]["calendar"] = Calendar(0) self.current_date = Date(self.current_date, self.calendar) self.delta_date = (nyear, nmonth, nday, nhour, nminute, nsecond) config["general"]["current_date"] = self.current_date config["general"]["start_date"] = self.current_date config["general"]["initial_date"] = Date(config["general"]["initial_date"], self.calendar) config["general"]["final_date"] = Date(config["general"]["final_date"], self.calendar) #config["general"]["prev_date"] = self.current_date.sub((0, 0, 1, 0, 0, 0)) config["general"]["prev_date"] = self.current_date - (0, 0, 1, 0, 0, 0) config["general"]["next_date"] = self.current_date.add(self.delta_date) config["general"]["last_start_date"] = self.current_date - self.delta_date #config["general"]["end_date"] = config["general"]["next_date"].sub( config["general"]["end_date"] = config["general"]["next_date"] - (0, 0, 1, 0, 0, 0) config["general"]["runtime"] = ( config["general"]["next_date"] - config["general"]["current_date"] ) config["general"]["total_runtime"] = ( config["general"]["next_date"] - config["general"]["initial_date"] ) self.run_datestamp = ( config["general"]["current_date"].format( form=9, givenph=False, givenpm=False, givenps=False ) + "-" + config["general"]["end_date"].format( form=9, givenph=False, givenpm=False, givenps=False ) ) config["general"]["run_datestamp"] = self.run_datestamp self.last_run_datestamp = ( config["general"]["last_start_date"].format( form=9, givenph=False, givenpm=False, givenps=False ) + "-" + config["general"]["prev_date"].format( form=9, givenph=False, givenpm=False, givenps=False ) ) config["general"]["last_run_datestamp"] = self.last_run_datestamp
[docs] def set_prev_date(self): """Sets several variables relevant for the previous date. Loops over all models in ``valid_model_names``, and sets model variables for: * ``prev_date`` * ``parent_expid`` * ``parent_date`` * ``parent_restart_dir`` """ for model in self.config["general"]["valid_model_names"]: if "time_step" in self.config[model] and not (isinstance(self.config[model]["time_step"], str) and "${" in self.config[model]["time_step"]): self.config[model]["prev_date"] = self.current_date - (0, 0, 0, 0, 0, int(self.config[model]["time_step"])) # NOTE(PG, MAM): Here we check if the time step still has a variable which might be set in a different model, and resolve this case elif "time_step" in self.config[model] and (isinstance(self.config[model]["time_step"], str) and "${" in self.config[model]["time_step"]): dt = esm_parser.find_variable(model, self.config[model]["time_step"], self.config, [], []) self.config[model]["prev_date"] = self.current_date - (0, 0, 0, 0, 0, int(dt)) else: self.config[model]["prev_date"] = self.current_date # Check if lresume contains a variable which might be set in a different model, and resolve this case if "lresume" in self.config[model] and isinstance(self.config[model]["lresume"], str) and "${" in self.config[model]["lresume"]: lr = esm_parser.find_variable(model, self.config[model]["lresume"], self.config, [], []) self.config[model]["lresume"] = eval(lr) if self.config[model]["lresume"] == True and self.config["general"]["run_number"] == 1: self.config[model]["parent_expid"] = self.config[model][ "ini_parent_exp_id" ] if "parent_date" not in self.config[model]: self.config[model]["parent_date"] = self.config[model][ "ini_parent_date" ] self.config[model]["parent_restart_dir"] = self.config[model][ "ini_restart_dir" ] else: self.config[model]["parent_expid"] = self.config["general"][ "expid" ] if "parent_date" not in self.config[model]: self.config[model]["parent_date"] = self.config[model][ "prev_date" ] self.config[model]["parent_restart_dir"] = self.config[model][ "experiment_restart_in_dir" ]
#print (model + " " + str(self.config[model]["parent_date"]))
[docs] def init_coupler(self): for model in list(self.config): if model in esm_coupler.known_couplers: self.coupler_config_dir = ( self.config["general"]["base_dir"] + "/" + self.config["general"]["expid"] + "/run_" + self.run_datestamp + "/config/" + model + "/" ) self.config["general"]["coupler_config_dir"] = self.coupler_config_dir self.coupler = esm_coupler.esm_coupler(self.config, model) self.config["general"]["coupler"] = self.coupler break self.coupler.add_files(self.config)
[docs] def initialize_batch_system(self): from . import esm_batch_system self.batch = esm_batch_system(self.config, self.config["computer"]["batch_system"]) self.config["general"]["batch"] = self.batch
################################# TIDY STUFF ###########################################
[docs] def tidy(self): from . import jobclass """ Performs steps for tidying up a simulation after a job has finished and submission of following jobs. This method uses two lists, ``all_files_to_copy`` and ``all_listed_filetypes`` to sort finished data from the **current run folder** back to the **main experiment folder** and submit new **compute** and **post-process** jobs. Files for ``log``, ``mon``, ``outdata``, and ``restart_out`` are gathered. The program waits until the job completes or an error is found (See ~self.wait_and_observe). Then, if necessary, the coupler cleans up it's files (unless it's a standalone run), and the files in the lists are copied from the **work folder** to the **current run folder**. A check for unknown files is performed (see ~self.check_for_unknown_files), files are moved from the the **current run folder** to the **main experiment folder**, and new compute and post process jobs are started. Warning ------- The date is changed during this routine! Be careful where you put any calls that may depend on date information! Note ---- This method is also responsible for calling the next compute job as well as the post processing job! """ called_from = self.config["general"]["last_jobtype"] with open( self.config["general"]["thisrun_scripts_dir"] + "/monitoring_file.out", "w", buffering=1, ) as monitor_file: monitor_file.write("tidy job initialized \n") monitor_file.write("attaching to process " + str(self.config["general"]["launcher_pid"]) + " \n") monitor_file.write("Called from a " + called_from + "job \n") last_jobid = "UNKNOWN" if called_from == "compute": with open(self.config["general"]["experiment_log_file"], "r") as logfile: lastline = [l for l in logfile.readlines() if "compute" in l and "start" in l][-1] last_jobid = lastline.split(" - ")[0].split()[-1] #monitoring_events=self.assemble_monitoring_events() if self.config["general"]["submitted"]: self.wait_and_observe(monitor_file) if self.config["general"]["standalone"] == False: self.coupler.tidy(self.config) monitor_file.write("job ended, starting to tidy up now \n") # Log job completion if called_from != "command_line": jobclass.write_to_log(self.config, [ called_from, str(self.config["general"]["run_number"]), str(self.config["general"]["current_date"]), last_jobid, "- done"]) # Tell the world you're cleaning up: jobclass.write_to_log(self.config, [ str(self.config["general"]["jobtype"]), str(self.config["general"]["run_number"]), str(self.config["general"]["current_date"]), str(self.config["general"]["jobid"]), "- start"]) all_listed_filetypes=["log", "mon", "outdata", "restart_out","bin", "config", "forcing", "input", "restart_in", "ignore"] self.assemble_file_lists() self.finalize_file_lists(all_listed_filetypes) self.config = jobclass.copy_files_from_work_to_thisrun(self.config) import esm_parser import sys esm_parser.pprint_config(self.config) monitor_file.write("Copying stuff to main experiment folder \n") self.copy_all_results_to_exp() do_post = False for model in self.config: if "post_processing" in self.config[model]: if self.config[model]["post_processing"]: do_post = True if do_post: monitor_file.write("Post processing for this run:\n") self.command_line_config["jobtype"] = "post" self.command_line_config["original_command"] = self.command_line_config[ "original_command" ].replace("compute", "post") monitor_file.write("Initializing post object with:\n") monitor_file.write(str(self.command_line_config)) this_post = SimulationSetup(self.command_line_config) monitor_file.write("Post object built; calling post job:\n") this_post() monitor_file.write("writing date file \n") self._increment_date_and_run_number() self._write_date_file() # monitor_file.write("resubmitting \n") self.command_line_config["jobtype"] = "compute" self.command_line_config["original_command"] = self.command_line_config["original_command"].replace("tidy_and_resubmit", "compute") jobclass.write_to_log(self.config, [ str(self.config["general"]["jobtype"]), str(self.config["general"]["run_number"]), str(self.config["general"]["current_date"]), str(self.config["general"]["jobid"]), "- done"]) from . import database_actions database_actions.database_entry_success(self.config) # seb-wahl: end_date is by definition (search for 'end_date') smaller than final_date # hence we have to use next_date = current_date + increment if self.config["general"]["next_date"] >= self.config["general"]["final_date"]: monitor_file.write("Reached the end of the simulation, quitting...\n") jobclass.write_to_log(self.config, ["# Experiment over"], message_sep="") else: monitor_file.write("Init for next run:\n") next_compute = SimulationSetup(self.command_line_config) next_compute(kill_after_submit=False) self.end_it_all()
[docs] def wait_and_observe(self, monitor_file): import time thistime = 0 error_check_list = self.assemble_error_list() while self.job_is_still_running(): monitor_file.write("still running \n") error_check_list = self.check_for_errors(error_check_list, thistime, monitor_file) thistime = thistime + 10 time.sleep(10) thistime = thistime + 100000000 error_check_list = self.check_for_errors(error_check_list, thistime, monitor_file)
[docs] def assemble_error_list(self): gconfig = self.config["general"] known_methods = ["warn", "kill"] stdout = gconfig["thisrun_scripts_dir"] + "/" + gconfig["expid"] + "_compute_" + gconfig["run_datestamp"] + "_" + gconfig["jobid"] + ".log" error_list = [("error", stdout, "warn", 60, 60, "keyword error detected, watch out")] for model in self.config: if "check_error" in self.config[model]: for trigger in self.config[model]["check_error"]: search_file = stdout method = "warn" frequency = 60 message = "keyword " + trigger + " detected, watch out" if isinstance(self.config[model]["check_error"][trigger], dict): if "file" in self.config[model]["check_error"][trigger]: search_file = self.config[model]["check_error"][trigger]["file"] if search_file == "stdout" or search_file == "stderr": search_file = stdout if "method" in self.config[model]["check_error"][trigger]: method = self.config[model]["check_error"][trigger]["method"] if method not in known_methods: method = "warn" if "message" in self.config[model]["check_error"][trigger]: message = self.config[model]["check_error"][trigger]["message"] if "frequency" in self.config[model]["check_error"][trigger]: frequency = self.config[model]["check_error"][trigger]["frequency"] try: frequency = int(frequency) except: frequency = 60 elif isinstance(self.config[model]["check_error"][trigger], str) : pass else: continue error_list.append((trigger, search_file, method, frequency, frequency, message)) return error_list
[docs] def check_for_errors(self, error_check_list, time, monitor_file): import re new_list = [] for (trigger, search_file, method, next_check, frequency, message) in error_check_list: warned = 0 if next_check <= time: if os.path.isfile(search_file): with open(search_file) as origin_file: for line in origin_file: if trigger.upper() in line.upper(): if method == "warn": warned = 1 monitor_file.write("WARNING: " + message + "\n") break elif method == "kill": harakiri = "scancel " + self.config["general"]["jobid"] monitor_file.write("ERROR: " + message + "\n") monitor_file.write("Will kill the run now..." + "\n") monitor_file.flush() print("ERROR: " + message) print("Will kill the run now...", flush=True) from . import database_actions database_actions.database_entry_crashed(self.config) os.system(harakiri) sys.exit(42) next_check += frequency if warned == 0: new_list.append((trigger, search_file, method, next_check, frequency, message)) return new_list
[docs] def job_is_still_running(self): import psutil if psutil.pid_exists(self.config["general"]["launcher_pid"]): return True return False
[docs] def add_submission_info(self): from . import esm_batch_system bs = esm_batch_system(self.config, self.config["computer"]["batch_system"]) submitted = bs.check_if_submitted() if submitted: jobid = bs.get_jobid() else: jobid = os.getpid() self.config["general"]["submitted"] = submitted self.config["general"]["jobid"] = jobid
def _increment_date_and_run_number(self): self.run_number += 1 self.current_date += self.delta_date def _write_date_file(self, date_file=None): if not date_file: date_file = ( self.config["general"]["experiment_scripts_dir"] + "/" + self.config["general"]["expid"] + "_" + self.config["general"]["setup_name"] + ".date" ) with open(date_file, "w") as date_file: date_file.write(self.current_date.output() + " " + str(self.run_number))
[docs] def assemble_file_lists(self): from . import filelists self.config = filelists.rename_sources_to_targets(self.config) self.config = filelists.choose_needed_files(self.config) self.config = filelists.complete_targets(self.config) self.config = filelists.complete_sources(self.config) self.config = filelists.replace_year_placeholder(self.config)
[docs] def finalize_file_lists(self, filetypes): # needs to be called right before copying from . import filelists self.config = filelists.globbing(self.config) self.config = filelists.target_subfolders(self.config) self.config = filelists.assemble_intermediate_files_and_finalize_targets(self.config) self.config = filelists.complete_restart_in(self.config) self.config = filelists.check_for_unknown_files(self.config) self.config = filelists.log_used_files(self.config, filetypes)
[docs] @staticmethod def merge_thisrun_into_experiment(config): import os # to should be thisrun, work or experiment for filetype in config["general"]["all_model_filetypes"]: for model in config["general"]["valid_model_names"]: from_dir = config[model]["thisrun_" + filetype + "dir"] to_dir = config[model]["experiment_" + filetype + "dir"] + "/" + config["general"]["run_datestamp"] os.rename(from_dir, to_dir) for filetype in config["general"]["all_filetypes"]: from_dir = config["general"]["thisrun_" + filetype + "dir"] to_dir = config["general"]["experiment_" + filetype + "dir"] + "/" + config["general"]["run_datestamp"] os.rename(from_dir, to_dir) return config
[docs] def copy_all_results_to_exp(self): import filecmp for root, dirs, files in os.walk(self.config["general"]["thisrun_dir"], topdown=False): print ("Working on folder: " + root) if root.startswith(self.config["general"]["thisrun_work_dir"]) or root.endswith("/work"): print ("Skipping files in work.") continue for name in files: source = os.path.join(root, name) print ("File: " + source) destination = source.replace(self.config["general"]["thisrun_dir"], self.config["general"]["experiment_dir"]) destination_path = destination.rsplit("/", 1)[0] if not os.path.exists(destination_path): os.makedirs(destination_path) if not os.path.islink(source): if os.path.isfile(destination): if filecmp.cmp(source, destination): print ("File " + source + " has not changed, skipping.") continue else: if os.path.isfile(destination + "_" + self.run_datestamp): print ("Don't know where to move " + destination +", file exists") continue else: if os.path.islink(destination): os.remove(destination) else: os.rename(destination, destination + "_" + self.last_run_datestamp) newdestination = destination + "_" + self.run_datestamp print ("Moving file " + source + " to " + newdestination) os.rename(source, newdestination) os.symlink(newdestination, destination) continue try: print ("Moving file " + source + " to " + destination) os.rename(source, destination) except: print(">>>>>>>>> Something went wrong moving " + source + " to " + destination) else: linkdest = os.path.realpath(source) newlinkdest = destination.rsplit("/", 1)[0] + "/" + linkdest.rsplit("/", 1)[-1] if os.path.islink(destination): os.remove(destination) if os.path.isfile(destination): os.rename(destination, destination + "_" + self.last_run_datestamp) os.symlink(newlinkdest, destination)