""" A module for parsing paragrahs out of data files."""
import logging
import os
from enum import Enum
from kvalikirstu2 import exceptions
from kvalikirstu2 import utils
from kvalikirstu2 import data_file_tempwriter
from kvalikirstu2 import argument_parser
from kvalikirstu2 import reader
from kvalikirstu2.localization import _
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Convenience alias so code in this module can raise/catch the exception
# without spelling out the full exceptions-module path.
ParagraphParsingException = exceptions.ParagraphParsingException
class ParagraphType(Enum):
    """Paragraph type enum.

    Values
    ------
    DATA = A normal data paragraph containing text relevant to a certain subject.
    HEADER = A header paragraph containing headers for a certain subject.
    BEGINDATA = Indicates the start of a data block.
    ENDDATA = Indicates the end of a data block.
    UNKNOWN = Type hasn't been determined.
    """
    DATA = 1
    HEADER = 2
    BEGINDATA = 3
    ENDDATA = 4
    UNKNOWN = 5
class Paragraph:
    """Contains a paragraph of text: lines of text not separated by empty lines.

    :var lines: A list of lines contained in the paragraph.
    :type lines: list(str)
    :var ParagraphType par_type: The type of the paragraph.
    :var bool manual_type: True if the type of the paragraph has been manually specified using
        BEGINDATA and ENDDATA tokens.
    :var row_min: First row of the paragraph in the file.
    :var row_max: Last row of the paragraph in the file.
    :var temp_path: Temporary filepath (used for error reporting).
    :var args: The parsed command-line args of the application.
    """

    def __init__(self, temp_path, par_type=ParagraphType.UNKNOWN):
        """Initializes an empty paragraph.

        :param temp_path: Temporary filepath used in error messages.
        :param ParagraphType par_type: Initial type of the paragraph.
        """
        self.lines = []
        self.par_type = par_type
        self.manual_type = False
        self.args = argument_parser.get_args()
        self.row_min = 0
        self.row_max = 0
        self.temp_path = temp_path

    def is_empty(self):
        """Tests if the paragraph contains no information.

        BEGINDATA/ENDDATA marker paragraphs are never considered empty even
        though they carry no text lines.

        :rtype: bool
        :return: True if the paragraph carries no information.
        """
        return (self.par_type not in (ParagraphType.BEGINDATA, ParagraphType.ENDDATA)
                and not self.lines)

    def is_header(self):
        """Tests if the paragraph is a header paragraph.

        Added for consistency: other code in this module calls
        ``paragraph.is_header()``, but no such method existed on Paragraph.

        :rtype: bool
        :return: True if the (resolved) type of the paragraph is HEADER.
        """
        return self.par_type == ParagraphType.HEADER

    def write_to_stream(self, stream):
        """Writes the paragraph to a stream.

        :param stream: A writable text stream.
        """
        if self.par_type == ParagraphType.BEGINDATA:
            stream.write("%s%s" % (self.args.begindata, os.linesep))
        elif self.par_type == ParagraphType.ENDDATA:
            stream.write("%s%s" % (self.args.enddata, os.linesep))
        else:
            for line in self.lines:
                stream.write("%s%s" % (line, os.linesep))

    def resolve_type(self, selected_headers: dict = None):
        """Resolves if the paragraph is a header or not, setting par_type accordingly.

        A paragraph is a header when it contains at least
        ``args.min_headers_paragraph`` header lines. Does nothing if the type
        has already been determined.

        :param selected_headers: The selected headers dictionary.
        """
        if self.par_type != ParagraphType.UNKNOWN:
            return
        count = sum(1 for line in self.lines
                    if utils.is_header_line(line, selected_headers))
        if count >= self.args.min_headers_paragraph:
            self.par_type = ParagraphType.HEADER
        else:
            self.par_type = ParagraphType.DATA

    def set_type(self, par_type):
        """Sets the paragraph type manually.

        :param ParagraphType par_type: The type.
        """
        self.par_type = par_type
        self.manual_type = True

    def __eq__(self, other):
        """Tests for equality.

        Row numbers and manual_type are deliberately not compared; only the
        text content and the type.

        :param Paragraph other: The right hand-side value in the equality operation.
        """
        if isinstance(other, Paragraph):
            return self.lines == other.lines and self.par_type == other.par_type
        return False

    def check_valid(self):
        """Checks if the paragraph is valid.

        A paragraph is invalid when it is a header paragraph that contains
        non-header lines, when its type is still UNKNOWN, or when a
        BEGINDATA/ENDDATA marker paragraph contains text.

        :raises ParagraphParsingException: If the paragraph is invalid.
        """
        if self.par_type == ParagraphType.DATA:
            return
        if self.par_type == ParagraphType.UNKNOWN:
            raise ParagraphParsingException(self.temp_path, self.row_min, self.row_max,
                                            _("Paragraph parsing error: Paragraph had an unknown type!"))
        if self.par_type in (ParagraphType.BEGINDATA, ParagraphType.ENDDATA) and self.lines:
            raise ParagraphParsingException(self.temp_path, self.row_min, self.row_max,
                                            _("Paragraph parsing error: BEGINDATA/ENDDATA paragraph contained text!"))
        for index, line in enumerate(self.lines):
            if not utils.is_header_line(line, None):
                # Bug fix: the message's "row %s" placeholder previously
                # received the line *text*; pass the offending row number.
                # The localized format string itself is unchanged.
                raise ParagraphParsingException(self.temp_path, self.row_min + index, self.row_max,
                                                _("Paragraph parsing: Error in file %s on row %s")
                                                % (self.temp_path, self.row_min + index))

    def resolve_row(self, running_row):
        """Resolves the span of rows that the paragraph appears in the file, given the starting row.

        :param running_row: The current running index of the row.
        :return: The new running row (one past this paragraph's last row).
        """
        # Marker paragraphs have no lines but still occupy one row.
        line_count = len(self.lines) if self.lines else 1
        self.row_min = running_row
        self.row_max = running_row + line_count
        return self.row_max + 1

    def get_split_point(self, headers_selected):
        """The split point of a header paragraph.

        :param headers_selected: A dictionary of which headers should be selected.
        :return: The index that the paragraph should be split on
            (one past the last header line).
        :raises ParagraphParsingException: If called on a non-header paragraph.
        """
        if self.par_type != ParagraphType.HEADER:
            raise ParagraphParsingException(self.temp_path, self.row_min, self.row_max,
                                            _("Paragraph parsing error: Tried to split non-header paragraph!"))
        header_indexes = [index for index, line in enumerate(self.lines)
                          if utils.is_header_line(line, headers_selected)]
        return max(header_indexes) + 1

    def should_split(self, headers_selected):
        """Should split the paragraph?

        A header paragraph should be split when it has data lines after its
        last header line.

        :param headers_selected: A dictionary of which headers should be selected.
        :rtype: bool
        """
        if self.par_type != ParagraphType.HEADER:
            return False
        return self.get_split_point(headers_selected) < len(self.lines)

    def split_paragraph(self, index):
        """Splits the paragraph at the index, returning the two resulting paragraphs.

        :param index: Where the paragraph should be split at.
        :return: A (header, data) pair of paragraphs.
        """
        par1 = Paragraph(self.temp_path)
        par2 = Paragraph(self.temp_path)
        par1.lines = self.lines[:index]
        par1.set_type(ParagraphType.HEADER)
        par2.lines = self.lines[index:]
        par2.set_type(ParagraphType.DATA)
        return par1, par2
