# Source code for kvalikirstu2.kvali_interface

"""Contains the interface through which the application communicates with the rest of the program"""
from copy import deepcopy
import logging
import os
import shutil
import pkg_resources
from kvalikirstu2 import exceptions
from kvalikirstu2 import study
from kvalikirstu2 import utils
from kvalikirstu2 import study_reader
from kvalikirstu2 import argument_parser
from kvalikirstu2 import template_generator
from kvalikirstu2 import writer
from kvalikirstu2 import reader
from kvalikirstu2 import header_scanner
from kvalikirstu2 import csv_generator
from kvalikirstu2.localization import _
from kvalikirstu2 import converter
from kvalikirstu2 import data_file_tempwriter
from kvalikirstu2 import folder_backup
from kvalikirstu2 import data_archive_interface
import kvalikirstu2.localization


# Logger named after this module.
logger = logging.getLogger(__name__)


# Shared module state. Every entry starts out as None and is filled in by the
# functions of this module.
_DATA = {
    'study_path': None,     # study folder, assigned in set_study_path
    'data_path': None,      # data folder inside the study folder
    'current_study': None,  # study.Study object of the chosen study
    'index_path': None,     # path of "index.html" inside the study folder
    'header_info': None,    # header info, loaded from json or scanned from the data
    'metadata': None,       # metadata returned by the archive interface
    'interface': None,      # data archive interface instance
}


class InvalidProgramStateException(Exception):
    """Raised when a function is called while the program state does not allow it.

    Within this module it is used, for example, when no study has been chosen yet.
    """
def register_interface_for_argument_parser():
    """Selects the data archive interface and registers it for the argument parser.

    Entry points of the ``kvalikirstu2_interface`` group are looked up. When none
    is installed, ``DummyInterface`` is used instead. The created instance is
    stored in the module state.

    :raises InvalidProgramStateException: If more than one interface is installed.
    """
    # Logging is not properly set up here yet, don't call any logging messages
    entry_points = list(pkg_resources.iter_entry_points('kvalikirstu2_interface'))
    if len(entry_points) > 1:
        installed_names = [entry_point.name for entry_point in entry_points]
        raise InvalidProgramStateException('Multiple data archive interfaces installed: %s' % installed_names)

    if entry_points:
        interface_class = entry_points[0].load()
    else:
        interface_class = data_archive_interface.DummyInterface

    _DATA['interface'] = interface_class()
    argument_parser.register_archive_interface(_DATA['interface'])
def check_archive_interface():
    """Logs a warning when the registered archive interface is exactly the dummy plugin."""
    interface_type = type(_DATA['interface'])
    if interface_type != data_archive_interface.DummyInterface:
        return
    logger.warning(_('No kvalikirstu2 plugin found. Metadata will not be parsed properly from the study folder.'))
def get_temp_glob_pattern():
    """Builds the glob pattern that matches everything under the study's ``temp`` folder.

    :rtype: str
    :raises InvalidProgramStateException: If no study path has been set.
    """
    study_path = _DATA['study_path']
    if not study_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    pattern = os.path.join(study_path, "temp", "**")
    logger.debug('Generating temp pattern for path %s: %s', study_path, pattern)
    return pattern
def get_headers_json_path():
    """Gets the path of the header info .json file for the current study.

    The file lives in the ``temp`` folder of the study and is named after the
    study id found in the metadata.

    :rtype: str
    :raises InvalidProgramStateException: If the study path or the metadata is missing.
    """
    study_path = _DATA['study_path']
    metadata = _DATA['metadata']
    # The file name is built from the metadata, so missing metadata is reported
    # as an invalid state instead of surfacing as an AttributeError on None.
    if not study_path or metadata is None:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    headers_path = os.path.join(study_path, "temp", "%s_headers.json" % metadata.study_id)
    logger.debug('Header path for path %s, meta %s: %s', study_path, metadata, headers_path)
    return headers_path
def get_study_json_path():
    """Gets the full path reported by the current study object.

    :returns: The value of ``get_full_path()`` of the current study.
    :raises InvalidProgramStateException: If the study path or the study object is missing.
    """
    current_study = _DATA['current_study']
    # The study object is what gets dereferenced, so it is checked as well;
    # otherwise a missing study would raise AttributeError on None.
    if not _DATA['study_path'] or current_study is None:
        raise InvalidProgramStateException(_("A study has not been chosen."))
    return current_study.get_full_path()
