Source code for kvalikirstu2.gui_model

"""The model part of the MVC architecture.
"""
from copy import deepcopy
from enum import Enum
import os
import subprocess
from pubsub import pub
from kvalikirstu2 import argument_parser
from kvalikirstu2 import exceptions
from kvalikirstu2 import kvali_interface
from kvalikirstu2 import utils
from kvalikirstu2.localization import _
from kvalikirstu2 import localization


class Messages:
    """Identifiers of the messages (pubsub topics) sent by the GUI model."""

    # Are there any unsaved changes to the headers?
    UNSAVED_CHANGES = "unsaved_changes"
    # Does the study exist?
    STUDY_EXISTS = "study_exists"
    # Does the index exist?
    INDEX_EXISTS = "index_exists"
    # Do temp files exist?
    TEMP_EXISTS = "temp_exists"
    # Gives the list of languages.
    LANGUAGES = "languages"
    # All headers were updated.
    ALL_HEADERS_CHANGED = "all_headers_changed"
    # A single header changed.
    HEADER_CHANGED = "header_changed"
    # Can the program undo/redo?
    UNDO_REDO_CHANGED = "undo_redo_changed"
    # Updates what header types are shown in the GUI.
    HEADER_TYPE_SHOW = "header_type_shown"
    # A task made progress; carries an info message.
    TASK_PROGRESS = "task_progress"
    # A task finished; carries an info message.
    TASK_FINISHED = "task_finished"
class HeaderType(Enum):
    """Enum describing which category a header belongs to.

    The GUI model uses these members as keys when deciding which
    headers are listed.
    """

    # An ordinary header.
    NORMAL = 1
    # A built-in header.
    BUILTIN = 2
    # A row that is not treated as a header.
    NOT_HEADER = 3
