"""The model part of the MVC architecture.
"""
from copy import deepcopy
from enum import Enum
import os
import subprocess
from pubsub import pub
from kvalikirstu2 import argument_parser
from kvalikirstu2 import exceptions
from kvalikirstu2 import kvali_interface
from kvalikirstu2 import utils
from kvalikirstu2.localization import _
from kvalikirstu2 import localization
[docs]class Messages:
""" Contains the identifiers for the messages sent by the GUI model.
"""
UNSAVED_CHANGES = "unsaved_changes" # Is there any unsaved changes to the headers?
STUDY_EXISTS = "study_exists" # Does the study exist?
INDEX_EXISTS = "index_exists" # Does the index exist?
TEMP_EXISTS = "temp_exists" # Do temp files exist?
LANGUAGES = "languages" # Gives the list of languages
ALL_HEADERS_CHANGED = "all_headers_changed" # Updated all headers
HEADER_CHANGED = "header_changed" # A single header changed
UNDO_REDO_CHANGED = "undo_redo_changed" # Can program undo/redo?
HEADER_TYPE_SHOW = "header_type_shown" # Updates what header types are shown in the GUI
TASK_PROGRESS = "task_progress" # A task made progress, sends an info message
TASK_FINISHED = "task_finished" # A task finished, sends an info message
# pylint: disable=R0201, R0904
[docs]class GUIModel:
"""A class that contains the program state for the GUI.
"""
def __init__(self):
self.visible_headers = []
self.all_headers = {}
self.types_visible = {HeaderType.NORMAL: True,
HeaderType.BUILTIN: False,
HeaderType.NOT_HEADER: False}
self.unsaved_changes = False
self.move_rows_active = False
self.undo_stack = []
self.redo_stack = []
self.header_info = None
[docs] def broadcast_initial_messages(self):
"""Broadcasts initial messages about program state after the GUI has been initialized.
Has to be called manually.
"""
args = argument_parser.get_args()
pub.sendMessage(Messages.ALL_HEADERS_CHANGED, headers=[])
pub.sendMessage(Messages.INDEX_EXISTS, exists=False)
pub.sendMessage(Messages.LANGUAGES, languages=localization.get_languages(), selected_language=args.index_lang)
pub.sendMessage(Messages.STUDY_EXISTS, exists=False)
pub.sendMessage(Messages.TEMP_EXISTS, exists=False)
pub.sendMessage(Messages.UNDO_REDO_CHANGED, redo=False, undo=False)
kvali_interface.check_archive_interface()
# Header stuff
[docs] def set_row_name(self, index, value):
"""Sets the name for a header.
:param index: The index of the row.
:param str value: New name of the header.
"""
header = deepcopy(self.visible_headers[index])
header.edited_header = value
self._change_headers([header])
[docs] def select_index_language(self, lang):
"""Select language for the index.
:param str lang: The language to set the index language to.
"""
argument_parser.change_setting("index_lang", lang)
if kvali_interface.study_exists():
kvali_interface.parse_metadata()
[docs] def check_changes_valid(self):
"""Checks if the current changes are valid.
"""
headers = self.all_headers.values()
selected_headers = [header for header in headers
if header.get_header_type() == HeaderType.NORMAL]
daf_headers = [header for header in headers
if header.status == HeaderStatus.DAF_HEADER]
if len(selected_headers) <= 1:
raise exceptions.HeaderEditException(_("Not enough headers selected!"))
if len(daf_headers) > 1:
raise exceptions.HeaderEditException(_("Can only select one daF header!"))
[docs] def save_changes(self):
"""Saves changes to headers.
"""
if not self.unsaved_changes:
return False
self.check_changes_valid()
kvali_interface.save_header_info(self.header_info)
pub.sendMessage(Messages.TEMP_EXISTS, exists=kvali_interface.temp_files_exist())
self._set_unsaved_changes(False)
return True
[docs] def save_changes_to_daf(self):
"""Saves changes to data files.
"""
self.check_changes_valid()
kvali_interface.save_header_info(self.header_info)
kvali_interface.rename_headers_in_data_files()
self._read_headers()
self._broadcast_study()
self._set_unsaved_changes(False)
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Saved changes to daF files."))
[docs] def undo(self):
"""Undos top-most header change in the undo stack.
"""
headers = self.undo_stack.pop()
self._change_headers(headers, True, False)
[docs] def redo(self):
"""Redos top-most header change in the top stack.
"""
headers = self.redo_stack.pop()
self._change_headers(headers, False, True)
[docs] def toggle_move_rows(self):
"""Toggles attribute move_rows_active.
"""
self.move_rows_active ^= True
def _swap_headers(self, index1, index2):
index1_header = deepcopy(self.visible_headers[index1])
index2_header = deepcopy(self.visible_headers[index2])
index1_header.index = index2
index2_header.index = index1
return index1_header, index2_header
[docs] def move_up(self, index):
"""Moves header in the index upwards by one.
:param int index: The index of the header to move.
"""
if index > 0 and self.move_rows_active:
index1, index2 = self._swap_headers(index, index-1)
self._change_headers([index1, index2])
[docs] def move_down(self, index):
"""Moves header in the index downwards by one.
:param int index: The index of the header to move.
"""
if index < len(self.visible_headers) - 1 and self.move_rows_active:
index1, index2 = self._swap_headers(index, index+1)
self._change_headers([index1, index2])
[docs] def set_type_shown(self, header_type, should_show):
"""Sets whether or not header type should be shown in the list.
:param HeaderType header_type: The header type.
:param bool should_show: Should the header type be shown in the GUI?
"""
self.types_visible[header_type] = should_show
self._update_headers_from_headerinfo()
self._broadcast_study()
def _change_headers(self, headers, is_undo=False, is_redo=False):
"""Changes headers.
:param list indexes: List of indexes for the headers.
:param dict headers: A dictionary of headers from index to header.
:param bool is_undo: Is the change an undo operation?
"""
old_headers = []
for header in headers:
old_header = self.all_headers[header.original_header]
old_headers.append(old_header)
if is_undo:
self.redo_stack.append(old_headers)
elif is_redo:
self.undo_stack.append(old_headers)
else:
self.redo_stack.clear()
self.undo_stack.append(old_headers)
self._update_headerinfo(headers)
self._set_unsaved_changes(True)
self._update_headers_from_headerinfo()
self._broadcast_study()
pub.sendMessage(Messages.UNDO_REDO_CHANGED, undo=bool(self.undo_stack), redo=bool(self.redo_stack))
# Generation
[docs] def generate_tempfiles(self):
"""Generates tempfiles.
"""
kvali_interface.set_header_info(self.header_info)
kvali_interface.generate_tempfiles()
[docs] def generate_output(self):
"""Generates index and other output files.
"""
self.save_changes()
kvali_interface.generate_output()
pub.sendMessage(Messages.INDEX_EXISTS, exists=True)
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Index created successfully."))
[docs] def generate_citation_requirement(self):
"""Generates a citation requirement for all files.
"""
kvali_interface.generate_citreq()
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Citation requirements were generated without errors."))
# Edit daF files
[docs] def get_daf_data(self):
"""Returns a list of dafData instances, where content is a string that corresponds to the text content
of the file. The content is an empty string if the file cannot be read.
"""
data = kvali_interface.get_daf_data()
self._update_index_exists()
return data
[docs] def add_language_codes(self, mapping):
"""Adds language codes to files.
:param mapping: A dictionary from old file paths to language codes.
"""
kvali_interface.add_language_codes(mapping)
self._update_index_exists()
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Renamed daF files with language codes."))
[docs] def get_language_codes(self):
"""Gets the different language codes available.
"""
return argument_parser.get_languages()
[docs] def rename_daf_files(self):
"""Renames data files.
"""
kvali_interface.rename_daf_files()
self._update_index_exists()
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Data files renamed successfully."))
[docs] def set_study_path(self, path):
"""Sets study path to given path.
:param path: The directory path for the study.
"""
kvali_interface.set_study_path(path)
self._read_headers()
self._set_unsaved_changes(False)
self._broadcast_study()
return kvali_interface.convertable_files_exist()
[docs] def add_text_to_dafs(self, text):
"""Adds text to data files.
:param text: Text to be added.
"""
kvali_interface.add_text_to_dafs(text)
msg = _("Added text \"{}\" to all files.").format(text)
pub.sendMessage(Messages.TASK_FINISHED, msg=msg)
[docs] def replace_text(self, old_text, new_text):
"""Replace text in files.
:param old_text: Text to replace.
:param new_text: Text to replace the old text with.
"""
file_list = kvali_interface.replace_text(old_text, new_text)
file_list = {os.path.basename(filen): count for filen, count in file_list.items()}
files = "\n".join(["{} ({})".format(filename, count) for filename, count in file_list.items() if count > 0])
return files
[docs] def get_file_count(self):
"""Returns the number of files in the current study.
"""
return kvali_interface.get_file_count()
[docs] def get_file_paths(self):
"""Returns the paths of all files in the current study.
"""
return kvali_interface.get_data_filepaths()
[docs] def get_preview(self, path):
"""Returns a string with the first <20 lines of a file.
:param path: The path of the file to preview.
"""
pre = kvali_interface.get_rows_from_file(path, 20)
return pre
[docs] def convert_txt_encoding(self):
"""Convert txt encoding.
"""
success = False
count = 0
try:
for converted in kvali_interface.convert_txt_encoding():
filename = os.path.basename(converted)
count += 1
message = "{}: {}".format(_("Converting file"), filename)
pub.sendMessage(Messages.TASK_PROGRESS, msg=message)
kvali_interface.set_study_and_header_info()
self._read_headers()
success = True
except Exception as exc:
raise exc
finally:
msg = _("Converting files completed") if success else ""
pub.sendMessage(Messages.TASK_FINISHED, msg=msg)
[docs] def convert_to_odt(self):
"""Convert to odt.
"""
count = 0
file_ct = self.get_file_count()
success = False
try:
for converted in kvali_interface.convert_to_odt():
filename = os.path.basename(converted)
count += 1
message = "{}: {} ({}/{})".format(_("Converting file"), filename, count, file_ct)
pub.sendMessage(Messages.TASK_PROGRESS, msg=message)
kvali_interface.set_study_and_header_info()
self._read_headers()
success = True
except Exception as exc:
raise exc
finally:
msg = _("Converting files completed") if success else ""
pub.sendMessage(Messages.TASK_FINISHED, msg=msg)
[docs] def convert_to_txt(self):
"""Convert to txt.
"""
count = 0
file_ct = self.get_file_count()
success = False
try:
for converted in kvali_interface.convert_to_txt():
filename = os.path.basename(converted)
count += 1
message = "{}: {} ({}/{})".format(_("Converting file"), filename, count, file_ct)
pub.sendMessage(Messages.TASK_PROGRESS, msg=message)
kvali_interface.set_study_and_header_info()
self._read_headers()
success = True
except Exception as exc:
raise exc
finally:
msg = _("Converting files completed") if success else ""
pub.sendMessage(Messages.TASK_FINISHED, msg=msg)
[docs] def get_tempfile_content(self, path):
"""Gets the content from a temporary file for editing purposes.
:param path: The path of the tempfile.
"""
return kvali_interface.get_textfile_content(path)
[docs] def save_tempfile_content(self, path, content):
"""Saves tempfile with content.
:param path: The path of the tempfile.
:param content: The content to save into the tempfile.
"""
kvali_interface.save_tempfile_content(path, content)
# Miscellaneous
[docs] def open_index(self):
"""Opens the index.
"""
os.system("start \"\" \"%s\"" % kvali_interface.get_index_path())
[docs] def open_data_folder(self):
"""Opens the explorer at the directory of the current study data.
"""
subprocess.Popen("explorer {}".format(kvali_interface.get_data_path()))
[docs] def check_unsaved_changes(self):
"""Returns whether or not model has unsaved changes.
"""
return self.unsaved_changes
[docs] def delete_tempfiles(self):
"""Deletes tempfiles.
"""
kvali_interface.delete_tempfiles()
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Temporary files deleted."))
pub.sendMessage(Messages.TEMP_EXISTS, exists=False)
[docs] def delete_index(self):
"""Deletes the index.
"""
kvali_interface.delete_index()
pub.sendMessage(Messages.INDEX_EXISTS, exists=False)
[docs] def daf_container_exists(self):
"""Returns True if a header list file exists.
"""
return os.path.exists(kvali_interface.get_daf_container_path())
[docs] def setup_external_file_study(self, headers):
"""Sets up an external file study.
:param headers: List of headers.
"""
kvali_interface.setup_external_file_study(headers)
self._read_headers()
self._broadcast_study()
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Created header list."))
[docs] def get_backups(self):
"""Returns the backups that the study state can be restored to.
"""
return kvali_interface.get_backups()
[docs] def restore_backup(self, index):
"""Restore backup in index.
:param index: The index of the backup to restore to.
"""
kvali_interface.restore_backup(index)
self._read_headers()
self._broadcast_study()
pub.sendMessage(Messages.TASK_FINISHED, msg=_("Restored backup number {}").format(index))
[docs] def get_study_path(self):
"""Gets the study path.
"""
return kvali_interface.get_study_path()
[docs] def get_data_path(self):
"""Returns the data path.
"""
return kvali_interface.get_data_path()
[docs] def index_exists(self):
"""Does the index exist?
"""
return kvali_interface.index_exists()
[docs] def get_commands(self):
"""Returns a list of commands.
:return: A list of commands.
"""
return kvali_interface.get_commands()
[docs] def execute_command(self, index):
"""Executes a command.
:param index: Index of the command.
"""
commands = kvali_interface.get_commands()
commands[index].execute()
# Protected internal functions
def _update_index_exists(self):
pub.sendMessage(Messages.INDEX_EXISTS, exists=kvali_interface.index_exists())
def _read_headers(self):
"""Reads headers from kvali_interface.
"""
self.header_info = deepcopy(kvali_interface.get_header_info())
self.redo_stack.clear()
self.undo_stack.clear()
self._update_headers_from_headerinfo()
def _update_headerinfo(self, headers):
for header in headers:
self._header_updated(header)
self._update_headers_from_headerinfo()
self._broadcast_study()
def _update_headers_from_headerinfo(self):
"""Updates headers based on information in self.header_info
"""
old_visible_headers = deepcopy(self.visible_headers)
self.visible_headers.clear()
sorted_headers = sorted(self.header_info.header_indexes.items(), key=lambda x: x[1])
for header, _tmp in sorted_headers:
header_line = HeaderLine(header, self.header_info)
header_type = header_line.get_header_type()
if self.types_visible[header_type]:
self.visible_headers.append(header_line)
self.all_headers[header_line.original_header] = header_line
for key_pair_string in self.header_info.value_mapping:
key_header, key_val = utils.get_pair_out_of_string(key_pair_string)
pair = (key_header, key_val)
for header in self.visible_headers:
if header.original_header == key_header:
value = self.header_info.value_mapping[str(pair)]
header.add_value_mapping(pair, value)
break
if len(self.visible_headers) == len(old_visible_headers):
for index, header in enumerate(old_visible_headers):
new_header = self.visible_headers[index]
if header != new_header:
pub.sendMessage(Messages.HEADER_CHANGED, index=index, header=new_header)
else:
pub.sendMessage(Messages.ALL_HEADERS_CHANGED, headers=self.visible_headers)
pub.sendMessage(Messages.UNDO_REDO_CHANGED, undo=bool(self.undo_stack), redo=bool(self.redo_stack))
def _broadcast_study(self):
"""Broadcasts messages for a newly selected study.
"""
pub.sendMessage(Messages.STUDY_EXISTS, exists=kvali_interface.study_exists())
pub.sendMessage(Messages.INDEX_EXISTS, exists=kvali_interface.index_exists())
pub.sendMessage(Messages.TEMP_EXISTS, exists=kvali_interface.temp_files_exist())
def _header_updated(self, header):
"""Updates the headerinfo.
:param HeaderLine header: The new header.
"""
header_info = self.header_info
key = header.original_header
header_info.header_indexes[key] = header.index
header_info.header_enabled[key] = header.status == HeaderStatus.SHOW
header_info.header_alignment[key] = header.alignment
if not header.builtin:
header_info.selected_headers[key] = header.status != HeaderStatus.NOT_HEADER
header_info.header_mapping[key] = header.edited_header
if header.status == HeaderStatus.DAF_HEADER:
header_info.daf_header = header.edited_header
elif header.edited_header == header_info.daf_header:
header_info.daf_header = None
for key_pair, new_val in header.get_values().items():
header_info.value_mapping[str(key_pair)] = new_val
kvali_interface.set_header_info(self.header_info)
self._set_unsaved_changes(True)
def _set_unsaved_changes(self, changes):
"""Sets self.unsaved_changes to True/False.
:param changes: Determines whether or not there's unsaved changes.
"""
if changes == self.unsaved_changes:
return
self.unsaved_changes = changes
pub.sendMessage(Messages.UNSAVED_CHANGES, unsaved_changes=changes)
# pylint: enable=R0201, R0904
[docs]class StatusError(Exception):
"""An exception class to report issues with setting the status for a header."""