Source code for kvalikirstu2.writer

""" A module for writing odt and txt files.
"""
import logging
import os
from odf.opendocument import load, OpenDocumentText
from odf.text import P
from kvalikirstu2 import argument_parser
from kvalikirstu2 import utils
from kvalikirstu2 import odt_reader
from kvalikirstu2 import reader
from kvalikirstu2 import paragraph_analyzer


logger = logging.getLogger(__name__)


def write_empty_odt(path: str):
    """Create a new OpenDocument text file that has no content.

    :param str path: Where the empty .odt file is saved.
    """
    logger.info('Writing empty odt to %s', path)
    OpenDocumentText().save(path)
def write_txt(path: str, content="", encoding=None):
    """Write text to a .txt file, replacing the file if it already exists.

    :param str path: The path for the .txt file.
    :param str content: The text to write; an empty string by default.
    :param str encoding: The encoding of the output file. When falsy, the
        encoding from the parsed program arguments is used instead.
    """
    encoding = encoding or argument_parser.get_args().encoding
    logger.info('Writing text to path %s with encoding %s', path, encoding)
    # newline='' switches off newline translation, so the line endings already
    # present in content are written unchanged. With translation enabled every
    # '\n' becomes the platform separator, which would turn a Windows '\r\n'
    # into '\r\r\n' and produce extra lines.
    with open(path, mode="w", encoding=encoding, newline='') as out_file:
        out_file.write(content)
def _add_text_to_txtfile(path: str, output_path: str, added_line: str):
    """Copy a text file to a new file with an extra line placed at the start.

    The added line is followed by one empty line, after which every line
    obtained from the reader for ``path`` is written out in order.

    :param str path: The path of the original file.
    :param str output_path: The path where the result is written.
    :param str added_line: The line placed at the start of the output.
    """
    cli_args = argument_parser.get_args()
    logger.info('Adding line %s to .txt file %s, output to %s', added_line, path, output_path)
    source = reader.get_reader(path)
    # newline='' keeps the explicit os.linesep terminators from being translated again.
    with open(output_path, mode="w", encoding=cli_args.encoding, newline='') as target:
        target.write(added_line + os.linesep + os.linesep)
        while source.can_read():
            target.write(source.read_line() + os.linesep)


def _add_text_to_odtfile(path: str, output_path: str, lines: list):
    """Save a copy of an .odt file with extra paragraphs placed at the start.

    :param str path: The input path.
    :param str output_path: The output path of the file.
    :param list lines: The lines to add; each one becomes its own paragraph.
    """
    logger.info('Adding lines %s to odt file %s, output %s', lines, path, output_path)
    doc = load(path)
    nodes = odt_reader.get_text_nodes_from_odt(doc)

    if not nodes:
        # No text nodes were found: the new paragraphs are simply appended.
        for line in lines:
            doc.text.childNodes.append(P(text=line))
        doc.save(output_path)
        return

    original_first = nodes[0]
    plain_first = P(text=odt_reader.get_text_from_node(original_first))
    # Swapping the first node for a plain copy gets rid of all style
    # information in it. This is done to prevent extra page breaks being
    # added after the inserted text.
    doc.text.childNodes.insert(0, plain_first)
    doc.text.childNodes.remove(original_first)
    for line in lines:
        doc.text.insertBefore(P(text=line), plain_first)
    # NOTE(review): a single empty separator paragraph after all added lines
    # is assumed here (mirroring the single blank line of the .txt variant);
    # confirm it was not meant to follow every added line.
    doc.text.insertBefore(P(), plain_first)
    doc.save(output_path)


def _should_write_line(path: str, added_line: str):
    """Tell whether a line still needs to be added to the start of a file.

    The decision is made by comparing the line with the first line the reader
    returns for the file.

    :param str path: The path of the original file.
    :param str added_line: The line to be added to the start of the file.
    :return: False when no reader is available for the file or when its
        first line is identical to ``added_line``; True otherwise.
    """
    file_reader = reader.get_reader(path)
    if file_reader:
        return added_line != file_reader.read_line()
    return False
def add_text_to_file(path: str, output_path: str, text: str):
    """Add text to the start of a file and save the result to a new path.

    The result is first written to a temporary file next to the original and
    then moved over ``output_path``.

    Note: You can also overwrite the original by setting output_path to the
    same value as path.

    :param str path: The original filepath. Only paths ending in ``.txt`` or
        ``.odt`` are processed.
    :param str output_path: The output filepath.
    :param str text: The text to be added. For .odt files it is split on
        ``'\\n'`` and every piece becomes its own paragraph.
    :raises FileNotFoundError: If no temporary file was produced (the path has
        neither a ``.txt`` nor an ``.odt`` ending). ``output_path`` is left
        untouched in that case.
    """
    logger.info('Adding %s line to file %s, output %s', text, path, output_path)
    _, extension = os.path.splitext(path)
    temp_path = utils.get_temp_path(os.path.dirname(path), extension)

    if path.endswith(".txt"):
        _add_text_to_txtfile(path, temp_path, text)
    elif path.endswith(".odt"):
        _add_text_to_odtfile(path, temp_path, text.split('\n'))

    # os.replace overwrites an existing destination on every platform, so the
    # destination never has to be deleted first. Deleting it up front would
    # lose the file whenever the following move fails, for example when no
    # temporary file was written.
    os.replace(temp_path, output_path)