def get_index_path():
    """Returns the stored index path (``None`` until a study path has been set)."""
    index_path = _DATA['index_path']
    return index_path
def get_data_path():
    """Returns the path of the data folder of the current study.

    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if data_path:
        return data_path
    raise InvalidProgramStateException(_("A study has not been chosen."))
# pylint: disable=E1135,E1137
def get_selected_headers():
    """Gets the selected headers from the stored header info.

    :returns: The ``selected_headers`` attribute of the header info.
    :raises InvalidProgramStateException: If the study path or the header info is missing.
    """
    header_info = _DATA['header_info']
    # The header info is what gets dereferenced, so it is validated too instead
    # of letting a missing value raise AttributeError on None.
    if not _DATA['study_path'] or header_info is None:
        raise InvalidProgramStateException(_("A study has not been chosen."))
    return header_info.selected_headers
# pylint: enable=E1135,E1137
def get_header_info():
    """Returns the header info object held in the module state (may be ``None``)."""
    header_info = _DATA['header_info']
    return header_info
def get_metadata():
    """Returns the metadata held in the module state (may be ``None``)."""
    metadata = _DATA['metadata']
    return metadata
def get_study_path():
    """Returns the study path held in the module state (may be ``None``)."""
    study_path = _DATA['study_path']
    return study_path
def _try_load_header_info():
    """Fills in the header info, preferring a saved .json file over scanning the data folder."""
    json_path = get_headers_json_path()
    if not os.path.isfile(json_path):
        logger.debug('No existing headers, reading headers from file.')
        _DATA['header_info'] = header_scanner.get_header_info_for_study(_DATA['data_path'])
        return

    logger.debug('loading existing headers from file %s', json_path)
    _DATA['header_info'] = utils.json_deserialize(json_path)
def set_study_path(path: str):
    """Chooses the study folder and initializes the module state for it.

    After storing the paths, the metadata is parsed, the backup size check and
    a backup are run, and the study object and header info are set up.

    :param path: The study path. A falsy value selects the current working directory.
    """
    study_path = path if path else os.getcwd()
    args = argument_parser.get_args()
    logger.debug('setting study_path %s', study_path)

    _DATA['data_path'] = os.path.join(study_path, args.study_data_folder)
    _DATA['study_path'] = study_path
    _DATA['index_path'] = os.path.join(study_path, "index.html")

    # The steps below read the paths stored above, so the order matters.
    parse_metadata()
    _test_backup()
    _backup_study()
    set_study_and_header_info()
def parse_metadata():
    """Asks the archive interface for the study metadata and stores it.

    :raises IOError: If the interface returns a falsy result.
    """
    parsed = _DATA['interface'].parse_study_information(_DATA['study_path'], _DATA['data_path'])
    if parsed:
        _DATA['metadata'] = parsed
        return
    raise IOError(_('Metadata information for study could not be retrieved! Choose the folder containing a data'
                    ' folder with a metadata file inside it.\n'))
def set_study_and_header_info():
    """Creates the study object from the stored metadata and then loads the header info."""
    metadata = _DATA['metadata']
    _DATA['current_study'] = study.Study(metadata.study_id, _DATA['study_path'], metadata.title)
    _try_load_header_info()
def study_exists():
    """Tells whether a data path has been set, i.e. whether a study folder was specified.

    :rtype: bool
    """
    data_path = _DATA['data_path']
    return bool(data_path)
def convertable_files_exist():
    """Returns True if ``utils.get_convertable_files`` finds files in the selected study.

    Returns False when no data path has been set.
    """
    data_path = _DATA['data_path']
    if data_path:
        return bool(utils.get_convertable_files(data_path))
    return False
def convertable_formats():
    """Returns ``utils.CONVERTABLE_FORMATS``, the convertable file formats."""
    formats = utils.CONVERTABLE_FORMATS
    return formats
def data_files_and_mef_exist():
    """Returns True if a data path is set, metadata is available and the data folder is not empty.

    :rtype: bool
    """
    path = _DATA['data_path']
    if not path:
        return False
    # Coerce the metadata to bool so the function always returns True/False
    # rather than leaking the metadata object (or None) to callers.
    return bool(_DATA['metadata']) and bool(os.listdir(path))
def temp_files_exist():
    """Tells whether the ``temp`` folder exists inside the study folder.

    Returns False when no study path has been set.
    """
    study_path = _DATA['study_path']
    if study_path:
        return bool(os.path.exists(os.path.join(study_path, 'temp')))
    return False
def paragraph_files_exist():
    """Tells whether the temp glob pattern matches anything.

    Returns False when no data path has been set.
    """
    if _DATA['data_path']:
        matches = utils.natsorted_glob(get_temp_glob_pattern())
        return bool(matches)
    return False
def index_exists():
    """Tells whether the csv file, the index file or the html folder exists.

    Returns False when no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        return False

    csv_name = _DATA['interface'].create_csv_name(_DATA['metadata'])
    candidates = (
        os.path.join(data_path, csv_name),
        template_generator.get_index_path(_DATA['study_path']),
        template_generator.get_html_path(_DATA['study_path']),
    )
    return any(os.path.exists(candidate) for candidate in candidates)
