Source code for kvalikirstu2.paragraph_analyzer

""" A module for parsing paragrahs out of data files."""
import logging
import os
from enum import Enum
from kvalikirstu2 import exceptions
from kvalikirstu2 import utils
from kvalikirstu2 import data_file_tempwriter
from kvalikirstu2 import argument_parser
from kvalikirstu2 import reader
from kvalikirstu2.localization import _


logger = logging.getLogger(__name__)
ParagraphParsingException = exceptions.ParagraphParsingException


class ParagraphType(Enum):
    """Kinds of paragraph recognised while parsing a data file.

    Values
    ------
    DATA = Ordinary text paragraph belonging to a subject.
    HEADER = Paragraph made up of header lines for a subject.
    BEGINDATA = Marker that opens a data block.
    ENDDATA = Marker that closes a data block.
    UNKNOWN = The type has not been resolved yet.
    """

    DATA = 1
    HEADER = 2
    BEGINDATA = 3
    ENDDATA = 4
    UNKNOWN = 5
class Paragraph:
    """A run of consecutive text lines that are not separated by empty lines.

    :var lines: The lines that make up the paragraph.
    :type lines: list(str)
    :var ParagraphType par_type: The type of the paragraph.
    :var bool manual_type: True once the type has been assigned through
        :meth:`set_type` instead of being inferred by :meth:`resolve_type`.
    :var row_min: First row of the paragraph, as set by :meth:`resolve_row`.
    :var row_max: Last row of the paragraph, as set by :meth:`resolve_row`.
    :var temp_path: Temporary filepath, passed on to parsing exceptions.
    :var args: The args of the application.
    """

    def __init__(self, temp_path, par_type=ParagraphType.UNKNOWN):
        self.temp_path = temp_path
        self.par_type = par_type
        self.manual_type = False
        self.lines = []
        self.row_min = 0
        self.row_max = 0
        self.args = argument_parser.get_args()

    def is_empty(self):
        """Tell whether the paragraph carries no information.

        BEGINDATA/ENDDATA markers are never considered empty; any other
        paragraph is empty when it has no lines.
        """
        if self.par_type in (ParagraphType.BEGINDATA, ParagraphType.ENDDATA):
            return False
        return not self.lines

    def is_header(self):
        """Tell whether the paragraph is a header paragraph."""
        return self.par_type == ParagraphType.HEADER

    def write_to_stream(self, stream):
        """Write the paragraph to a stream, one ``os.linesep``-terminated row per write.

        Marker paragraphs are written as the configured begindata/enddata
        token; every other paragraph is written line by line.

        :param stream: Object with a ``write`` method accepting text.
        """
        if self.par_type == ParagraphType.BEGINDATA:
            rows = [self.args.begindata]
        elif self.par_type == ParagraphType.ENDDATA:
            rows = [self.args.enddata]
        else:
            rows = self.lines
        for row in rows:
            stream.write("%s%s" % (row, os.linesep))

    def resolve_type(self, selected_headers: dict = None):
        """Decide whether an UNKNOWN paragraph is a header or a data paragraph.

        Paragraphs whose type is already known are left untouched. Otherwise the
        paragraph becomes HEADER when the number of header lines reaches
        ``args.min_headers_paragraph`` and DATA when it does not.

        :param selected_headers: The selected headers dictionary.
        """
        if self.par_type != ParagraphType.UNKNOWN:
            return

        header_count = sum(1 for line in self.lines if utils.is_header_line(line, selected_headers))
        if header_count >= self.args.min_headers_paragraph:
            self.par_type = ParagraphType.HEADER
        else:
            self.par_type = ParagraphType.DATA

    def set_type(self, par_type):
        """Set the paragraph type explicitly and flag it as manually typed.

        :param ParagraphType par_type: The type.
        """
        self.manual_type = True
        self.par_type = par_type

    def __eq__(self, other):
        """Compare two paragraphs by their lines and type.

        :param Paragraph other: The right hand-side value in the equality operation.
        :return: False for anything that is not a Paragraph.
        """
        if not isinstance(other, Paragraph):
            return False
        return self.lines == other.lines and self.par_type == other.par_type

    def check_valid(self):
        """Raise ParagraphParsingException if the paragraph is invalid.

        DATA paragraphs are always valid. A paragraph is invalid when its type is
        still UNKNOWN, when a BEGINDATA/ENDDATA marker contains text, or when a
        header paragraph contains a line that is not a header line.
        """
        if self.par_type == ParagraphType.DATA:
            return

        if self.par_type == ParagraphType.UNKNOWN:
            raise ParagraphParsingException(self.temp_path, self.row_min, self.row_max,
                                            _("Paragraph parsing error: Paragraph had an unknown type!"))

        is_marker = self.par_type in (ParagraphType.BEGINDATA, ParagraphType.ENDDATA)
        if is_marker and self.lines:
            raise ParagraphParsingException(self.temp_path, self.row_min, self.row_max,
                                            _("Paragraph parsing error: BEGINDATA/ENDDATA paragraph contained text!"))

        for offset, text in enumerate(self.lines):
            if utils.is_header_line(text, None):
                continue
            # NOTE(review): the "row" placeholder is filled with the line's text,
            # not its row number -- confirm this is the intended message.
            raise ParagraphParsingException(self.temp_path, self.row_min + offset, self.row_max,
                                            _("Paragraph parsing: Error in file %s on row %s")
                                            % (self.temp_path, text))

    def resolve_row(self, running_row):
        """Record the span of rows the paragraph occupies, given its starting row.

        A paragraph without lines is counted as one line.

        :param running_row: The current running index of the row.
        :return: The new running row (``row_max + 1``).
        """
        line_count = len(self.lines) if self.lines else 1
        self.row_min = running_row
        self.row_max = running_row + line_count
        return self.row_max + 1

    def get_split_point(self, headers_selected):
        """Return the index just past the last header line of a header paragraph.

        :param headers_selected: A dictionary of which headers should be selected.
        :return: The index that the paragraph should be split on.
        :raises ParagraphParsingException: If the paragraph is not a header paragraph.
        """
        if self.par_type != ParagraphType.HEADER:
            raise ParagraphParsingException(self.temp_path, self.row_min, self.row_max,
                                            _("Paragraph parsing error: Tried to split non-header paragraph!"))

        # NOTE(review): max() raises ValueError if no line qualifies as a header.
        header_positions = [position for position, text in enumerate(self.lines)
                            if utils.is_header_line(text, headers_selected)]
        return max(header_positions) + 1

    def should_split(self, headers_selected):
        """Tell whether a header paragraph has lines after its last header line.

        :param headers_selected: A dictionary of which headers should be selected.
        :return: Always False for non-header paragraphs.
        """
        if self.par_type == ParagraphType.HEADER:
            return self.get_split_point(headers_selected) < len(self.lines)
        return False

    def split_paragraph(self, index):
        """Split the paragraph at the index, returning the two resulting paragraphs.

        The first paragraph receives the lines before the index and is typed
        HEADER; the second receives the rest and is typed DATA. Both are flagged
        as manually typed.

        :param index: Where the paragraph should be split at.
        :return: Tuple of (header paragraph, data paragraph).
        """
        header_part = Paragraph(self.temp_path)
        data_part = Paragraph(self.temp_path)

        header_part.lines = self.lines[:index]
        data_part.lines = self.lines[index:]

        header_part.set_type(ParagraphType.HEADER)
        data_part.set_type(ParagraphType.DATA)
        return header_part, data_part