def add_text_to_folder(path: str, text: str, check_for_duplicate=False):
    """Add text to the start of every supported file in a folder, in place.

    :param str path: The path of the folder.
    :param str text: The text to be added.
    :param check_for_duplicate: When true, a file is skipped if the
        duplicate check reports that the text should not be written to it.
    """
    utils.check_if_any_file_in_use(path, utils.SUPPORTED_FORMATS)
    logger.info('Adding text %s to folder %s', text, path)
    for filepath in utils.get_supported_files(path):
        if check_for_duplicate and not _should_write_line(filepath, text):
            continue
        add_text_to_file(filepath, filepath, text)
def _remap_header_line(line, header_mapping: dict, header_value_mapping: dict):
    """Remap the header name and value of a single line.

    Lines that are not header lines are returned unchanged.

    :param line: The line to be converted.
    :param header_mapping: A dictionary of old headers to new headers.
        Headers without an entry keep their name.
    :param header_value_mapping: A dictionary for remapping old values to new
        values, keyed by ``str((header, value))``. May be empty or None, in
        which case values are kept.
    :return: The formatted header line after remapping, or the original line.
    """
    if not utils.is_header_line(line):
        return line

    header, value = utils.parse_header(line)
    # Value remapping is looked up with the string form of the (header, value) tuple.
    pair = str((header, value))
    remapped = header_mapping.get(header, header)
    new_value = header_value_mapping.get(pair, value) if header_value_mapping else value
    logger.info('Remapped header line (%s, %s) to (%s, %s)', header, value, remapped, new_value)
    return utils.get_formatted_header_line(remapped, new_value)


def _write_headers_to_stream(paragraph: paragraph_analyzer.Paragraph, header_mapping: dict,
                             header_value_mapping: dict, stream):
    """Write the lines of a header paragraph to a stream after remapping them.

    :param paragraph: The header paragraph to be written.
    :param header_mapping: A mapping of header names.
    :param header_value_mapping: A mapping of header values.
    :param stream: The writable text stream that receives the lines.
    """
    for line in paragraph.lines:
        remapped_line = _remap_header_line(line, header_mapping, header_value_mapping)
        stream.write(remapped_line + os.linesep)


# pylint: disable=R0913
def _rename_headers_in_txtfile(path: str, output_path: str, selected_headers: dict, header_mapping: dict,
                               header_value_mapping: dict, encoding=None):
    """Rename headers of a text file and write the result to a file.

    :param path: The filepath to be read.
    :param output_path: The output filepath.
    :param selected_headers: A dictionary from string to bool, determining which headers are real.
    :param header_mapping: A dictionary for remapping headers to their new names.
    :param header_value_mapping: A dictionary mapping old values to new ones.
    :param encoding: The encoding to be used. If left empty, the encoding from
        the parsed program arguments is used.
    """
    logger.info('Renaming headers in file %s, output path %s', path, output_path)
    if not encoding:
        args = argument_parser.get_args()
        encoding = args.encoding

    par_reader = paragraph_analyzer.ParagraphReader(path, overwrite_temp=False, use_temp=False,
                                                    selected_headers=selected_headers,
                                                    end_markers=False, split_headers=False)
    paragraphs = par_reader.read_paragraphs()

    # newline='' keeps the explicit os.linesep terminators from being translated again.
    with open(output_path, mode='w', encoding=encoding, newline='') as stream:
        for paragraph in paragraphs:
            if paragraph.is_header():
                _write_headers_to_stream(paragraph, header_mapping, header_value_mapping, stream)
            else:
                paragraph.write_to_stream(stream)
            # NOTE(review): the separator line is assumed to follow every
            # paragraph; confirm it was not meant for header paragraphs only.
            stream.write(os.linesep)
# pylint: enable=R0913


def _change_header(line: P, str_line, header_mapping, header_value_mapping: dict):
    """Replace a header paragraph node with a node holding the remapped text.

    The new node takes over the ``stylename`` attribute of the old one and is
    put in the same position under the same parent.

    :param line: A "paragraph" from the opendocument.
    :param str_line: The value of the paragraph converted to string.
    :param header_mapping: A dictionary mapping header names to their new names.
    :param header_value_mapping: A dictionary for remapping old values to new values.
    """
    remapped_line = _remap_header_line(str_line, header_mapping, header_value_mapping)
    new_header = P()
    new_header.setAttribute("stylename", line.getAttribute("stylename"))
    new_header.addText(remapped_line)
    parent = line.parentNode
    parent.insertBefore(new_header, line)
    parent.removeChild(line)