def generate_tempfiles():
    """Generates the temporary files for the study.

    A ``StudyReader`` is built from the metadata, the command line arguments
    and the selected headers. The study it produces becomes the current study
    and is written to file.

    :raises InvalidProgramStateException: If the data path or metadata is missing.
    :raises exceptions.TempFileException: If anything fails while reading or writing
        the study. The original exception is attached as the cause.
    """
    if not _DATA['data_path'] or not _DATA['metadata']:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    try:
        args = argument_parser.get_args()
        sreader = study_reader.StudyReader(_DATA['metadata'].study_id,
                                           _DATA['metadata'].title,
                                           overwrite_temp=args.overwrite_temp,
                                           selected_headers=_DATA['header_info'].selected_headers,
                                           study_path=_DATA['study_path'],
                                           data_folder_name=args.study_data_folder)
        _DATA['current_study'] = sreader.get_study()
        _DATA['current_study'].write_to_file()
    except Exception as exception:
        # Chain explicitly so the traceback shows the failure as the direct cause.
        raise exceptions.TempFileException(exception) from exception
def generate_output():
    """Generates the index and the csv file for the study.

    The study is reloaded, its headers are remapped with the stored header
    info, and then the index and the csv are generated.

    :raises InvalidProgramStateException: If no study path has been set.
    """
    if not _DATA['study_path']:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    args = argument_parser.get_args()
    get_current_study()

    # Read the state only after the reload, which refreshes study and metadata.
    current = _DATA['current_study']
    header_info = _DATA['header_info']
    current.remap_headers(header_info)
    template_generator.generate_index(_DATA['study_path'], current, header_info, _DATA['metadata'])

    csv_name = _DATA['interface'].create_csv_name(_DATA['metadata'])
    languages = argument_parser.get_languages()
    csv_generator.create_csv(current, args.index_lang, args.csv_encoding, csv_name, header_info, languages)