class ParagraphReader:
    """Reads a file into paragraphs and groups those paragraphs into subjects.

    :var reader: Responsible for reading the file line by line
    :var str path: The file path
    :var list(Paragraph) paragraphs: Paragraphs contained in the text
    :var bool overwrite_temp: Overwrite temporary files.
    :var selected_headers: The selected headers dictionary.
    :var use_temp: Whether or not temp files should be used when reading.
    :var study_path: The path of the study.
    :var temp_path: Path of the temp file generated for ``path``.
    :var old_tempfile: True when the temp file exists but is older than the data file.
    :var end_markers: Whether or not end markers should be inserted in the temp file.
    :var split_headers: Whether header paragraphs followed directly by
        non-header lines should be split in two.
    """

    # pylint: disable=R0913
    def __init__(self, path: str, overwrite_temp: bool = True, selected_headers: dict = None,
                 use_temp: bool = True, study_path: str = None, end_markers=True, split_headers=True):
        self.path = path
        self.study_path = study_path
        self.overwrite_temp = overwrite_temp
        self.use_temp = use_temp
        self.selected_headers = selected_headers
        self.end_markers = end_markers
        self.split_headers = split_headers
        self.paragraphs = []

        self.temp_path = data_file_tempwriter.generate_temp_path(self.path, self.study_path)
        # The temp file is stale when both files exist and the data file is newer.
        self.old_tempfile = (os.path.isfile(self.temp_path) and os.path.isfile(self.path) and
                             os.path.getmtime(self.temp_path) < os.path.getmtime(self.path))
        self.args = argument_parser.get_args()
        self.reader = self.get_reader()
    # pylint: enable=R0913

    def get_reader(self):
        """Pick the reader for the data file or for its temp file.

        The temp file is read only when temp files are in use, they are not
        being overwritten, the temp file exists and it is not stale; in every
        other case the original data file is read.

        :return: The reader to be used in the parsing process.
        """
        temp_path = data_file_tempwriter.generate_temp_path(self.path, self.study_path)
        reuse_temp = (self.use_temp and not self.overwrite_temp and
                      os.path.isfile(temp_path) and not self.old_tempfile)
        source = temp_path if reuse_temp else self.path
        return reader.get_reader(source)

    def try_add_paragraph_to_list(self, par):
        """Append the paragraph to the paragraph list unless it is empty.

        :param Paragraph par: Paragraph to be added.
        """
        if par.is_empty():
            return
        self.paragraphs.append(par)

    def _split_header_paragraphs(self):
        """Replace each header paragraph that needs splitting with its two halves."""
        rebuilt = []
        for par in self.paragraphs:
            if par.should_split(self.selected_headers):
                split_at = par.get_split_point(self.selected_headers)
                rebuilt.extend(par.split_paragraph(split_at))
            else:
                rebuilt.append(par)
        self.paragraphs = rebuilt

    def _resolve_paragraph_rows(self):
        """Resolves what rows the paragraphs reside in, starting from row 0."""
        next_row = 0
        for par in self.paragraphs:
            next_row = par.resolve_row(next_row)

    def _resolve_paragraph_types(self):
        """Resolve the type of every paragraph.

        Paragraphs between a BEGINDATA and the following ENDDATA marker are
        manually typed as DATA; all remaining UNKNOWN paragraphs are resolved
        from their contents.
        """
        inside_data_block = False
        for par in self.paragraphs:
            kind = par.par_type
            if kind == ParagraphType.BEGINDATA:
                inside_data_block = True
            elif inside_data_block:
                if kind == ParagraphType.ENDDATA:
                    inside_data_block = False
                else:
                    par.set_type(ParagraphType.DATA)
            par.resolve_type(self.selected_headers)

    def read_paragraphs(self):
        """Read the file into paragraphs and post-process them.

        Blank lines separate paragraphs, and lines equal to the begindata/enddata
        tokens become marker paragraphs of their own. Afterwards types are
        resolved, header paragraphs are optionally split, rows are resolved, end
        markers are optionally added and the temp file is written when needed.

        :rtype: list(Paragraph)
        :return: List of paragraphs in the text.
        """
        current = self._create_paragraph()

        while self.reader.can_read():
            line = self.reader.read_line()
            marker_type = None
            if line and not line.isspace():
                if line == self.args.begindata:
                    marker_type = ParagraphType.BEGINDATA
                elif line == self.args.enddata:
                    marker_type = ParagraphType.ENDDATA
                else:
                    # Ordinary text: keep collecting into the current paragraph.
                    current.lines.append(line)
                    continue

            # A blank line or a marker closes the paragraph collected so far.
            self.try_add_paragraph_to_list(current)
            if marker_type is not None:
                self.try_add_paragraph_to_list(self._create_paragraph(marker_type))
            current = self._create_paragraph()

        self.try_add_paragraph_to_list(current)

        self._resolve_paragraph_types()
        if self.split_headers:
            self._split_header_paragraphs()
        self._resolve_paragraph_rows()
        if self.end_markers:
            self._add_end_markers()

        if self.use_temp:
            # Write a temp file that the user can edit if one does not exist
            temp_path = data_file_tempwriter.generate_temp_path(self.path, self.study_path)
            if self.overwrite_temp or not os.path.isfile(temp_path) or self.old_tempfile:
                self._write_paragraphs_to_temp(self.paragraphs)

        return self.paragraphs

    def _create_paragraph(self, par_type=ParagraphType.UNKNOWN):
        """Create a new paragraph bound to this reader's temp path.

        :param par_type: Paragraph type to be created.
        """
        return Paragraph(self.temp_path, par_type)

    def _add_end_markers(self):
        """Adds BEGINDATA/ENDDATA markers around headers where they are missing."""
        # The file must end with an ENDDATA marker.
        if self.paragraphs and self.paragraphs[-1].par_type != ParagraphType.ENDDATA:
            self.paragraphs.append(self._create_paragraph(ParagraphType.ENDDATA))

        # Walk backwards so that insertions never shift the indexes still to be visited.
        for index in reversed(range(len(self.paragraphs))):
            if self.paragraphs[index].par_type != ParagraphType.HEADER:
                continue

            # A BEGINDATA must follow each header.
            has_successor = index < len(self.paragraphs) - 1
            if has_successor and self.paragraphs[index + 1].par_type != ParagraphType.BEGINDATA:
                self.paragraphs.insert(index + 1, self._create_paragraph(ParagraphType.BEGINDATA))

            # An ENDDATA must precede each header (except at the start of the file).
            if index > 0 and self.paragraphs[index - 1].par_type != ParagraphType.ENDDATA:
                self.paragraphs.insert(index, self._create_paragraph(ParagraphType.ENDDATA))

    def _write_paragraphs_to_temp(self, paragraphs):
        """Write the given paragraphs to the temp file.

        :param list paragraphs: All paragraphs from file
        """
        data_file_tempwriter.write_to_file(self.temp_path, paragraphs)

    def read_subjects(self):
        """Group the paragraphs of the file into subjects.

        Paragraphs are read first if none have been read yet. Each header
        paragraph starts a new subject, and the paragraphs after it are added to
        that subject. Paragraphs before the first header are ignored.

        :rtype: list(Subject)
        :return: List of subjects from a given file
        """
        if not self.paragraphs:
            self.read_paragraphs()

        subjects = []
        active = None  # No subject exists until the first header paragraph is met.
        for paragraph in self.paragraphs:
            if not paragraph.is_header():
                if active:
                    active.add_data(paragraph)
                continue

            # A new header closes the previous subject and opens the next one.
            if active:
                subjects.append(active)
            active = Subject(self.path, self.temp_path)
            active.set_headers(paragraph)

        if active and active.has_headers():
            subjects.append(active)

        return subjects