class ParagraphReader:
    """Reads a file into paragraphs.

    :var reader: Responsible for reading the file line by line.
    :var str path: The file path.
    :var list(Paragraph) paragraphs: Paragraphs contained in the text.
    :var bool overwrite_temp: Overwrite temporary files.
    :var selected_headers: The selected headers dictionary.
    :var use_temp: Whether or not temp files should be used when reading.
    :var study_path: The path of the study.
    :var end_markers: Whether or not end markers should be inserted in the temp file.
    :var split_headers: Whether header paragraphs should be split.
    """

    # pylint: disable=R0913
    def __init__(self, path: str, overwrite_temp: bool = True, selected_headers: dict = None, use_temp: bool = True,
                 study_path: str = None, end_markers=True, split_headers=True):
        self.path = path
        self.overwrite_temp = overwrite_temp
        self.selected_headers = selected_headers
        self.use_temp = use_temp
        self.study_path = study_path
        self.temp_path = data_file_tempwriter.generate_temp_path(self.path, self.study_path)
        # The temp file is "old" (stale) when the source file was modified
        # after the temp file was written.
        self.old_tempfile = (os.path.isfile(self.temp_path) and os.path.isfile(self.path) and
                             os.path.getmtime(self.temp_path) < os.path.getmtime(self.path))
        self.paragraphs = []
        self.end_markers = end_markers
        self.split_headers = split_headers
        self.args = argument_parser.get_args()
        self.reader = self.get_reader()
    # pylint: enable=R0913

    def get_reader(self):
        """Gets the reader for the file, preferring an up-to-date temp file.

        :return: The reader to be used in the parsing process.
        """
        # If a usable (non-stale) temp file exists, read it instead of the original.
        temp_path = data_file_tempwriter.generate_temp_path(self.path, self.study_path)
        if (self.use_temp and not self.overwrite_temp and os.path.isfile(temp_path) and
                not self.old_tempfile):
            return reader.get_reader(temp_path)
        return reader.get_reader(self.path)

    def try_add_paragraph_to_list(self, par):
        """Add paragraph if not empty.

        :param Paragraph par: Paragraph to be added.
        """
        if not par.is_empty():
            self.paragraphs.append(par)

    def _split_header_paragraphs(self):
        """Splits header paragraphs if necessary."""
        # Iterate the original list while mutating a copy; cp_index tracks the
        # matching position in the (growing) copy so inserts don't disturb
        # the iteration.
        cp_paragraphs = self.paragraphs.copy()
        cp_index = 0
        for par in self.paragraphs:
            if par.should_split(self.selected_headers):
                split_index = par.get_split_point(self.selected_headers)
                par1, par2 = par.split_paragraph(split_index)
                # Replace the original paragraph with its two halves.
                del cp_paragraphs[cp_index]
                cp_paragraphs.insert(cp_index, par2)
                cp_paragraphs.insert(cp_index, par1)
                cp_index += 1  # account for the extra paragraph inserted
            cp_index += 1
        self.paragraphs = cp_paragraphs

    def _resolve_paragraph_rows(self):
        """Resolves what rows the paragraphs reside in."""
        running_row = 0
        # Resolve rows for all paragraphs in file order.
        for par in self.paragraphs:
            running_row = par.resolve_row(running_row)

    def _resolve_paragraph_types(self):
        """Resolve paragraph types for all paragraphs."""
        in_data = False
        for par in self.paragraphs:
            if par.par_type == ParagraphType.BEGINDATA:
                in_data = True
            elif in_data and par.par_type == ParagraphType.ENDDATA:
                in_data = False
            elif in_data:
                # Anything between BEGINDATA and ENDDATA is data by definition.
                par.set_type(ParagraphType.DATA)
            # No-op for paragraphs whose type is already determined.
            par.resolve_type(self.selected_headers)

    def read_paragraphs(self):
        """Reads paragraphs from a text file.

        :rtype: list(Paragraph)
        :return: List of paragraphs in the text.
        """
        current = self._create_paragraph()
        # Read all lines into paragraphs
        while self.reader.can_read():
            line = self.reader.read_line()
            if not line or line.isspace():
                # A blank line terminates the current paragraph.
                self.try_add_paragraph_to_list(current)
                current = self._create_paragraph()
            elif line == self.args.begindata:
                self.try_add_paragraph_to_list(current)
                self.try_add_paragraph_to_list(self._create_paragraph(ParagraphType.BEGINDATA))
                current = self._create_paragraph()
            elif line == self.args.enddata:
                self.try_add_paragraph_to_list(current)
                self.try_add_paragraph_to_list(self._create_paragraph(ParagraphType.ENDDATA))
                current = self._create_paragraph()
            else:
                current.lines.append(line)
        # Flush the final paragraph (the file may not end with a blank line).
        self.try_add_paragraph_to_list(current)
        self._resolve_paragraph_types()
        if self.split_headers:
            self._split_header_paragraphs()
        self._resolve_paragraph_rows()
        if self.end_markers:
            self._add_end_markers()
        if self.use_temp:
            # Write a temp file that the user can edit if one does not exist
            temp_path = data_file_tempwriter.generate_temp_path(self.path, self.study_path)
            if self.overwrite_temp or not os.path.isfile(temp_path) or self.old_tempfile:
                self._write_paragraphs_to_temp(self.paragraphs)
        return self.paragraphs

    def _create_paragraph(self, par_type=ParagraphType.UNKNOWN):
        """Creates a new paragraph.

        :param par_type: Paragraph type to be created.
        """
        return Paragraph(self.temp_path, par_type)

    def _add_end_markers(self):
        """Adds end markers between headers if they don't already exist."""
        # Add ENDDATA marker to the end of the file
        if self.paragraphs and self.paragraphs[-1].par_type != ParagraphType.ENDDATA:
            self.paragraphs.append(self._create_paragraph(ParagraphType.ENDDATA))
        # Iterate backwards so inserts don't shift indices yet to be visited.
        for index in range(len(self.paragraphs)-1, -1, -1):
            if self.paragraphs[index].par_type == ParagraphType.HEADER:
                # Add a BEGINDATA after each header
                if index < len(self.paragraphs) - 1:
                    successor = self.paragraphs[index+1]
                    if successor.par_type != ParagraphType.BEGINDATA:
                        self.paragraphs.insert(index+1, self._create_paragraph(ParagraphType.BEGINDATA))
                # And an ENDDATA before each header(except if at the start of the file)
                if index > 0:
                    predecessor = self.paragraphs[index-1]
                    if predecessor.par_type != ParagraphType.ENDDATA:
                        self.paragraphs.insert(index, self._create_paragraph(ParagraphType.ENDDATA))

    def _write_paragraphs_to_temp(self, paragraphs):
        """Writes all subjects to a temp file.

        :param list paragraphs: All paragraphs from file.
        """
        data_file_tempwriter.write_to_file(self.temp_path, paragraphs)

    def read_subjects(self):
        """Returns all the subjects from the given file.

        :rtype: list(Subject)
        :return: List of subjects from a given file.
        """
        # Read paragraphs if not read already
        if not self.paragraphs:
            self.read_paragraphs()
        current_subject = None  # Current subject is initially invalid, no headers have been set
        subjects = []
        for paragraph in self.paragraphs:
            # In the case of a new header, start the data for a new subject.
            # Relies on Paragraph.is_header() returning True for HEADER paragraphs.
            if paragraph.is_header():
                if current_subject:  # If valid, save the previous subject
                    subjects.append(current_subject)
                current_subject = Subject(self.path, self.temp_path)  # initialize new subject
                # NOTE(review): set_headers()/has_headers() are not visible on
                # Subject in this view -- presumably defined past the end of
                # this chunk; verify they exist.
                current_subject.set_headers(paragraph)
            elif current_subject:
                # If not at the start of the file, add the paragraph data to the subject.
                current_subject.add_data(paragraph)
        # If last subject is valid, add it
        if current_subject and current_subject.has_headers():
            subjects.append(current_subject)
        return subjects
