# cleaner.py 8.7 KB
"""Read Markdown file converted from ODT and fix as many errors as possible.
This uses heuristics to fix markdown, punctuation and even common misspellings.
"""
import re

# Character ranges meant to be placed inside a regex character class:
# lowercase / uppercase Latin letters plus the Polish diacritics.
LOWER = 'a-ząćęłńóśźż'
UPPER = 'A-ZĄĆĘŁŃÓŚŹŻ'
# Regex fragment for a given name or initial: an uppercase letter followed
# either by lowercase letters with an optional dot ("Jan", "Wł.") or by a
# bare dot ("J.").
NAME = f'[{UPPER}](?:[{LOWER}]+\\.?|\\.)'
# Regex fragment for a surname: a capitalised word, optionally followed by a
# hyphen and a second capitalised word ("Kot", "Nowak-Kot").
SURNAME = f'[{UPPER}][{LOWER}]+(?:-[{UPPER}][{LOWER}]+)?'
# Both letter cases in one character-class range.
LETTER = 'A-Za-zĄĆĘŁŃÓŚŹŻąćęłńóśźż'
# Punctuation marks, also written for use inside a character class.
PUNCT = ',;.?!:…'
# NOTE(review): not referenced anywhere in the visible part of this module.
UNIQUE = '~&^'