def _rename_headers_in_odtfile(path: str, output_path: str, selected_headers: dict, header_mapping: dict,
                               header_value_mapping: dict):
    """Rename headers of an .odt file and save the result to a file.

    The text nodes of the document are walked in order. A run of non-empty
    nodes is treated as one paragraph and an empty node ends it; the n-th run
    is matched with the n-th paragraph from the paragraph reader to decide
    whether its nodes are header lines.

    :param path: The filepath to be rewritten.
    :param output_path: The output filepath.
    :param selected_headers: A dictionary from string to bool, determining which headers are real.
    :param header_mapping: A dictionary for remapping headers to their new names.
    :param header_value_mapping: A dictionary for remapping old values to new values.
    """
    logger.info('Renaming headers in file %s, output %s', path, output_path)
    par_reader = paragraph_analyzer.ParagraphReader(path, overwrite_temp=False, use_temp=False,
                                                    selected_headers=selected_headers,
                                                    end_markers=False, split_headers=False)
    paragraphs = par_reader.read_paragraphs()
    document = load(path)
    lines = odt_reader.get_text_nodes_from_odt(document)

    paragraph_index = 0
    in_par = False
    in_header = False
    for line in lines:
        str_line = odt_reader.get_text_from_node(line)
        empty_line = not str_line or str_line.isspace()

        if not in_par and not empty_line:
            # First non-empty node of a new paragraph.
            # NOTE(review): assumes the reader yields at least as many
            # paragraphs as there are runs of text nodes here - confirm.
            in_par = True
            in_header = paragraphs[paragraph_index].is_header()
        elif in_par and empty_line:
            # An empty node closes the current paragraph.
            in_par = False
            in_header = False
            paragraph_index += 1

        if str_line and in_header:
            _change_header(line, str_line, header_mapping, header_value_mapping)

    # save() takes a file path and closes the archive it writes, matching the
    # other .odt writers in this module; write() is the file-object API.
    document.save(output_path)
def rename_headers_in_folder(path: str, header_info, encoding=None):
    """Rename headers, in place, in the .txt and .odt files of a folder.

    :param path: The path of the folder.
    :param header_info: Header info for the study; its ``selected_headers``,
        ``header_mapping`` and ``value_mapping`` attributes are used.
    :param encoding: Encoding to be used in the .txt data files.
    """
    utils.check_if_any_file_in_use(path, utils.SUPPORTED_FORMATS)
    logger.debug('Rename headers in folder %s', path)
    # The same three mappings are handed to both the .txt and the .odt variant.
    mappings = (header_info.selected_headers, header_info.header_mapping, header_info.value_mapping)

    for filepath in utils.natsorted_glob(os.path.join(path, '**')):
        if filepath.endswith('.txt'):
            _rename_headers_in_txtfile(filepath, filepath, *mappings, encoding)
        elif filepath.endswith('.odt'):
            _rename_headers_in_odtfile(filepath, filepath, *mappings)
def _replace_in_txt_file(path, output_path, original_text, new_text, encoding): """Replaces text in a .txt file. :param path: Filepath. :param output_path: Output filepath. :param original_text: Original text to be replaced. :param new_text: What the original text will be replaced with. """ replaced_count = 0 content = '' with open(path, encoding=encoding) as file_handle: for line in file_handle: if original_text in line: replaced_count += 1 line = line.replace(original_text, new_text) content += line with open(output_path, encoding=encoding, mode='w') as file_handle: file_handle.write(content) return replaced_count def _replace_in_odt_file(path, output_path, original_text, new_text): """Replaces text in an .odt file. """ replaced_count = 0 document = load(path) lines = odt_reader.get_text_nodes_from_odt(document) for line in lines: for node in line.childNodes: if node.tagName == 'Text': if original_text in node.data: node.data = node.data.replace(original_text, new_text) replaced_count += 1 document.save(output_path) return replaced_count
def replace_in_folder(path, original_text, new_text, encoding=None):
    """Replace text, in place, in the .txt and .odt files of a folder.

    :param path: The path of the folder.
    :param original_text: The original text.
    :param new_text: The new text.
    :param encoding: The encoding used for .txt files. When falsy, the
        encoding from the parsed program arguments is used.
    :return: A dictionary from every globbed path to its replacement count;
        paths that are neither .txt nor .odt are listed with a count of 0.
    """
    utils.check_if_any_file_in_use(path, utils.SUPPORTED_FORMATS)
    if not encoding:
        encoding = argument_parser.get_args().encoding

    replaced = {}
    for filepath in utils.natsorted_glob(os.path.join(path, '**')):
        if filepath.endswith('.txt'):
            count = _replace_in_txt_file(filepath, filepath, original_text, new_text, encoding)
        elif filepath.endswith('.odt'):
            count = _replace_in_odt_file(filepath, filepath, original_text, new_text)
        else:
            count = 0
        replaced[filepath] = count
    return replaced