# pylint: disable=R0201, R0904
class GUIModel:
    """A class that contains the program state for the GUI.

    State changes are announced with ``pub.sendMessage`` using the topics
    defined in ``Messages``.
    """

    def __init__(self):
        # Headers currently listed in the GUI, ordered by their header index.
        self.visible_headers = []
        # HeaderLine objects keyed by their original header name.
        self.all_headers = {}
        # Which header types are currently listed in the GUI.
        self.types_visible = {HeaderType.NORMAL: True,
                              HeaderType.BUILTIN: False,
                              HeaderType.NOT_HEADER: False}
        self.unsaved_changes = False
        self.move_rows_active = False
        # Each stack entry is a list of HeaderLine objects to restore.
        self.undo_stack = []
        self.redo_stack = []
        self.header_info = None

    def broadcast_initial_messages(self):
        """Broadcasts initial messages about program state after the GUI has been initialized.

        Has to be called manually.
        """
        args = argument_parser.get_args()
        pub.sendMessage(Messages.ALL_HEADERS_CHANGED, headers=[])
        pub.sendMessage(Messages.INDEX_EXISTS, exists=False)
        pub.sendMessage(Messages.LANGUAGES,
                        languages=localization.get_languages(),
                        selected_language=args.index_lang)
        pub.sendMessage(Messages.STUDY_EXISTS, exists=False)
        pub.sendMessage(Messages.TEMP_EXISTS, exists=False)
        pub.sendMessage(Messages.UNDO_REDO_CHANGED, redo=False, undo=False)
        kvali_interface.check_archive_interface()

    # Header stuff

    def set_row_to_headerline(self, header_line):
        """Replace header.

        :param HeaderLine header_line: The new header.
        """
        self._change_headers([header_line])

    def set_row_name(self, index, value):
        """Sets the name for a header.

        :param index: The index of the row.
        :param str value: New name of the header.
        """
        # Work on a copy so the currently stored header stays intact for undo.
        header = deepcopy(self.visible_headers[index])
        header.edited_header = value
        self._change_headers([header])

    def select_index_language(self, lang):
        """Select language for the index.

        :param str lang: The language to set the index language to.
        """
        argument_parser.change_setting("index_lang", lang)
        if kvali_interface.study_exists():
            kvali_interface.parse_metadata()

    def check_changes_valid(self):
        """Checks if the current changes are valid.

        :raises exceptions.HeaderEditException: If at most one normal header is
            selected, or if more than one header is marked as the daF header.
        """
        headers = self.all_headers.values()
        selected_headers = [header for header in headers if header.get_header_type() == HeaderType.NORMAL]
        daf_headers = [header for header in headers if header.status == HeaderStatus.DAF_HEADER]

        if len(selected_headers) <= 1:
            raise exceptions.HeaderEditException(_("Not enough headers selected!"))
        if len(daf_headers) > 1:
            raise exceptions.HeaderEditException(_("Can only select one daF header!"))

    def save_changes(self):
        """Saves changes to headers.

        :return: False if there was nothing to save, True after a successful save.
        """
        if not self.unsaved_changes:
            return False

        self.check_changes_valid()
        kvali_interface.save_header_info(self.header_info)
        pub.sendMessage(Messages.TEMP_EXISTS, exists=kvali_interface.temp_files_exist())
        self._set_unsaved_changes(False)
        return True

    def save_changes_to_daf(self):
        """Saves changes to data files.
        """
        self.check_changes_valid()
        kvali_interface.save_header_info(self.header_info)
        kvali_interface.rename_headers_in_data_files()
        self._read_headers()
        self._broadcast_study()
        self._set_unsaved_changes(False)
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Saved changes to daF files."))

    def undo(self):
        """Undoes the top-most header change in the undo stack.

        Does nothing when the undo stack is empty.
        """
        if not self.undo_stack:
            return
        headers = self.undo_stack.pop()
        self._change_headers(headers, True, False)

    def redo(self):
        """Redoes the top-most header change in the redo stack.

        Does nothing when the redo stack is empty.
        """
        if not self.redo_stack:
            return
        headers = self.redo_stack.pop()
        self._change_headers(headers, False, True)

    def toggle_move_rows(self):
        """Toggles attribute move_rows_active.
        """
        self.move_rows_active ^= True

    def _swap_headers(self, index1, index2):
        """Returns copies of the two visible headers with their indexes exchanged.

        :param int index1: Position of the first header in visible_headers.
        :param int index2: Position of the second header in visible_headers.
        :return: A tuple of the two copied headers.
        """
        index1_header = deepcopy(self.visible_headers[index1])
        index2_header = deepcopy(self.visible_headers[index2])
        index1_header.index = index2
        index2_header.index = index1
        return index1_header, index2_header

    def move_up(self, index):
        """Moves header in the index upwards by one.

        Nothing happens for the first row or when moving rows is not active.

        :param int index: The index of the header to move.
        """
        if index > 0 and self.move_rows_active:
            index1, index2 = self._swap_headers(index, index - 1)
            self._change_headers([index1, index2])

    def move_down(self, index):
        """Moves header in the index downwards by one.

        Nothing happens for the last row or when moving rows is not active.

        :param int index: The index of the header to move.
        """
        if index < len(self.visible_headers) - 1 and self.move_rows_active:
            index1, index2 = self._swap_headers(index, index + 1)
            self._change_headers([index1, index2])

    def set_type_shown(self, header_type, should_show):
        """Sets whether or not header type should be shown in the list.

        :param HeaderType header_type: The header type.
        :param bool should_show: Should the header type be shown in the GUI?
        """
        self.types_visible[header_type] = should_show
        self._update_headers_from_headerinfo()
        self._broadcast_study()

    def get_headerline(self, index):
        """Returns the HeaderLine in given index.

        :param int index: The index of the header to get.
        :rtype: HeaderLine
        """
        return self.visible_headers[index]

    def change_header_property(self, indexes, change_func, no_action_condition=None):
        """Changes a property of the headers selected on the listctrl.

        :param indexes: A list of indexes.
        :param callable change_func: A function that changes a header property.
        :param callable no_action_condition: Optional. Called with a HeaderLine
            object; when it returns True the property of that header is not changed.
        """
        new_headers = []
        for index in indexes:
            # Modify a copy; the stored header is kept for the undo stack.
            header_line = deepcopy(self.get_headerline(index))
            if not no_action_condition or not no_action_condition(header_line):
                change_func(header_line)
                new_headers.append(header_line)

        self._change_headers(new_headers)

    def _change_headers(self, headers, is_undo=False, is_redo=False):
        """Changes headers and records the previous state for undo/redo.

        :param list headers: The new HeaderLine objects.
        :param bool is_undo: Is the change an undo operation?
        :param bool is_redo: Is the change a redo operation?
        """
        old_headers = []
        for header in headers:
            old_header = self.all_headers[header.original_header]
            old_headers.append(old_header)

        if is_undo:
            self.redo_stack.append(old_headers)
        elif is_redo:
            self.undo_stack.append(old_headers)
        else:
            # A fresh edit invalidates whatever could have been redone.
            self.redo_stack.clear()
            self.undo_stack.append(old_headers)

        self._update_headerinfo(headers)
        self._set_unsaved_changes(True)
        self._update_headers_from_headerinfo()
        self._broadcast_study()
        pub.sendMessage(Messages.UNDO_REDO_CHANGED, undo=bool(self.undo_stack), redo=bool(self.redo_stack))

    # Generation

    def generate_tempfiles(self):
        """Generates tempfiles.
        """
        kvali_interface.set_header_info(self.header_info)
        kvali_interface.generate_tempfiles()

    def generate_output(self):
        """Generates index and other output files.

        Unsaved header changes are saved first.
        """
        self.save_changes()
        kvali_interface.generate_output()
        pub.sendMessage(Messages.INDEX_EXISTS, exists=True)
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Index created successfully."))

    def generate_citation_requirement(self):
        """Generates a citation requirement for all files.
        """
        kvali_interface.generate_citreq()
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Citation requirements were generated without errors."))

    # Edit daF files

    def get_daf_data(self):
        """Returns a list of dafData instances, where content is a string that corresponds to the text content of
        the file. The content is an empty string if the file cannot be read.
        """
        data = kvali_interface.get_daf_data()
        self._update_index_exists()
        return data

    def add_language_codes(self, mapping):
        """Adds language codes to files.

        :param mapping: A dictionary from old file paths to language codes.
        """
        kvali_interface.add_language_codes(mapping)
        self._update_index_exists()
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Renamed daF files with language codes."))

    def get_language_codes(self):
        """Gets the different language codes available.
        """
        return argument_parser.get_languages()

    def rename_daf_files(self):
        """Renames data files.
        """
        kvali_interface.rename_daf_files()
        self._update_index_exists()
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Data files renamed successfully."))

    def set_study_path(self, path):
        """Sets study path to given path.

        :param path: The directory path for the study.
        :return: The result of ``kvali_interface.convertable_files_exist()``.
        """
        kvali_interface.set_study_path(path)
        self._read_headers()
        self._set_unsaved_changes(False)
        self._broadcast_study()
        return kvali_interface.convertable_files_exist()

    def add_text_to_dafs(self, text):
        """Adds text to data files.

        :param text: Text to be added.
        """
        kvali_interface.add_text_to_dafs(text)
        msg = _("Added text \"{}\" to all files.").format(text)
        pub.sendMessage(Messages.TASK_FINISHED, msg=msg)

    def replace_text(self, old_text, new_text):
        """Replace text in files.

        :param old_text: Text to replace.
        :param new_text: Text to replace the old text with.
        :return: A newline-separated string with one ``name (count)`` entry for
            every file whose count is greater than zero.
        """
        counts = kvali_interface.replace_text(old_text, new_text)
        counts_by_name = {os.path.basename(path): count for path, count in counts.items()}
        return "\n".join("{} ({})".format(name, count)
                         for name, count in counts_by_name.items() if count > 0)

    def get_file_count(self):
        """Returns the number of files in the current study.
        """
        return kvali_interface.get_file_count()

    def get_file_paths(self):
        """Returns the paths of all files in the current study.
        """
        return kvali_interface.get_data_filepaths()

    def get_preview(self, path):
        """Returns a string with the first <20 lines of a file.

        :param path: The path of the file to preview.
        """
        return kvali_interface.get_rows_from_file(path, 20)

    def _run_conversion(self, convert, show_total):
        """Runs a conversion and reports its progress; shared by the convert_* methods.

        :param callable convert: Called without arguments; must return an
            iterable that yields one path per converted file.
        :param bool show_total: Should progress messages include the running
            count and the total file count?
        """
        file_ct = self.get_file_count() if show_total else None
        success = False
        try:
            for count, converted in enumerate(convert(), start=1):
                filename = os.path.basename(converted)
                if show_total:
                    message = "{}: {} ({}/{})".format(_("Converting file"), filename, count, file_ct)
                else:
                    message = "{}: {}".format(_("Converting file"), filename)
                pub.sendMessage(Messages.TASK_PROGRESS, msg=message)

            kvali_interface.set_study_and_header_info()
            self._read_headers()
            success = True
        finally:
            # TASK_FINISHED is sent even when the conversion raises; the
            # message is empty in that case and the exception propagates.
            msg = _("Converting files completed") if success else ""
            pub.sendMessage(Messages.TASK_FINISHED, msg=msg)

    def convert_txt_encoding(self):
        """Convert txt encoding.
        """
        self._run_conversion(kvali_interface.convert_txt_encoding, False)

    def convert_to_odt(self):
        """Convert to odt.
        """
        self._run_conversion(kvali_interface.convert_to_odt, True)

    def convert_to_txt(self):
        """Convert to txt.
        """
        self._run_conversion(kvali_interface.convert_to_txt, True)

    def get_tempfile_content(self, path):
        """Gets the content from a temporary file for editing purposes.

        :param path: The path of the tempfile.
        """
        return kvali_interface.get_textfile_content(path)

    def save_tempfile_content(self, path, content):
        """Saves tempfile with content.

        :param path: The path of the tempfile.
        :param content: The content to save into the tempfile.
        """
        kvali_interface.save_tempfile_content(path, content)

    # Miscellaneous

    def open_index(self):
        """Opens the index.
        """
        # NOTE(review): the index path is interpolated into a shell command
        # line that uses the "start" command -- confirm the path can never
        # contain quote characters.
        os.system("start \"\" \"%s\"" % kvali_interface.get_index_path())

    def open_data_folder(self):
        """Opens the explorer at the directory of the current study data.
        """
        # An argument list (rather than one formatted command string) lets
        # subprocess quote the path, so directories with spaces stay intact.
        subprocess.Popen(["explorer", str(kvali_interface.get_data_path())])

    def check_unsaved_changes(self):
        """Returns whether or not model has unsaved changes.
        """
        return self.unsaved_changes

    def delete_tempfiles(self):
        """Deletes tempfiles.
        """
        kvali_interface.delete_tempfiles()
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Temporary files deleted."))
        pub.sendMessage(Messages.TEMP_EXISTS, exists=False)

    def delete_index(self):
        """Deletes the index.
        """
        kvali_interface.delete_index()
        pub.sendMessage(Messages.INDEX_EXISTS, exists=False)

    def daf_container_exists(self):
        """Returns True if a header list file exists.
        """
        return os.path.exists(kvali_interface.get_daf_container_path())

    def setup_external_file_study(self, headers):
        """Sets up an external file study.

        :param headers: List of headers.
        """
        kvali_interface.setup_external_file_study(headers)
        self._read_headers()
        self._broadcast_study()
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Created header list."))

    def get_backups(self):
        """Returns the backups that the study state can be restored to.
        """
        return kvali_interface.get_backups()

    def restore_backup(self, index):
        """Restore backup in index.

        :param index: The index of the backup to restore to.
        """
        kvali_interface.restore_backup(index)
        self._read_headers()
        self._broadcast_study()
        pub.sendMessage(Messages.TASK_FINISHED, msg=_("Restored backup number {}").format(index))

    def get_metadata(self):
        """Gets the metadata for a study.
        """
        return kvali_interface.get_metadata()

    def get_study_path(self):
        """Gets the study path.
        """
        return kvali_interface.get_study_path()

    def get_data_path(self):
        """Returns the data path.
        """
        return kvali_interface.get_data_path()

    def index_exists(self):
        """Does the index exist?
        """
        return kvali_interface.index_exists()

    def get_commands(self):
        """Returns a list of commands.

        :return: A list of commands.
        """
        return kvali_interface.get_commands()

    def execute_command(self, index):
        """Executes a command.

        :param index: Index of the command.
        """
        commands = kvali_interface.get_commands()
        commands[index].execute()

    # Protected internal functions

    def _update_index_exists(self):
        """Broadcasts whether the index currently exists.
        """
        pub.sendMessage(Messages.INDEX_EXISTS, exists=kvali_interface.index_exists())

    def _read_headers(self):
        """Reads headers from kvali_interface.

        Both the undo and the redo stack are cleared.
        """
        self.header_info = deepcopy(kvali_interface.get_header_info())
        self.redo_stack.clear()
        self.undo_stack.clear()
        self._update_headers_from_headerinfo()

    def _update_headerinfo(self, headers):
        """Writes the given headers into header_info and refreshes the header lists.

        :param headers: An iterable of HeaderLine objects.
        """
        for header in headers:
            self._header_updated(header)
        self._update_headers_from_headerinfo()
        self._broadcast_study()

    def _update_headers_from_headerinfo(self):
        """Updates headers based on information in self.header_info
        """
        old_visible_headers = deepcopy(self.visible_headers)
        self.visible_headers.clear()
        sorted_headers = sorted(self.header_info.header_indexes.items(), key=lambda x: x[1])

        for header, _tmp in sorted_headers:
            header_line = HeaderLine(header, self.header_info)
            header_type = header_line.get_header_type()
            if self.types_visible[header_type]:
                self.visible_headers.append(header_line)
            self.all_headers[header_line.original_header] = header_line

        # Attach each stored value mapping to the visible header it belongs to.
        for key_pair_string in self.header_info.value_mapping:
            key_header, key_val = utils.get_pair_out_of_string(key_pair_string)
            pair = (key_header, key_val)
            for header in self.visible_headers:
                if header.original_header == key_header:
                    value = self.header_info.value_mapping[str(pair)]
                    header.add_value_mapping(pair, value)
                    break

        if len(self.visible_headers) == len(old_visible_headers):
            # Same number of rows: only announce the rows that differ.
            for index, header in enumerate(old_visible_headers):
                new_header = self.visible_headers[index]
                if header != new_header:
                    pub.sendMessage(Messages.HEADER_CHANGED, index=index, header=new_header)
        else:
            pub.sendMessage(Messages.ALL_HEADERS_CHANGED, headers=self.visible_headers)

        pub.sendMessage(Messages.UNDO_REDO_CHANGED, undo=bool(self.undo_stack), redo=bool(self.redo_stack))

    def _broadcast_study(self):
        """Broadcasts messages for a newly selected study.
        """
        pub.sendMessage(Messages.STUDY_EXISTS, exists=kvali_interface.study_exists())
        pub.sendMessage(Messages.INDEX_EXISTS, exists=kvali_interface.index_exists())
        pub.sendMessage(Messages.TEMP_EXISTS, exists=kvali_interface.temp_files_exist())

    def _header_updated(self, header):
        """Updates the headerinfo.

        :param HeaderLine header: The new header.
        """
        header_info = self.header_info
        key = header.original_header
        header_info.header_indexes[key] = header.index
        header_info.header_enabled[key] = header.status == HeaderStatus.SHOW
        header_info.header_alignment[key] = header.alignment

        if not header.builtin:
            header_info.selected_headers[key] = header.status != HeaderStatus.NOT_HEADER
            header_info.header_mapping[key] = header.edited_header

        if header.status == HeaderStatus.DAF_HEADER:
            header_info.daf_header = header.edited_header
        elif header.edited_header == header_info.daf_header:
            # This header used to be the daF header but no longer has that status.
            header_info.daf_header = None

        for key_pair, new_val in header.get_values().items():
            header_info.value_mapping[str(key_pair)] = new_val

        kvali_interface.set_header_info(self.header_info)
        self._set_unsaved_changes(True)

    def _set_unsaved_changes(self, changes):
        """Sets self.unsaved_changes to True/False.

        A message is only broadcast when the value actually changes.

        :param changes: Determines whether or not there's unsaved changes.
        """
        if changes == self.unsaved_changes:
            return

        self.unsaved_changes = changes
        pub.sendMessage(Messages.UNSAVED_CHANGES, unsaved_changes=changes)