def generate_citreq():
    """Adds the citation requirement from the metadata to the data folder.

    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    citation = _DATA['metadata'].citation_requirement
    writer.add_text_to_folder(data_path, citation, True)
def get_current_study():
    """Recreates the current study from file and re-parses the metadata.

    :rtype: Study
    :raises InvalidProgramStateException: If no study path has been set.
    """
    study_path = _DATA['study_path']
    if not study_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    metadata = _DATA['metadata']
    fresh_study = study.Study(metadata.study_id, study_path, metadata.title)
    # Stored before loading, so the state points at the new object even if loading fails.
    _DATA['current_study'] = fresh_study
    fresh_study.load_from_file()
    parse_metadata()
    return _DATA['current_study']
def set_headers(headers, headers_removed, header_indexes=None, header_alignment=None):
    """Sets the header info from the header editing menu.

    The current study is reloaded first. The changes are then applied to the
    stored header info, which is saved afterwards.

    :param headers: A map of old headers to new headers.
    :param headers_removed: A list of headers that should be removed.
    :param header_indexes: A map of header to index. Ignored when falsy.
    :param header_alignment: A map of header to alignment. Ignored when falsy.
    :raises InvalidProgramStateException: If no study path has been set.
    :raises SyntaxError: If a new header is rejected by ``utils.is_valid_header``.
    """
    if not _DATA['study_path']:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    # Reload the study from file (this also re-parses the metadata).
    get_current_study()
    if header_alignment:
        _DATA['header_info'].header_alignment = header_alignment
    if header_indexes:
        _DATA['header_info'].header_indexes = header_indexes
    # Iterate over a snapshot of the keys, since header_enabled is written to below.
    header_keys = set(_DATA['header_info'].header_enabled.keys())
    for header in header_keys:
        # Only ordinary headers that appear in the mapping are renamed.
        if _DATA['header_info'].is_ordinary_header(header) and header in headers:
            new_header = headers[header]
            if not utils.is_valid_header(new_header):
                # NOTE(review): header info changed so far is not rolled back when this raises.
                raise SyntaxError(_('Header %s was not valid!') % new_header)
            _DATA['header_info'].header_mapping[header] = new_header
        # NOTE(review): nesting reconstructed from a flattened listing; this is assumed to
        # run for every header, not only for renamed ones. Confirm against the real file.
        _DATA['header_info'].header_enabled[header] = header not in headers_removed

    save_headers()
def delete_tempfiles():
    """Removes the ``temp`` folder of the study together with its contents.

    :raises InvalidProgramStateException: If no study path has been set.
    """
    study_path = _DATA['study_path']
    if not study_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    temp_folder = os.path.join(study_path, "temp")
    shutil.rmtree(temp_folder)
def save_tempfile_content(path, content):
    """Rewrites the old temp file with new content.

    The parent folder of the file is created first when it is missing.

    :param str path: The path of the temp file.
    :param str content: The contents of the file.
    """
    dir_name = os.path.dirname(path)
    # A bare file name has an empty dirname; there is nothing to create then.
    if dir_name:
        # makedirs also creates missing intermediate folders, and exist_ok
        # avoids an error when the folder appears between a check and the call.
        os.makedirs(dir_name, exist_ok=True)
    writer.write_txt(path, content)
def get_rows_from_file(path, line_ct):
    """Reads at most the first ``line_ct`` lines from the file in path.

    :param path: The path of the file.
    :param line_ct: The maximum number of lines to read.
    :returns: The lines that were read, each followed by ``os.linesep``.
    """
    file_reader = reader.get_reader(path)
    rows = []
    while len(rows) < line_ct and file_reader.can_read():
        rows.append(file_reader.read_line() + os.linesep)
    return "".join(rows)
def get_textfile_content(path):
    """Gets the content from a temporary file for editing purposes.

    All lines are read through the reader and joined with ``os.linesep``.

    :param str path: The path of the temp file.
    :returns: The joined lines as a single string.
    """
    file_reader = reader.get_reader(path)
    lines = []
    while file_reader.can_read():
        lines.append(file_reader.read_line())
    # Joining keeps a separator after every line, including empty lines at the
    # start of the file. Testing the accumulated string for truthiness would
    # silently drop those leading blank lines.
    return os.linesep.join(lines)
def set_selected_headers(selected_headers):
    """Stores the selected headers, mirrors them into the enabled flags and saves.

    :param dict selected_headers: A dictionary of str:bool items.
    """
    header_info = _DATA['header_info']
    header_info.selected_headers = selected_headers
    for name, is_selected in selected_headers.items():
        header_info.header_enabled[name] = is_selected

    save_headers()
def save_headers():
    """Serializes the header info to its .json file and then backs up the study folder."""
    header_info = _DATA['header_info']
    json_path = get_headers_json_path()
    utils.json_serialize(header_info, json_path)
    _backup_study()
def add_text_to_dafs(text):
    """Adds text to the files of the data folder and backs up the study afterwards.

    :param text: The text added.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    writer.add_text_to_folder(data_path, text)
    _backup_study()