class Subject:
    """A subject and its relevant data in the study.

    :var dict headers: A dictionary of (key, value) pairs, often containing metadata about the subject.
    :var list(Paragraph) paragraphs: Contains all the text paragraphs relevant to this subject.
    :var dict(str, int) header_indexes: A dictionary mapping header names to their index.
    :var str data_file: The path to the relevant data file for the subject.
    :var temp_path: The filepath of the temp file.
    """

    def __init__(self, data_file: str, temp_path):
        """Initializes a subject with no headers or paragraphs.

        :param str data_file: The path to the subject's data file.
        :param temp_path: The filepath of the temp file.
        """
        self.headers = {}
        self.header_indexes = {}
        self.paragraphs = []
        self.data_file = data_file
        self.temp_path = temp_path

    def get_language_code(self, language_codes):
        """Gets the language code for the given file.

        :param language_codes: Set of possible language codes.
        """
        return utils.get_language_code_from_path(self.data_file, language_codes)

    def __eq__(self, other):
        """Tests for equality.

        Only headers and paragraphs are compared; file paths are ignored.

        :param Subject other: The object that this object is compared against.
        :rtype: bool
        :return: True if objects are equal, False otherwise.
        """
        if isinstance(other, Subject):
            return self.headers == other.headers and self.paragraphs == other.paragraphs
        return False

    def resolve_daf(self, header_info):
        """Resolve the data file for the subject, accounting for data files specified in the header.

        If the subject's headers contain the daf header, the data file path is
        replaced by that header's value, resolved relative to the current
        data file's directory.

        :param header_info: Contains information about the study's headers.
        """
        daf_header = header_info.daf_header
        if daf_header in self.headers:
            daf_file = os.path.join(os.path.dirname(self.data_file), self.headers[daf_header])
            # NOTE(review): the argument order here looks inconsistent with
            # the message text ("header %s and file path %s: %s") -- confirm
            # which value belongs to which placeholder.
            logger.info('Resolving daf out of header %s and file path %s: %s', self.headers[daf_header],
                        daf_file, self.data_file)
            self.data_file = daf_file

    def add_data(self, paragraph):
        """Add a text paragraph for the subject.

        :param Paragraph paragraph: A paragraph containing some text.
        """
        # NOTE(review): assert is stripped under -O; relies on
        # paragraph.is_header(), which must be provided by Paragraph.
        assert not paragraph.is_header()
        self.paragraphs.append(paragraph)

    def remap_values(self, value_mapping: dict):
        """Remaps values to new values.

        :param dict value_mapping: A value mapping dictionary from str((key, value)) to value.
        """
        # Keys in the mapping are the str() form of the (key, value) tuple.
        for key, value in list(self.headers.items()):
            pair_str = str((key, value))
            if pair_str in value_mapping:
                self.headers[key] = value_mapping[pair_str]

    def remap(self, header_info):
        """Remaps the headers to their corresponding names.

        Iterates through all the subjects headers and maps them to their new values. In some cases there are multiple
        headers in a study that have the same meaning, but are just different ways of phrasing it. In cases like this
        they should be all be remapped into the same name. In some cases you may also want to discard some attributes
        from the index.

        If a header is in the dictionary, it means that it should be remapped to a possibly different name.
        Headers absent from the mapping are dropped.

        :param HeaderInfo header_info: Contains information about the different headers in the study.
        """
        headers_copy = list(self.headers.items())
        self.headers.clear()
        for (key, value) in headers_copy:
            if key in header_info.header_mapping:
                remapped = header_info.header_mapping[key]
                if remapped != key:
                    logger.debug('Remapped header %s to %s', key, remapped)
                self.headers[remapped] = value
                # Assumes header_indexes already has an entry for every
                # original header key -- presumably set outside this view.
                self.header_indexes[remapped] = self.header_indexes[key]
        self.resolve_daf(header_info)

    def check_validity(self):
        """Checks that the subject is valid. Does not return anything, just raises an exception if the subject
        is invalid.

        Manually-typed paragraphs (from BEGINDATA/ENDDATA) are trusted and skipped.
        """
        for par in self.paragraphs:
            if not par.manual_type:
                par.check_valid()