Source code for esm_master.task

import os, sys, subprocess
import shlex  # contains shlex.split that respects quoted strings

from .software_package import software_package

from .cli import verbose

import esm_environment

# deniz: it is better to use more pathlib in the future so that dir/path
# operations will be more portable (supported since Python 3.4, 2014)
import pathlib

######################################################################################
################################# class "task" #######################################
######################################################################################


[docs]class Task: """What you can do with a software_package, e.g. comp-awicm-2.0""" def __init__(self, raw, setup_info, vcs, general, complete_config): if raw == "default": raw = "" if raw == "drytestall": # panic for package in setup_info.all_packages: for todo in package.targets: try: print(todo + "-" + package.raw_name) newtask = Task(todo + "-" + package.raw_name, setup_info, vcs) newtask.output_steps() except: print("Problem found with target " + newtask.raw_name) sys.exit(1) sys.exit(0) if type(raw) == str: ( self.todo, kind, model, version, self.only_subtask, self.raw_name, ) = setup_info.split_raw_target(raw, setup_info) self.package = software_package( (kind, model, version), setup_info, vcs, general ) else: # tupel: (self.todo, kind, model, version, self.only_subtask) = raw self.package = software_package( (kind, model, version), setup_info, vcs, general ) self.raw_name = setup_info.assemble_raw_name( self.todo, kind, model, version ) if kind == "components": self.env = esm_environment.esm_environment.EnvironmentInfos( "compiletime", complete_config, model ) else: self.env = None if not self.todo in setup_info.meta_todos: self.check_if_target(setup_info) self.subtasks = self.get_subtasks(setup_info, vcs, general, complete_config) self.only_subtask = self.validate_only_subtask() self.ordered_tasks = self.order_subtasks(setup_info, vcs, general) self.will_download = self.check_if_download_task(setup_info) self.folders_after_download = self.download_folders() self.binaries_after_compile = self.compile_binaries() self.dir_list = self.list_required_dirs() self.command_list, self.shown_command_list = self.assemble_command_list() if verbose > 1: self.output()
[docs] def get_subtasks(self, setup_info, vcs, general, complete_config): subtasks = [] if self.todo in setup_info.meta_todos: todos = setup_info.meta_command_order[self.todo] else: todos = [self.todo] for todo in todos: for subpackage in self.package.subpackages: if todo in subpackage.targets: subtasks.append( Task( ( todo, subpackage.kind, subpackage.model, subpackage.version, None, ), setup_info, vcs, general, complete_config, ) ) if subtasks == [] and self.todo in setup_info.meta_todos: # if self.todo in setup_info.meta_todos: for todo in todos: if todo in self.package.targets: subtasks.append( Task( ( todo, self.package.kind, self.package.model, self.package.version, None, ), setup_info, vcs, general, complete_config, ) ) return subtasks
[docs] def validate_only_subtask(self): only = None if self.only_subtask: only = [] for task in self.subtasks: if task.package.raw_name.startswith(self.only_subtask): only.append(task) if self.package.raw_name.startswith(self.only_subtask): self.subtasks = [] return None if only == []: print() print( "Given subtask " + self.only_subtask + " is not a valid subtask of package " + self.raw_name + "." ) print() sys.exit(0) return only
[docs] def order_subtasks(self, setup_info, vcs, general): subtasks = self.subtasks if self.only_subtask: if self.only_subtask == "NONE": return [] elif type(self.only_subtask) == str: return [self.only_subtask] else: subtasks = self.only_subtask if subtasks == []: return [self] if self.todo in setup_info.meta_todos: todos = setup_info.meta_command_order[self.todo] else: todos = [self.todo] ordered_tasks = [] for todo in todos: for task in subtasks: if task.todo == todo and task.package.bin_type == "lib": ordered_tasks.append(task) for task in subtasks: if task.todo == todo and not task.package.bin_type == "lib": ordered_tasks.append(task) # if self.package.kind == "components" and not self.only_subtask: ordered_tasks.append(self) return ordered_tasks
[docs] def check_if_download_task(self, setup_info): if self.todo == "get": return True if self.todo in setup_info.meta_todos: if "get" in setup_info.meta_command_order[self.todo]: return True return False
[docs] def download_folders(self): # if self.package.kind in ["setups", "couplings"]: if self.package.subpackages: dir_list = [self.package.raw_name] for task in self.ordered_tasks: if ( self.package.raw_name + "/" + task.package.destination not in dir_list ): dir_list.append( self.package.raw_name + "/" + task.package.destination ) else: dir_list = [] for task in self.ordered_tasks: if task.package.destination not in dir_list: dir_list.append(task.package.destination) return dir_list
[docs] def compile_binaries(self): file_list = [] for task in self.ordered_tasks: for binfile in task.package.bin_names: if ( self.package.raw_name + "/" + task.package.bin_type + "/" + binfile.split("/")[-1] not in file_list ): file_list.append( self.package.raw_name + "/" + task.package.bin_type + "/" + binfile.split("/")[-1] ) return file_list
[docs] def list_required_dirs(self): toplevel = self.package.raw_name if self.package.kind in ["setups", "couplings"] and self.will_download: dir_list = [self.package.raw_name] else: dir_list = [] for task in self.ordered_tasks: if task.todo == "comp": if task.package.bin_names: newdir = toplevel + "/" + task.package.bin_type if newdir not in dir_list: dir_list.append(newdir) return dir_list
[docs] def assemble_command_list(self): command_list = [] toplevel = self.package.destination # if self.package.kind in ["setups", "couplings"]: if self.package.subpackages: # ??? command_list.append("mkdir -p " + toplevel) command_list.append("cd " + toplevel) toplevel = "." real_command_list = command_list.copy() for task in self.ordered_tasks: if task.todo in ["get"]: if task.package.command_list[task.todo] is not None: for command in task.package.command_list[task.todo]: command_list.append(command) real_command_list.append(command) if self.package.coupling_changes: for change in self.package.coupling_changes: command_list.append(change) real_command_list.append(change) for task in self.ordered_tasks: if task.todo not in ["get"]: if task.todo in ["conf", "comp"]: # if self.package.kind in ["setups", "couplings"]: if task.package.kind not in ["setups", "couplings"]: if self.package.subpackages: real_command_list.append( "cp ../" + task.raw_name + "_script.sh ." ) real_command_list.append("./" + task.raw_name + "_script.sh") else: if task.package.command_list[task.todo] is not None: for command in task.package.command_list[task.todo]: real_command_list.append(command) if task.package.command_list[task.todo] is not None: for command in task.package.command_list[task.todo]: command_list.append(command) if task.todo == "comp": if task.package.bin_names: command_list.append( "mkdir -p " + toplevel + "/" + task.package.bin_type ) real_command_list.append( "mkdir -p " + toplevel + "/" + task.package.bin_type ) for binfile in task.package.bin_names: # PG: Only copy if source and dest aren't the same! # (Prevents cp: ‘/temp/test.txt’ and # ‘/temp/test/test.txt’ are the same file) toplevel_bin_path = ( toplevel + "/" + task.package.bin_type + "/" + binfile.split("/")[-1] ) # MA: If there are already commands that will clean the # bin folder, the following `if` is required to be true clean_command_list = ["rm -f " + toplevel_bin_path] clean_command = any( cc in command_list for cc in clean_command_list ) # deniz: bug fix for the fesom compilation issue # at some point these string concats need to be # replaced by pathlib bin_path_target = pathlib.Path( toplevel + "/" + task.package.bin_type ) binary_file_path = pathlib.Path( task.package.destination + "/" + binfile ) binary_file_parent = binary_file_path.parent # path2bin_origin = "/".join(binary_file_path.split('/')[:-1]) # deniz: don't copy the files if the paths are same. # I think pathlib.Path.resolve() is a better option # than simple string comparison should_copy_files = ( binary_file_parent.resolve() != bin_path_target.resolve() ) # add the remaining conditions should_copy_files = should_copy_files and ( not os.path.exists(toplevel_bin_path) or clean_command ) if should_copy_files: # deniz: I think " ".join() is a more Pythonic # way to construct the command strings cmd_str = " ".join( ["cp", str(binary_file_path), str(bin_path_target)] ) command_list.append(cmd_str) real_command_list.append(cmd_str) elif task.todo == "clean": if task.package.bin_names: for binfile in task.package.bin_names: command_list.append( "rm -f " + toplevel + "/" + task.package.bin_type + "/" + binfile.split("/", -1)[-1] ) real_command_list.append( "rm -f " + toplevel + "/" + task.package.bin_type + "/" + binfile.split("/", -1)[-1] ) if self.package.kind in ["setups", "couplings"]: command_list.append("cd ..") real_command_list.append("cd ..") return real_command_list, command_list
[docs] def cleanup_script(self): try: os.remove("./dummy_script.sh") except OSError: print("No dummy script to remove!") for task in self.ordered_tasks: if task.todo in ["conf", "comp"]: try: os.remove("./" + task.raw_name + "_script.sh") except OSError: print("No file to remove for ", task.raw_name)
[docs] def check_if_target(self, setup_info): if not setup_info.has_target2(self.package, self.todo): setup_info.output_available_targets(self.raw_name) sys.exit(0)
[docs] def check_requirements(self): if self.will_download: return True requirements = self.folders_after_download for folder in requirements: if not os.path.isdir(folder): print() print( "Missing folder " + folder + " detected. Please run 'make get-" + self.package.raw_name + "' first." ) print() sys.exit(0) return True
[docs] def validate(self): self.check_requirements()
[docs] def execute(self): for task in self.ordered_tasks: if task.todo in ["conf", "comp"]: if task.package.kind == "components": task.env.write_dummy_script() newfile = task.env.add_commands( task.package.command_list[task.todo], task.raw_name ) if os.path.isfile(newfile): os.chmod(newfile, 0o755) for command in self.command_list: if command.startswith("mkdir"): # os.system(command) subprocess.run(command.split(), check=True) elif command.startswith("cp "): subprocess.run(command.split(), check=True) elif command.startswith("cd ") and ";" not in command: os.chdir(command.replace("cd ", "")) else: # os.system(command) for command in command.split(";"): # seb-wahl: use shlex split as sed commands that use spaces # need to be quoted, shlex split doesn't split quoted # strings on spaces # example: sed -i '/COUPLENEMOFOCI = /s/.FALSE./.TRUE./g' oifs-43r3-foci/src/ifs/module/yommcc.F90 # will fail if the "'" is removed subprocess.run( shlex.split(command), check=True, shell=(command.startswith("./") and command.endswith(".sh")), )
[docs] def output(self): print() subtasklist = [] osubtasklist = [] self.package.output() print(" Todo: ", self.todo) if self.only_subtask: if self.only_subtask == "NONE": print(" NO VALID SUBTASKS!!!") else: for subtask in self.only_subtask: print(" Only Subtask:", subtask.package.raw_name) for subtask in self.subtasks: subtasklist.append(subtask.raw_name) for osubtask in self.ordered_tasks: osubtasklist.append(osubtask.raw_name) if not subtasklist == []: print(" Subtasks:", subtasklist) if not osubtasklist == []: print(" Ordered Subtasks:", osubtasklist) self.output_steps() if not self.folders_after_download == []: print(" The following folders should exist after download:") for folder in self.folders_after_download: print(" ", folder) if not self.binaries_after_compile == []: print(" The following files should exist after compiling:") for binfile in self.binaries_after_compile: print(" ", binfile)
[docs] def output_steps(self): if not self.command_list == []: print(" Executing commands in this order:") for command in self.shown_command_list: print(" ", command)