def rename_daf_files():
    """Renames the data files in the data folder through the archive interface.

    Files in use are checked for first, the generated index is deleted, and a
    backup is taken once the renaming is done.

    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    utils.check_if_any_file_in_use(data_path, utils.SUPPORTED_FORMATS)
    delete_index()
    _DATA['interface'].rename_data_files_in_folder(_DATA['metadata'], data_path)
    _backup_study()
def get_file_count():
    """Returns how many paths ``get_data_filepaths`` reports for the current study."""
    filepaths = get_data_filepaths()
    return len(filepaths)
def get_data_filepaths():
    """Returns the convertable files followed by the supported files of the current study.

    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    convertable = utils.get_convertable_files(data_path)
    supported = utils.get_supported_files(data_path)
    return convertable + supported
def convert_to_txt():
    """Converts data files to txt and then converts the encoding of the .txt files.

    The state check runs when this function is called, in the same way as
    ``convert_to_odt``. The conversion work itself is done lazily while the
    returned generator is iterated.

    :returns: A generator yielding whatever the converter functions yield.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    def _convert():
        # Kept in an inner generator so that the check above is not deferred
        # until the first iteration.
        args = argument_parser.get_args()
        yield from converter.convert_files_with_libreoffice_to_txt(data_path, args.timeout)
        yield from converter.convert_file_encoding_in_folder(data_path, args.encoding, ".txt")

    return _convert()
def convert_txt_encoding():
    """Converts the encoding of the .txt files in the data folder.

    The state check runs when this function is called, in the same way as
    ``convert_to_odt``. The conversion work itself is done lazily while the
    returned generator is iterated.

    :returns: A generator yielding whatever the converter function yields.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    def _convert():
        # Kept in an inner generator so that the check above is not deferred
        # until the first iteration.
        args = argument_parser.get_args()
        yield from converter.convert_file_encoding_in_folder(data_path, args.encoding, ".txt")

    return _convert()
def convert_to_odt():
    """Converts data files to odt.

    :returns: The generator created by ``_convert_to_odt``.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    if _DATA['data_path']:
        return _convert_to_odt()
    raise InvalidProgramStateException(_("A study has not been chosen."))
def _convert_to_odt():
    """Generator that delegates to the LibreOffice odt conversion of the data folder."""
    timeout = argument_parser.get_args().timeout
    yield from converter.convert_files_with_libreoffice_to_odt(_DATA['data_path'], timeout)
def rename_headers_in_data_files():
    """Renames the headers inside the data files according to the header info.

    Existing temp files are deleted afterwards, the header info is updated and
    then saved.
    """
    header_info = _DATA['header_info']
    writer.rename_headers_in_folder(_DATA['data_path'], header_info)

    if temp_files_exist():
        delete_tempfiles()

    header_info.update_headers()
    save_headers()
def save_header_info(header_info):
    """Replaces the stored header info with the given one and saves it.

    :param HeaderInfo header_info: New header info to be set.
    :raises InvalidProgramStateException: If the study path or the current study is missing.
    """
    has_study = _DATA['study_path'] and _DATA['current_study']
    if not has_study:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    _DATA['header_info'] = header_info
    save_headers()
def set_header_info(header_info):
    """Stores a deep copy of the given header info without saving it to file."""
    copied = deepcopy(header_info)
    _DATA['header_info'] = copied
def get_all_files_from_data_folder():
    """Lists the files of the data folder, leaving out metadata files and the html folder.

    :returns: A sorted list of file paths, in the form produced by the glob over
        the data folder.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    mefs = _DATA['interface'].get_metadata_files(_DATA['study_path'], data_path)
    html_path = os.path.join(data_path, 'html')
    # Compare whole path components. A plain substring test would also throw
    # away siblings whose name merely starts with "html", e.g. "html_notes.txt".
    html_prefix = html_path + os.sep

    other_files = [
        filepath
        for filepath in utils.natsorted_glob(os.path.join(data_path, '**'))
        if os.path.isfile(filepath)
        and filepath not in mefs
        and filepath != html_path
        and not filepath.startswith(html_prefix)
    ]
    return sorted(other_files)