# pylint: enable=R0201, R0904
class HeaderStatus(Enum):
    """Header status enum.

    Values
    ------
    SHOW = A header to include in the index file.
    HIDE = A header not to be included in the index file.
    NOT_HEADER = Indicates a data paragraph line.
    DAF_HEADER = The data file header.
    """

    SHOW = 1
    HIDE = 2
    NOT_HEADER = 3
    DAF_HEADER = 4

    def __str__(self):
        # The labels are translated on every call, so the table is built
        # here rather than once at class level.
        labels = {
            HeaderStatus.SHOW: _("Show"),
            HeaderStatus.HIDE: _("Hide"),
            HeaderStatus.NOT_HEADER: _("Not header"),
            HeaderStatus.DAF_HEADER: _("DaF header"),
        }
        return labels[self]
class StatusError(Exception):
    """Raised to report a problem with setting the status of a header."""
class HeaderLine:
    """Stores information about each header row in GUIModel
    """

    def __init__(self, header, header_info):
        """Constructor.

        :param str header: The name of the header.
        :param HeaderInfo header_info: Contains information from the headers in the study.
        """
        self.original_header = header
        self.index = header_info.header_indexes[header]
        self.alignment = header_info.header_alignment[header]
        self.builtin = not header_info.is_ordinary_header(header)
        # Built-in headers keep their own name; others use the stored mapping.
        self.edited_header = header if self.builtin else header_info.header_mapping[header]
        self.status = self._get_initial_status(header_info)
        # Reference to the value mapping of header_info (not a copy).
        self.value_mapping = header_info.value_mapping
        self._values = {}

    def __eq__(self, other):
        # Let Python fall back to its default handling instead of raising
        # AttributeError when compared with an unrelated object such as None.
        if not isinstance(other, HeaderLine):
            return NotImplemented
        return (self.original_header == other.original_header and
                self.status == other.status and
                self.index == other.index and
                self.builtin == other.builtin and
                self.edited_header == other.edited_header and
                self.value_mapping == other.value_mapping and
                self.alignment == other.alignment)

    # Instances are mutable and compare by value, so they are not hashable.
    # Defining __eq__ already implies this; it is stated here explicitly.
    __hash__ = None

    def get_header_type(self):
        """Gets the HeaderType enum for the header.
        """
        if self.builtin:
            return HeaderType.BUILTIN
        if self.status == HeaderStatus.NOT_HEADER:
            return HeaderType.NOT_HEADER
        return HeaderType.NORMAL

    def set_mapping(self, pair, value):
        """Sets a value mapping; behaves exactly like add_value_mapping.

        :param pair: The (key, original_value) pair.
        :param value: The value the pair maps to.
        """
        self.add_value_mapping(pair, value)

    def add_value_mapping(self, pair, value):
        """Adds a value mapping from (key, original_value) to value.
        """
        self._values[pair] = value

    def get_values(self):
        """Gets the value mappings stored on this header (the internal dict itself)."""
        return self._values

    def __lt__(self, other):
        # Order by status value, then by index, then by edited header name.
        return ((self.status.value, self.index, self.edited_header) <
                (other.status.value, other.index, other.edited_header))

    def _get_initial_status(self, header_info):
        """Derives the initial HeaderStatus of this header from header_info.

        :param HeaderInfo header_info: Contains information from the headers in the study.
        """
        if self.edited_header == header_info.daf_header:
            return HeaderStatus.DAF_HEADER
        if self.builtin or header_info.selected_headers[self.original_header]:
            # Builtin header can't be non-header
            if header_info.header_enabled[self.original_header]:
                return HeaderStatus.SHOW
            return HeaderStatus.HIDE
        return HeaderStatus.NOT_HEADER

    def set_status(self, status):
        """Sets HeaderLine status.

        :param HeaderStatus status: The new status.
        :raises StatusError: If a built-in header is set as the daF header.
        """
        if self.builtin and status == HeaderStatus.DAF_HEADER:
            raise StatusError('Cannot set built-in header as daF header!')
        self.status = status

    def set_align(self, align):
        """Sets HeaderLine alignment.

        :param str align: The new alignment.
        """
        self.alignment = align

    def restore_original_header(self):
        """Restores original header.
        """
        self.edited_header = self.original_header

    def update_changed_values(self, new_values):
        """Updates changed values for this header.

        :param dict new_values: Mappings to add or overwrite.
        """
        self._values.update(new_values)