Source code for esm_database.esm_database

import sys, os
from . import getch
import datetime

[docs]class DisplayDatabase(): def __init__(self, tablename = None): from . import location_database query = location_database.session.query(location_database.database_location) results = query.all() if len(results) == 0: print("No tables registered, sorry...") sys.exit(-1) all_tablenames = [result.table_name for result in results] if not tablename: pass elif tablename not in all_tablenames: print ("Unknown table name.") tablename = None if not tablename: print ("Please choose one of the following tables:") for result in results: print (result) while True: choice = input("ID (q to quit): ") if choice in ["q", "Q"]: sys.exit(0) try: choice = int(choice) tablename = [result.table_name for result in results if result.id == choice] if tablename == []: print ("ID not valid.") else: tablename = str(tablename[0]) break except: print ("ID needs to be of type integer, please try again...") print (tablename) table = [table for table in results if tablename == table.table_name] table = table[0] package = __import__(table.class_in) database = getattr(package, "database") self.entry_type = getattr(database, tablename) #sys.exit(0) # # if tablename == "experiments": # that needs to disappear ## from esm_runscripts import database # self.entry_type = database.experiment # else: # print ("Unknown table, quitting...") # sys.exit(-1) # self.session = database.session query = database.session.query(self.entry_type) results = query.all() if not type(results) == list: results=[results] self.query = query self.verbose = False self.all_results = results self.results = results table = getattr(self.entry_type, "__table__") columns = getattr(table, "columns") self.columns = [str(column).replace(tablename+".", "") for column in columns] while True: self.output_writer() self.decision_maker()
[docs] def output_writer(self): lines_of_terminal = os.get_terminal_size().lines for line in range(0, lines_of_terminal): print() if not self.verbose: getattr(self.entry_type, "topline")() for result in self.results: if self.verbose: getattr(self.entry_type, "nicer_output")(result) else: print (result) print ("------------------------------------------------------------------------------------------------------------------------------") print ("[S]elect Entry [V]erbose on/off [R]eset selection [E]dit Entry [D]elete Entry [Q]uit") print ("------------------------------------------------------------------------------------------------------------------------------")
[docs] def ask_column(self): find = input("Which column " + str(self.columns) + "? ").lower() if find == "": return None for column in self.columns: if column.startswith(find): find_column = column break if not find_column: print ("Unknown column.") return None return find_column
[docs] def ask_dataset(self): edit_id = None if len(self.results) == 0: return None elif len(self.results) == 1: print ("Select entry id=" + str(self.results[0].id) + " [y/n]?") yesno = getch.get_one_of(["y", "Y", "n", "N"]) if yesno.lower() == "y": edit_id = self.results[0].id edit_dataset = self.results[0] if not edit_id: edit_id = input("Please enter dataset id: ") if edit_id == "": return None try: edit_id = int(edit_id) except: print ("id needs to be of type int.") return None edit_dataset = [result for result in self.all_results if result.id == edit_id] if edit_dataset == []: print ("Unknown dataset id.") return None else: edit_dataset = edit_dataset[0] return edit_dataset
[docs] def decision_maker(self): char = getch.get_one_of(["q", "Q", "v", "V", "s", "S", "e", "E", "r", "R", "D"]) if char.lower() == "q": sys.exit(0) elif char.lower() == "v": self.verbose = not self.verbose elif char.lower() == "s": self.select_stuff() elif char.lower() == "e": self.edit_dataset() elif char.lower() == "r": self.results = self.all_results elif char == "D": self.remove_datasets()
[docs] def remove_datasets(self): all_ids = [result.id for result in self.results] print ("Really remove entry/entries " + str(all_ids) + " [Y/n]? ") really = getch.get_one_of(["Y", "n", "N"]) if really == "Y": for this_id in all_ids: self.query.filter_by(id = this_id).delete() self.session.commit() self.all_results = [result for result in self.all_results if result.id not in all_ids] self.results = self.all_results
[docs] def edit_dataset(self): edit_dataset = self.ask_dataset() if not edit_dataset: return edit_column = self.ask_column() if not edit_column: return column_type = type(getattr(self.all_results[0], edit_column)) edit_entry = input("Change to? ") if edit_entry == "": return # try to convert user input if column_type == int: try: edit_entry = int(edit_entry) except: print("Wrong data type, should be int!") return elif column_type == datetime.datetime: try: edit_entry = datetime.datetime(edit_entry) except: print("Wrong data type, should be datetime.datetime!") return this_entry = self.query.filter_by(id = edit_dataset.id).first() try: setattr(this_entry, edit_column, edit_entry) self.session.commit() except: print("Problem changing column " + edit_column + " of dataset " + str(edit_dataset.id) + ". ") return new_results = [] new_all_results = [] for result in self.all_results: need_this = False if result in self.results: need_this = True if result == edit_dataset: setattr(result, edit_column, edit_entry) new_all_results += [result] if need_this: new_results += [result] self.results = new_results self.all_results = new_all_results
[docs] def select_stuff(self): find_column = self.ask_column() if not find_column: return find_value = input("Search for? ") if find_value == "": return if find_column and find_value: if find_column == "id": self.results = [result for result in self.results if find_value == str(getattr(result, find_column))] else: self.results = [result for result in self.results if find_value in str(getattr(result, find_column))]