def _rewrite_readable_for_external_study():
    """Rewrite any readable files to eliminate any subjects being detected in them."""
    args = argument_parser.get_args()
    prefix = args.begindata + os.linesep
    suffix = os.linesep + args.enddata

    # Rewrite data file temp files so that all text is marked as data
    for source_path in utils.get_supported_files(_DATA['data_path']):
        wrapped = prefix + get_textfile_content(source_path) + suffix
        target_path = data_file_tempwriter.generate_temp_path(source_path, _DATA['study_path'])
        save_tempfile_content(target_path, wrapped)


def _write_daf_container_for_external_study(daf_container_file, headers_list):
    """Writes a daf container file for an external study.

    One entry is written per file of the data folder: a "Data file" line with
    the path relative to the data folder, an empty line for every header, and
    a blank line.

    :param daf_container_file: The path to save the daf container in.
    :param headers_list: A list of headers to be included.
    :raises exceptions.InvalidHeaderException: If a header is not valid.
    """
    # Check if headers are valid
    for header in headers_list:
        candidate = "%s:" % header
        is_valid = utils.is_header_line(candidate) and utils.parse_header(candidate)[0] == header
        if not is_valid:
            # "_" is a local name in this function (it is rebound below), so it
            # has to be bound before it can be used for the error message.
            _ = kvalikirstu2.localization._
            raise exceptions.InvalidHeaderException(_('Header %s was not valid!') % header)

    args = argument_parser.get_args()
    index_lang = args.index_lang
    # Labels in the container file use the language of the index.
    _ = kvalikirstu2.localization.get_translation_func(index_lang)
    data_folder_files = get_all_files_from_data_folder()

    with open(daf_container_file, mode='w', encoding=args.encoding, newline='') as file_handle:
        for filepath in data_folder_files:
            if filepath == daf_container_file:
                continue
            rel_path = os.path.relpath(filepath, _DATA['data_path'])
            file_handle.write('%s: %s%s' % (_("Data file"), rel_path, os.linesep))
            for header in headers_list:
                file_handle.write('%s: %s' % (header, os.linesep))
            file_handle.write(os.linesep)


def _add_headers_for_external_study(headers):
    """Adds headers for an external study.

    A new header info is created with the "Data file" header as the daf header,
    followed by the given headers and the builtin headers, and is then saved.

    :param headers: List of headers.
    """
    daf_header = _("Data file")
    header_info = study.HeaderInfo()
    header_info.add_header(daf_header)
    header_info.set_daf_header(daf_header)
    for extra_header in headers:
        header_info.add_header(extra_header)

    header_info.init_builtin_headers()
    save_header_info(header_info)
def get_daf_container_path():
    """Forms the path of the daf container file inside the data folder.

    The file name comes from the archive interface.
    """
    filename = _DATA['interface'].get_study_data_filename(_DATA['metadata'])
    return os.path.join(_DATA['data_path'], filename)