class Subject:
    """A subject and its relevant data in the study.

    :var dict headers: A dictionary of (key, value) pairs parsed from the header paragraph.
    :var list(Paragraph) paragraphs: All paragraphs (header and text) added to this subject.
    :var dict(str, int) header_indexes: A dictionary mapping header names to their index
        within the header paragraph.
    :var str data_file: The path to the relevant data file for the subject.
    :var temp_path: The filepath of the temp file, passed on to parsing exceptions.
    """

    def __init__(self, data_file: str, temp_path):
        self.headers = {}
        self.header_indexes = {}
        self.paragraphs = []
        self.data_file = data_file
        self.temp_path = temp_path

    def get_language_code(self, language_codes):
        """Gets the language code for the subject's data file.

        :param language_codes: Set of possible language codes.
        """
        return utils.get_language_code_from_path(self.data_file, language_codes)

    def get_header_value(self, header):
        """Gets the value of a header field.

        :param str header: The header which value is to be retrieved.
        :return: The header value, or an empty string if the header is not set.
        """
        return self.headers.get(header, "")

    def __eq__(self, other):
        """Tests for equality by comparing headers and paragraphs.

        :param Subject other: The object that this object is compared against.
        :rtype: bool
        :return: True if objects are equal, False otherwise.
        """
        if isinstance(other, Subject):
            return self.headers == other.headers and self.paragraphs == other.paragraphs
        return False

    def resolve_daf(self, header_info):
        """Resolve the data file for the subject, accounting for data files specified in the header.

        If the subject has the header named by ``header_info.daf_header``, its
        value is joined to the directory of the current data file and the result
        becomes the new data file.

        :param header_info: Object providing the ``daf_header`` attribute.
        """
        daf_header = header_info.daf_header
        if daf_header not in self.headers:
            return

        daf_file = os.path.join(os.path.dirname(self.data_file), self.headers[daf_header])
        # Inputs (header value, current file path) come first and the resolved path last,
        # matching the order of the placeholders in the message.
        logger.info('Resolving daf out of header %s and file path %s: %s',
                    self.headers[daf_header], self.data_file, daf_file)
        self.data_file = daf_file

    def set_headers(self, paragraph):
        """Adds a header paragraph to the subject and parses its headers.

        The paragraph is appended to the subject's paragraphs, then each line is
        parsed into a key and value and stored together with the line's index.

        :param Paragraph paragraph: A header paragraph that will be added to this subject.
        :raises ParagraphParsingException: If the paragraph is not a header paragraph,
            a line is not a header line, or a header key appears twice.
        """
        if not paragraph.is_header():
            raise ParagraphParsingException(self.temp_path, paragraph.row_min, paragraph.row_max,
                                            _("Tried to set headers for a non-header paragraph!"))

        self.paragraphs.append(paragraph)

        for index, header in enumerate(paragraph.lines):
            if not utils.is_header_line(header, None):
                raise ParagraphParsingException(self.temp_path, paragraph.row_min + index, paragraph.row_max,
                                                _("\nFile %s had an incorrect header in section: \"%s\"."
                                                  "\nThis usually means that either:\na) The paragraph has been"
                                                  " incorrectly marked as a header paragraph\nb) There is no line"
                                                  " break after the paragraph.") % (self.data_file, header))

            key, value = utils.parse_header(header)
            if key in self.headers:
                raise ParagraphParsingException(self.temp_path, paragraph.row_min + index, paragraph.row_max,
                                                _("\nFile %s had a duplicate header in section: \"%s\"."
                                                  ) % (self.data_file, header))
            self.headers[key] = value
            self.header_indexes[key] = index

    def add_data(self, paragraph):
        """Add a text paragraph for the subject.

        :param Paragraph paragraph: A paragraph containing some text; must not be a header paragraph.
        """
        assert not paragraph.is_header()
        self.paragraphs.append(paragraph)

    def has_headers(self):
        """Does subject have headers read?

        :rtype: bool
        :return: True if any of the subject's paragraphs is a header paragraph, False otherwise.
        """
        return any(par.is_header() for par in self.paragraphs)

    def remap_values(self, value_mapping: dict):
        """Remaps header values to new values.

        :param dict value_mapping: A mapping whose keys are the string form of a
            ``(key, value)`` tuple and whose values are the replacement value.
        """
        for key, value in list(self.headers.items()):
            pair_str = str((key, value))
            if pair_str in value_mapping:
                self.headers[key] = value_mapping[pair_str]

    def remap(self, header_info):
        """Remaps the headers to their corresponding names.

        Iterates through all the subject's headers and maps them to their new
        names. In some cases there are multiple headers in a study that have the
        same meaning, but are just different ways of phrasing it; these should
        all be remapped into the same name. Headers that are missing from
        ``header_info.header_mapping`` are discarded.

        The index of each kept header is copied to its new name; index entries
        under the old names are left in place. Finally the data file is resolved
        through :meth:`resolve_daf`.

        :param HeaderInfo header_info: Contains information about the different headers in the study.
        """
        headers_copy = list(self.headers.items())
        self.headers.clear()

        for key, value in headers_copy:
            if key not in header_info.header_mapping:
                continue
            remapped = header_info.header_mapping[key]
            if remapped != key:
                logger.debug('Remapped header %s to %s', key, remapped)
            self.headers[remapped] = value
            self.header_indexes[remapped] = self.header_indexes[key]

        self.resolve_daf(header_info)

    def check_validity(self):
        """Checks that the subject is valid.

        Does not return anything, just raises an exception if the subject is
        invalid. Paragraphs whose type was set manually are not checked.
        """
        for par in self.paragraphs:
            if not par.manual_type:
                par.check_valid()