# cleaner.py
"""Read Markdown file converted from ODT and fix as many errors as possible.
This uses heuristics to fix markdown, punctuation and even common misspellings.
"""
import re

# Character ranges meant to be embedded inside a regex character class [...].
LOWER = 'a-ząćęłńóśźż'  # lowercase Latin letters plus Polish diacritics
UPPER = 'A-ZĄĆĘŁŃÓŚŹŻ'  # uppercase Latin letters plus Polish diacritics
# A capital followed by lowercase letters with an optional dot, or by a dot alone.
NAME = rf'[{UPPER}](?:[{LOWER}]+\.?|\.)'
# A capitalised word, optionally followed by a hyphen and a second capitalised word.
SURNAME = rf'[{UPPER}][{LOWER}]+(?:-[{UPPER}][{LOWER}]+)?'
LETTER = 'A-Za-zĄĆĘŁŃÓŚŹŻąćęłńóśźż'
# Punctuation characters, also used inside a character class.
PUNCT = ',;.?!:…'
# Temporary stand-in for '**' while single asterisks are being removed.
UNIQUE = '~&^'


class Cleaner:
    """Clean Markdown text by running it through a chain of heuristic filters.

    Every filter is a method taking one paragraph (or, for the first filter,
    one line). It returns a string, a list of strings, or None; see
    ``_filter`` for how each result is handled. Filters that merge their
    input keep pending fragments in ``self.cache``.
    """

    def __init__(self, verbose=''):
        """Create a cleaner.

        verbose -- name of the filter category whose changes ``_debug``
                   prints (one of the keys of the filter table in ``clean``);
                   the default '' matches no category.
        """
        self.verbose = verbose
        # Fragments collected by the merging filters between calls.
        self.cache = []

    def clean(self, filename, output):
        'Read filename, apply all filters and write the result to output'
        pars = self._read(filename)
        # Filters run in insertion order, each on the output of the previous one.
        filters = {
            'lines': self._merge_lines,
            'odt': self._clear_odt,
            'asterisk': self._fix_asterisk,
            'spaceout': self._fix_spaceout,
            'whitespace': self._fix_whitespace,
            'punctuation': self._fix_punctuation,
            'paragraphs': self._merge_pars,
            'nonspeakers': self._remove_nonspeakers,
            'speakers': self._add_speakers,
            'comments': self._fix_comments,
        }
        for category, function in filters.items():
            pars = self._filter(category, function, pars)
        self.save(pars, output)

    @staticmethod
    def _read(filename):
        """Return the lines of the file with surrounding whitespace stripped.

        Lines are not merged here; that is done later by ``_merge_lines``.
        The file is decoded as UTF-8 (assumes the converted Markdown is UTF-8).
        """
        with open(filename, encoding='utf-8') as source:
            return [line.strip() for line in source]

    def _filter(self, category, function, pars):
        """Apply a single filter and return the modified list of paragraphs.

        For each paragraph the filter may return:
        * a string  -- kept as one paragraph,
        * a list    -- every item is kept as a separate paragraph,
        * otherwise (e.g. None) -- nothing is emitted for this call.
        Whatever is left in ``self.cache`` afterwards is flushed as the last
        paragraph.
        """
        fixed = []
        self.cache = []
        for par in pars:
            result = function(par)
            if isinstance(result, list):
                self._debug(category, [par] + result)
                fixed.extend(result)
            elif isinstance(result, str):
                if result != par:
                    self._debug(category, [par, result])
                fixed.append(result)
        if self.cache:
            fixed.append(self._cache())
        return fixed

    def _merge_lines(self, line):
        """Merge lines into paragraphs, using an empty line as the separator.

        Returns the collected paragraph on an empty line, otherwise None.
        """
        if not line:
            return self._cache()
        # Previous line ends with "<lowercase letter other than o>-":
        # drop the hyphen and glue this line directly to it.
        if self.cache and re.match(r'[a-np-ząćęłóńśźż]-', self.cache[-1][-2:]):
            self.cache[-1] = f'{self.cache[-1][:-1]}{line}'
        else:
            self.cache.append(line)
        return None

    @staticmethod
    def _clear_odt(par):
        'Remove leftovers from OpenOffice: bookmarks, footnotes, hyphenation, backslashes'
        removed = ['[]', '\\*', '\\', '{.Apple-converted-spaces}', '(—)']
        # NOTE(review): r'\[\W]*\]' parses as '[', one non-word character,
        # any number of ']' and a final ']'. It may have been meant as
        # r'\[\W*\]' -- left unchanged, please confirm.
        reremoved = [r'{#anchor-?[0-9]*}', r'\{[0-9s.]*\}',
                     r'\[\W]*\]', r'\[\s*\]']
        for remove in removed:
            par = par.replace(remove, '')
        for remove in reremoved:
            par = re.sub(remove, '', par)
        return par.strip()

    @staticmethod
    def _fix_asterisk(par):
        'Remove Markdown italics and merge bold'
        if '*' not in par:
            return par
        # Protect '**' with a placeholder, drop the remaining single asterisks.
        par = par.replace('**', UNIQUE).replace('*', '').replace(UNIQUE, '**')
        # Each pass removes one inner '**' until a single bold span is left.
        while par.count('**') > 2:
            merged = re.sub(r'\*\*(.*)\*\*(.*)\*\*', r'**\1\2**', par)
            if merged == par:
                # '.' does not match a newline, so markers separated by one
                # can never be merged; stop instead of looping forever.
                break
            par = merged
        return par

    @staticmethod
    def _fix_spaceout(par):
        """Try to merge s p a c e d  o u t words.

        Only the first run of at least six space-separated single characters
        is handled. The spaces inside the run are removed and a space is put
        back before every uppercase letter that follows another character.
        The whitespace before the run and the text after it are preserved.
        """
        match = re.search(r'\s\S\s\S\s\S\s\S\s\S\s\S\s', par)
        if not match:
            return par
        start = match.start()
        end = start
        # Walk over (space, single character) pairs. A character belongs to
        # the run only if it is followed by whitespace or by the end of the
        # paragraph, so the first letter of a regular word is not swallowed.
        while (end + 1 < len(par) and par[end] == ' '
               and not par[end + 1].isspace()
               and (end + 2 == len(par) or par[end + 2].isspace())):
            end += 2
        if end == start:
            return par
        # par[start] is the space in front of the run; keep it in place.
        merged = re.sub('(.)([A-ZĆŁÓŚŻŹ])', r'\1 \2',
                        par[start + 1:end].replace(' ', ''))
        return par[:start + 1] + merged + par[end:]

    @staticmethod
    def _fix_whitespace(par):
        """Remove soft hyphens and collapse whitespace runs into one space.

        The character class removes both U+00AD (soft hyphen) and U+00C2.
        """
        return re.sub(r'\s+', ' ', (re.sub(r'[\xc2\xad]', '', par)))

    @staticmethod
    def _fix_punctuation(par):
        'Fix spacing around punctuation, quotes, ellipsis and dashes'
        # Applied in order; later patterns see the output of earlier ones.
        replacements = {
            r'(\s+|^),,': r'\1„',
            r'\\?\.\.\.': r'…',  # więc\... -> więc…
            r'"': r'”',
            r'\s”': ' „',
            r'\s*---\s*': ' — ',
            r',\.': r'.',  # dość,. -> dość.
            fr'(\w)\s+([{PUNCT}])': r'\1\2',  # w ustawie . -> w ustawie.
            r' \)': ')',  # (razem ) -> (razem)
            r'\( ': '(',  # ( razem) -> (razem)
            r'(\w)\(': r'\1 (',  # to(albo tamto) -> to (albo tamto)
            # biało- czerwony -> biało-czerwony
            fr'([{LOWER}]{{3}}[wknłcz]o)-\s+([{LOWER}]{{4}})': r'\1-\2',
            r'(\d) — (\d)': r'\1-\2',  # 12 — 20% -> 12-20%
            r'([IVX]+) - ([IVX]+)': r'\1-\2',  # I - III -> I-III
            r'\s\/([^\/]+)\/': r' (\1)',  # /PZPR/ -> (PZPR)
            r'(\d+)\/\s': r'\1) ',  # 1/ -> 1)
        }
        for pattern, replacement in replacements.items():
            par = re.sub(pattern, replacement, par)
        return par

    def _merge_pars(self, par):
        """Heuristic merging of paragraphs.

        Adds up weights suggesting that ``par`` IS a new paragraph. If the
        total is below the threshold (10), ``par`` is merged with the cached
        text; otherwise the cached text is returned and ``par`` starts a new
        cache. Returns None while fragments are only being collected.
        """
        if not self.cache:
            self.cache.append(par)
            return None
        previous = self.cache[-1]
        # NOTE(review): in ``clean`` this runs after ``_fix_punctuation``,
        # which has already replaced '"' with '”' and '...' with '…', so
        # those endings are not recognised here -- confirm this is intended.
        eol = 14 if previous.endswith(('!', '?', '.', ':', '"', ';')) else 0
        eol += 5 if '*' in previous else 0
        eol += 5 if len(previous) < 60 else 0
        eol += 16 if '**' in par and len(par) < 80 else 0
        eol += 17 if previous.endswith('**') else 0
        eol += 12 if par.startswith('--') else 0
        eol += 5 if par and par[0].isupper() else 0
        eol += 17 if par.startswith('**') and len(
            par) > 2 and par[2].isupper() else 0
        eol += 11 if re.search(r' [a-z] [a-z] [a-z] [a-z] ', previous) else 0
        if eol >= 10:
            return self._cache([par])
        if previous.endswith('-'):  # Remove trailing hyphen
            self.cache[-1] = self.cache[-1][: -1] + par
        else:
            self.cache.append(par)
        return None

    def _remove_nonspeakers(self, par):
        """Heuristics to remove spurious speaker formatting.

        Bold is kept only when the paragraph starts with '**' followed by an
        uppercase letter and the leading bold fragment passes
        ``_can_be_person``; otherwise every '**' is removed.
        """
        if '**' not in par:
            return par
        if not re.match(fr'^\*\*[{UPPER}]', par):  # oraz **Poseł X**, **mówi**
            return par.replace('**', '')
        par = par.replace(': **', ':** ')
        # NOTE(review): a re.sub(r'\*\*([^\*]{0,20})\:($|[^\*])', r'\1:**\2', par)
        # call used to sit here, but its result was discarded, so it never
        # had any effect. It is not enabled because it would also rewrite
        # the closing '**' of well-formed speakers
        # ('**Jan Kot:** Tak: nie' -> '**Jan Kot: Tak:** nie').
        # Remove ** if the content does not look like person name.
        # The greedy group also captures a trailing colon, if any.
        match = re.match(r'^\*\*([^\*]*):?\*\*', par)
        if not match or not self._can_be_person(match.group(1)):
            return par.replace('**', '')
        return par

    def _add_speakers(self, par):
        """Add ** around unformatted speakers if applicable.

        Returns ``par`` unchanged when it already contains '**' or when no
        pattern matches; otherwise a list with the bold speaker followed by
        the remaining captured groups (the utterance, if any).
        """
        if '**' in par:
            return par
        patterns = [
            fr'({NAME}\s+{SURNAME})[.:\s]*$',  # Jan Kot:
            fr'({NAME}\s+{SURNAME} \(.*\)):$',  # Jan Kot (SD):
            fr'({NAME}\s+{SURNAME}):\s+(.*)$',  # Jan Kot: Nie!
            fr'({NAME}\s+{NAME}\s+{SURNAME}):\s+(.*)$',  # Jan Maria Kot: Nie!
            r'(Marszałek):\s+(.*)$',  # Marszałek: Proszę wstać.
            fr'((?:\w+\s+){{1,20}}{NAME}\s+{SURNAME}):$',  # Poseł Jan Kot:
            fr'((?:\w+\s+){{1,20}}{NAME}\s+{SURNAME} \(.*\)):$',
        ]
        for pattern in patterns:
            match = re.match(pattern, par)
            if match:
                speaker, *rest = match.groups()
                return [f'**{speaker}:**', *rest]
        return par

    @staticmethod
    def _fix_comments(par):
        """Add italic to some comments not formatted in the original file.

        They are assumed to start after a dot, be in brackets and start with
        uppercase. Fragments with numbers are excluded to avoid formatting
        legal references. Also fix some comment formatting.
        """
        par = par.replace('(*', '*(')  # (*Głos)* -> *(Głos)*
        # *(Głos*) -> *(Głos)*; both groups stop at the closing bracket.
        par = re.sub(r'\*\(([^)]*)\*([^)]*)\)', r'*(\1\2)*', par)
        # Add missing comments
        par = re.sub(fr'([.!?] |^)\(([{UPPER}][^0-9)]*)\)\.?(\s|$|\*)',
                     lambda m: f'{m.group(1)}*({m.group(2).replace("*", "")})* ', par)
        # Fix comments marked as new speaker
        not_speaker = re.match(r'\*\*\((.*)\)\.?\s*\*\*$', par)
        if not_speaker:
            par = '*(' + not_speaker.group(1) + ')*'
        return par

    def _cache(self, content=None):
        """Fetch and reset the paragraph cache.

        Returns the cached fragments joined with single spaces; the cache
        then becomes ``content`` (or an empty list when it is None or empty).
        """
        cache = ' '.join(self.cache)
        self.cache = content or []
        return cache

    @staticmethod
    def save(pars, path):
        'Write the paragraphs to the given file as UTF-8, each followed by an empty line'
        with open(path, mode='w', encoding='utf-8') as out:
            for par in pars:
                out.write(par)
                out.write('\n\n')

    def _can_be_person(self, person):
        'Check if given fragment can be a person. Used to remove spurious bold around some titles'
        if not 3 < len(person) < 151:
            return False
        prefixes = ('(', 'Obywatel', 'Wysoka', 'Proszę', 'Polski', 'Panie', 'Dziękuję',
                    'Sprawozdanie', 'Pan ', 'Pani ', 'Przystępuj')
        if person.startswith(prefixes):
            return False
        if person.endswith('ego'):
            return False
        infixes = ('II', 'Sejm', 'rzystępujemy', 'nterpelacj', 'Warszawa')
        if any(infix in person for infix in infixes):
            return False
        if re.search('[0-9]', person):
            return False
        return True

    def _debug(self, category, items):
        'Print the items if category is the one selected by self.verbose'
        if category == self.verbose:
            print(f'{category.upper()}')
            for item in items:
                print(f' • {item.rstrip()}')