def setup_external_file_study(headers):
    """Transforms the study into one whose files are external and linked to from the index.

    Any text in readable files is marked as data so it won't be read. An
    existing daf container and headers .json file are removed before new ones
    are written, and the study is backed up at the end.

    :param headers: The headers to include in the daf container.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    if not _DATA['data_path']:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    container_path = get_daf_container_path()
    if os.path.exists(container_path):
        os.unlink(container_path)

    _rewrite_readable_for_external_study()
    _write_daf_container_for_external_study(container_path, headers)

    if os.path.exists(get_headers_json_path()):
        os.unlink(get_headers_json_path())

    _add_headers_for_external_study(headers)
    _backup_study()
def _test_backup():
    """Logs a warning when the study folder exceeds the maximum backup folder size."""
    max_size = argument_parser.get_args().max_backup_folder_size
    if not folder_backup.folder_too_large(_DATA['study_path'], max_size):
        return
    logger.warning(_('Folder too large to be backed up. Backups will not be available.'))


def _backup_study():
    """Backs up the study folder."""
    args = argument_parser.get_args()
    size_limit = args.max_backup_folder_size
    backup_limit = args.max_backups
    folder_backup.backup_folder(_DATA['study_path'], size_limit, backup_limit)
def restore_backup(index):
    """Restores the study folder from a backup and reloads the study and header info.

    :param index: The index of the backup.
    """
    study_path = _DATA['study_path']
    folder_backup.restore_backup(study_path, index)
    set_study_and_header_info()
def get_backups():
    """Returns the backup timestamps that ``folder_backup`` reports for the study folder."""
    max_backups = argument_parser.get_args().max_backups
    return folder_backup.get_timestamps(_DATA['study_path'], max_backups)
def replace_text(original_text, new_text):
    """Backs up the study and then replaces text in the data folder.

    :param original_text: Original text.
    :param new_text: New text to replace the original.
    :returns: The result of ``writer.replace_in_folder``.
    :raises InvalidProgramStateException: If no study path has been set.
    """
    if not _DATA['study_path']:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    # The backup is taken before the data files are modified.
    _backup_study()
    replace_result = writer.replace_in_folder(_DATA['data_path'], original_text, new_text)
    return replace_result
class DaFData:
    """A class that contains information about a data file.

    Instances compare equal when file path, language code and content all match.
    """

    def __init__(self, filepath, langcode, content):
        """Stores the given values as attributes.

        :param filepath: The path of the data file.
        :param langcode: The language code of the file.
        :param content: The text content of the file.
        """
        self.filepath = filepath
        self.langcode = langcode
        self.content = content

    def __eq__(self, other):
        """Compares all three attributes with another DaFData instance."""
        # Let Python handle foreign types: comparing with e.g. None or a str
        # then gives False instead of raising AttributeError.
        if not isinstance(other, DaFData):
            return NotImplemented
        return (self.filepath == other.filepath
                and self.langcode == other.langcode
                and self.content == other.content)

    def __repr__(self):
        """Returns a representation that shows all three attributes."""
        return '%s(%r, %r, %r)' % (type(self).__name__, self.filepath, self.langcode, self.content)
def get_daf_data():
    """Returns a list of DaFData instances for the files of the data folder.

    The content is the stripped text of the file when its format is a valid
    text format, and an empty string otherwise. The generated index is deleted
    before the files are listed.

    :raises InvalidProgramStateException: If no data path has been set.
    """
    if not _DATA['data_path']:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    delete_index()
    langcodes = argument_parser.get_languages()
    daf_entries = []
    for filepath in get_all_files_from_data_folder():
        langcode = utils.get_language_code_from_path(filepath, langcodes)
        text = get_textfile_content(filepath).strip() if utils.is_valid_text_format(filepath) else ''
        daf_entries.append(DaFData(filepath, langcode, text))
    return daf_entries
def add_language_codes(mapping):
    """Adds language codes to files by renaming them.

    The matching temp file of each data file is renamed as well when it exists.

    :param mapping: A dictionary from old file paths to language codes.
    :raises InvalidProgramStateException: If no data path has been set.
    """
    data_path = _DATA['data_path']
    if not data_path:
        raise InvalidProgramStateException(_("A study has not been chosen."))

    utils.check_if_any_file_in_use(data_path, utils.SUPPORTED_FORMATS)
    delete_index()
    known_langcodes = argument_parser.get_languages()

    for old_path, langcode in mapping.items():
        renamed_path = utils.add_language_code_to_path(old_path, langcode, known_langcodes)
        os.rename(old_path, renamed_path)

        old_temp = data_file_tempwriter.generate_temp_path(old_path, _DATA['study_path'])
        renamed_temp = data_file_tempwriter.generate_temp_path(renamed_path, _DATA['study_path'])
        if os.path.exists(old_temp):
            os.rename(old_temp, renamed_temp)
def get_commands():
    """Gets the commands that the archive interface offers for the gui.

    :return: A list of commands.
    """
    interface = _DATA['interface']
    return interface.get_commands()
def delete_index():
    """Deletes the generated csv file, the index file and the html folder if they exist."""
    csv_name = _DATA['interface'].create_csv_name(_DATA['metadata'])
    csv_path = os.path.join(_DATA['data_path'], csv_name)
    index_path = template_generator.get_index_path(_DATA['study_path'])
    html_path = template_generator.get_html_path(_DATA['study_path'])

    for file_path in (csv_path, index_path):
        utils.delete_file_if_exists(file_path)
    utils.delete_folder_if_exists(html_path)