class Cleaner:
    'The class to clean Markdown'

    def __init__(self, verbose=''):
        self.verbose = verbose
        self.cache = []
        self.filter_status = None

    def clean(self, filename, output):
        """Read *filename*, run every filter over it and write the result to *output*."""
        # The stages run in the listed order; each one receives the
        # paragraphs produced by the stage before it. The label is what
        # _debug compares against self.verbose.
        pipeline = (
            ('lines', self._merge_lines),
            ('odt', self._clear_odt),
            ('spaceout', self._fix_spaceout),
            ('whitespace', self._fix_whitespace),
            ('punctuation', self._fix_punctuation),
            ('spelling', self._spelling),
            ('merge', self._merge_paragraphs),
            ('comments', self._fix_comments),
            ('speakers', self._add_speakers),
        )
        paragraphs = self._read(filename)
        for label, stage in pipeline:
            paragraphs = self._filter(label, stage, paragraphs)
        self.save(paragraphs, output)

    @staticmethod
    def _read(filename):
        'Read data from file. Merge lines into paragraphs using empty line as separator'
        return [line.strip() for line in open(filename)]

    def _filter(self, category, function, pars):
        '''Apply a single filter and return modified list of paragraphs.
        For each paragraph invoked function can return a paragraph or a list of them.'''
        fixed = []
        self.cache = []
        self.filter_status = None
        for par in pars:
            result = function(par)
            if isinstance(result, list):
                self._debug(category, [par] + result)
                fixed.extend(filter(None, result))
            elif isinstance(result, str):
                if result != par:
                    self._debug(category, [par, result])
                fixed.append(result)
        if self.cache:
            fixed.append(self._cache())
        return fixed

    def _merge_lines(self, line):
        'Merge lines into paragraphs'
        if not line:
            return self._cache()
        if self.cache and re.match(r'[a-np-ząćęłóńśźż]-', self.cache[-1][-2:]):
            # and previous[-1].rindex(' ') > len(previous[-1]) - 5:
            self.cache[-1] = f'{self.cache[-1][:-1]}{line}'
        else:
            self.cache.append(line)
        return None

    @staticmethod
    def _clear_odt(par):
        'Remove leftovers from OpenOffice: bookmarks, footnotes, hyphenation, backslashes'
        reremoved = [r'{#anchor-?[0-9]*}', r'\{[0-9s.]*\}', r'\[\s*\]']
        removed = ['[]', '\\*', '\\',
                   '{.Apple-converted-spaces}', '(—)', '*', '#', '^', '>']
        for remove in reremoved:
            par = re.sub(remove, '', par)
        for remove in removed:
            par = par.replace(remove, '')
        return par.strip()

    @staticmethod
    def _fix_spaceout(par):
        'Try to merge spaced out words'
        match = re.search(r'\s\S\s\S\s\S\s\S\s\S\s\S\s', par)
        if match:
            start = match.start()
            end = start
            while end < len(par) - 1 and par[end] == ' ':
                end += 2
            spaceout = re.sub('(.)([A-ZĆŁÓŚŻŹ])', r'\1 \2',
                              par[start:end].replace(' ', ''))
            par = par[:start] + spaceout + par[end:]
        return par

    @staticmethod
    def _fix_whitespace(par):
        'Remove &zwj; and double spaces'
        return re.sub(r'\s+', ' ', (re.sub(r'[\xc2\xad]', '', par)))

    @staticmethod
    def _fix_punctuation(par):
        """Normalise quotes, dashes, ellipses and spacing around punctuation.

        The rules are regex substitutions applied one after another in the
        order listed, so a later rule sees the output of the earlier ones.
        """
        rules = (
            (r'(\s+|^),,', r'\1„'),  # ,,słowo -> „słowo
            (r'\'\'', r'”'),  # '' -> ”
            (r'\\?\.\.\.', r'…'),  # więc\... -> więc…
            (r'\s*_\s*', ' '),  # underscore and the spaces around it -> one space
            (r'"', r'”'),  # straight double quote -> ”
            (r'\s”', ' „'),  # ” right after whitespace is an opening quote
            (r'\s*---\s*', ' — '),  # --- -> em dash with single spaces
            (r',\.', r'.'),  # dość,. -> dość.
            (fr'([:,])([{LETTER}])', r'\1 \2'),  # a,b -> a, b
            (fr'(\w)\s+([{PUNCT}])', r'\1\2'),  # w ustawie . -> w ustawie.
            # biało- czerwony -> biało-czerwony
            (fr'([{LOWER}]{{3}}[wknłcz]o)-\s+([{LOWER}]{{4}})', r'\1-\2'),
            (r'(\d) — (\d)', r'\1-\2'),  # 12 — 20% -> 12-20%
            (r'([IVX]+) - ([IVX]+)', r'\1-\2'),  # I - III -> I-III
            (r'\s(\d+)\/\s', r' \1) '),  # 1/ -> 1)
            (r'\s\/([^\/]+)\/', r' (\1)'),  # /PZPR/ -> (PZPR)
            (r' \)', ')'),  # (razem ) -> (razem)
            (r'\( ', '('),  # ( razem) -> (razem)
            (r'(\w)\(', r'\1 ('),  # to(albo tamto) -> to (albo tamto)
        )
        result = par
        for pattern, replacement in rules:
            result = re.sub(pattern, replacement, result)
        return result

    def _spelling(self, par):
        fixes = {'Glos ': 'Głos: ', 'Glosy ': 'Głosy:',
                 'Marszalek': 'Marszałek', 'NAUKl': 'NAUKI'}
        for misspelling, fix in fixes.items():
            par = par.replace(misspelling, fix)
        return par

    def _merge_paragraphs(self, par):
        """Heuristic merging of paragraphs. Adds weights suggesting it IS a new paragraph.
        If the total weight is below the threshold, the paragraph is merged.
        """
        if not self.cache:
            self.cache.append(par)
            return None
        previous = self.cache[-1]
        eol = 14 if previous.endswith(
            ('!', '?', '.', ':', '"', ';', '…')) else 0
        eol += 5 if '*' in previous else 0
        eol += 5 if len(previous) < 60 else 0
        eol += 16 if '**' in par and len(par) < 80 else 0
        eol += 17 if previous.endswith('**') else 0
        eol += 12 if par.startswith('--') else 0
        eol += 20 if par.startswith('[') else 0
        eol += 5 if par and par[0].isupper() else 0
        eol += 17 if par.startswith('**') and len(
            par) > 2 and par[2].isupper() else 0
        eol += 11 if re.search(r' [a-z] [a-z] [a-z] [a-z] ', previous) else 0
        if eol >= 10:
            return self._cache([par])
        if previous.endswith('-'):  # Remove trailing hyphen
            self.cache[-1] = self.cache[-1][: -1] + par
        else:
            self.cache.append(par)
        return None

    @staticmethod
    def _add_speakers(par):
        'Add ** around unformatted speakers if applicable'
        if '*' in par:
            return par
        patterns = [
            r'(Głosy?\s.*):\s*(.*)$',
            fr'({NAME}\s+{SURNAME})[.:\s]*$',  # Jan Kot:
            fr'({NAME}\s+{SURNAME} \([^:]*\)):$',  # Jan Kot (SD): Nie!
            fr'({NAME}\s+{SURNAME}):\s+(.*)$',  # Jan Kot: Nie!
            fr'({NAME}\s+{NAME}\s+{SURNAME}):\s+(.*)$',  # Jan Maria Kot: Nie!
            r'(Marszałek):\s+(.*)$',  # Marszałek: Proszę wstać.
            # Poseł Jan Kot:
            fr'((?:[^:\s]+\s+){{0,20}}?{NAME}\s+{SURNAME}):$',
            fr'((?:[^:\s]+\s+){{0,20}}?{NAME}\s+{SURNAME} \(.*\)):$',
            fr'((?:[^:\s]+\s+){{0,20}}?{NAME}\s+{SURNAME}): (.*)$',
            fr'((?:[^:\s]+\s+){{0,20}}?{NAME}\s+{SURNAME} \([^\):]*\)): (.*)$',
        ]
        for pattern in patterns:
            match = re.match(pattern, par)
            if match and match.group(1)[0].isalpha():
                person = f'**{match.group(1)}:**'
                return [person] + list(match.groups())[1:]
        return par

    @staticmethod
    def _fix_comments(par):
        """ Add italic to some comments not formatted in the original file
        They are assumed to start after a dot, be in brackets and start with uppercase.
        Fragments with numbers are excluded to avoid formatting legal references.
        """
        if re.match(r'\(.*\)$', par):
            return f'*{par}*'
        par = re.sub(fr'([.!?] |^)\(([{UPPER}][^0-9)]*)\)\.?(\s|$|\*)',
                     lambda m: f'{m.group(1)}\n*({m.group(2).replace("*", "")})*\n', par)
        return par.split('\n')

    def _cache(self, content=None):
        'Fetch and clear paragraph cache'
        cache = ' '.join(self.cache)
        self.cache = content or []
        return cache

    @ staticmethod
    def save(pars, path):
        'Write the content to given file'
        with open(path, mode='w') as out:
            for par in pars:
                out.write(par)
                out.write(u'\n\n')

    def _debug(self, category, items):
        'Print debug message if applicable'
        if category == self.verbose:
            print(f'{category.upper()}')
            for item in items:
                print(f' • {item.rstrip()}')