# cleaner.py (15.3 KB)
"""Read Markdown file converted from ODT and fix as many errors as possible.
This uses heuristics to fix markdown, punctuation and even common misspellings.
"""
import re

LOWER = 'a-ząćęłńóśźż'
UPPER = 'A-ZĄĆĘŁŃÓŚŹŻ'
LETTER = 'A-Za-zĄĆĘŁŃÓŚŹŻąćęłńóśźż'


class Cleaner:
    'The class to clean Markdown'

    def __init__(self, verbose: str = '') -> None:
        """Create a cleaner.

        verbose -- name of the single debug category to print; _debug compares
        it for equality with the category of each message, so the default ''
        matches none of the categories used in this class.
        """
        self.verbose = verbose

    def clean(self, filename, output):
        ' Do all the work '
        pars = self._read(filename)
        filters = [self._merge_titles,
                   self._clear_odt,
                   self._merge_pars,
                   self._trim_interpelations,
                   self._fix_speakers,
                   self._fix_comments,
                   #    self._simplify_formatting
                   ]
        for filter_function in filters:
            pars = filter_function(pars)
        self.save(pars, output)

        # if self.args.mode != 'senate':
        # pars = self._fix_senate(pars)
        # if self.args.text:
        # if self.args.speakers:
        # self.extractSpeakers(filename, pars)

    def _read(self, filename):
        'Read data from file. Merge lines into paragraphs using empty line as separator'
        pars = []
        current = ''
        for line in open(filename):
            line = self._fix_punctuation(line).strip()
            if line:
                if current.endswith('- ') and not current.endswith('o- '):
                    current = current[:-2]
                current += line + ' '
            else:
                pars.append(current.strip())
                current = ''
        if current.strip():
            pars.append(current.strip())
        return pars

    def _merge_titles(self, pars):
        'Merge split speakers with title in the first line and name in the second'
        fixed = []
        for par in pars:
            if par.startswith('**') and fixed and ':' not in fixed[-1] and fixed[-1].endswith('**'):
                fixed[-1] = (fixed[-1][:-2] + ' ' + par[2:]).replace('  ', ' ')
            else:
                fixed.append(par)
        return fixed

    def _fix_punctuation(self, line):
        'Fix space punctuation and mdash'
        line = re.sub(r'[\xc2\xad]', '', line)  # ‍
        line = re.sub(r'\s+', ' ', line)  # spaces
        line2 = line
        replacements = {
            r'(\s+|^),,': r'\1„',
            r'"': r'”',
            r'\s*---\s*': ' — ',
            r'(\w)\s+\.': '\1.',  # w ustawie . -> w ustawie.
            r'\( ': '(',  # ( razem) -> (raem)
            r'(\w)\(': r'\1 (',  # to(albo tamto) -> to (albo tamto)
            r' \)': ')',  # (razem ) -> (razem)
            # biało- czerwony -> biało-czerwony
            fr'([{LOWER}]{{3}}[wknłcz]o)-\s+([{LOWER}]{{4}})': r'\1-\2',
            r'([{LOWER}])-\s+([{LOWER}]{{2}})': r'\1\2',  # al- bo -> albo
            r'(\d) — (\d)': r'\1-\2',  # 12 - 20% -> 12-20%
        }
        for pattern, replacement in replacements.items():
            line = re.sub(pattern, replacement, line)
        self._debug('punctuation', ['punctuation changed',
                    line2, line], line != line2)
        return line

    def _clear_odt(self, pars):
        'Remove leftovers from OpenOffice: bookmarks, footnotes, hyphenation, backslashes'
        removed = ['[]', '\\*', '\\', '{.Apple-converted-spaces}', '(—)']
        reremoved = [r'\s*\u00ad\s*', '{#anchor-?[0-9]*}', r'\{[0-9s.]*\}',
                     r'\[\W]*\]', r'\[\s*\]']
        cleared = []
        for par in pars:
            for remove in removed:
                par = par.replace(remove, ' ')
            for remove in reremoved:
                par = re.sub(remove, ' ', par)
            if '*' in par:
                par = self._fix_markdown(par)
            if re.search(r'\S\s\S\s\S\s\S\s\S\s\S\s\S', par):
                par = self._fix_spaceout(par)
            cleared.append(par.strip())
        return cleared

    def _fix_spaceout(self, par):
        'Try to merge spaced out words'
        oldpar = par
        match = re.search(r'\s\S\s\S\s\S\s\S\s\S\s\S\s', par)
        if match:
            start = match.start()
            end = start
            while end < len(par) - 1 and par[end] == ' ':
                end += 2
            spaceout = re.sub('(.)([A-ZĆŁÓŚŻŹ])', '\\1 \\2',
                              par[start:end].replace(' ', ''))
            par = par[:start] + spaceout + par[end:]
            self._debug('spaceout', ['spaceout removed', oldpar, par])
        return par

    def _fix_markdown(self, par):
        'Simplify markdown asterisk. Fix spaces adjacent to asterisk'
        oldpar = par
        # Replace redundant asterisks
        par = re.sub(r'\*\*([-=.]?)\*\*', r'\1', par)
        for mark in ['**', '*']:
            start = par.find(mark)
            if start == -1 or start >= len(par) - len(mark) or par[start+len(mark)] == '*':
                continue
            while start > 0 and not par[start-1].isspace():
                start -= 1
            last = par.rfind(mark)
            if last <= start:
                continue
            if par[last-1] == ' ':
                mid = par[start:last-1].replace('*', '')
                par = f'{par[:start]}{mark}{mid}{mark} {par[last+len(mark):]}'
            else:
                while (last < len(par) and par[last] != ' '):
                    last += 1
                par = par[:start] + mark + \
                    par[start:last].replace('*', '') + mark + par[last:]
        par = re.sub(r'\*\*(\s*)\*\*', '\\1', par, 0, re.MULTILINE)
        # Add trailing * for starting * + bracket
        par = re.sub(r'\s\*\(([^)]*)\)\.?\s+\*?', lambda m: ' *(' +
                     m.group(1).replace('*', '') + ')* ', par)
        # Remove comment with no colon/bracket - some texts contain spurious italic
        par = re.sub(
            fr'\*+([{LETTER} ][{UPPER} !?-]*)\*+', '\\1', par)
        self._debug('markdown', ['markdown fixed', oldpar, par], par != oldpar)
        return par

    def _fix_speakers(self, pars):
        'Heuristics to remove spurious speaker formatting and to format some unformatted speakers'
        fixed = []
        for par in pars:
            oldpar = par
            if '**' in par:
                # Remove ** around single words
                par = re.sub(
                    r'\*\*([{LOWER}0-9%,.:/\s-]*)\*\*', '\\1', par)
                # Remove ** not at the start of the line
                if '**' in par and not par.startswith('**'):
                    par = par.replace('**', '')
                # Move ** after the colon
                par = re.sub(
                    r'^\*\*([^*:]*):\s+([^*]*)\s*\*\*', '**\\1:** \\2', par)
                # Remove ** if the content does not look like person name
                match = re.match(r'^\*\*([^\*]*):?\*\*', par)
                if not match or not self._can_be_person(match.group(1)):
                    par = self._find_speakers(par.replace('**', ''))
                    self._debug('speakers', ['not a speaker', par])
            else:
                par = self._find_speakers(par)
            fixed.append(par)
            status = '' if par.startswith('**') else 'not a'
            self._debug('speakers', [f'{status} speaker', par], oldpar != par)
        return fixed

    def _find_speakers(self, par):
        'Add ** around unformatted speakers if applicable'
        match = re.match(
            fr'([{UPPER}][{LOWER}.]+\s+[{UPPER}][{LOWER}]+\s+[{UPPER}][{LETTER}-]+)[.:\s]*$', par)
        if match:
            if self._can_be_person(match.group(1)):
                return f'**{match.group(1)}:**'
            else:
                return par
        match = re.match(
            fr'([{UPPER}][{LOWER}.]+\s+[{UPPER}][{LETTER}-]+):\s+(.*)$', par)
        if match:
            return f'**{match.group(1)}:**\n\n{match.group(2)}'
        match = re.match(
            fr'([{UPPER}][{LOWER}.]+\s+[{UPPER}][{LETTER}-]+\s+[{UPPER}][{LOWER}-]+):\s+(.*)$', par)
        if match:
            return f'**{match.group(1)}:**\n\n{match.group(2)}'
        if par == u'Marszałek':
            return u'**Marszałek:**'
        return par

    def _fix_comments(self, pars):
        """ Add italic to some comments not formatted in the original file
        They are assumed to start after a dot, be in brackets and start with uppercase.
        Fragments with numbers are excluded to avoid formatting legal references.
        Also fix some comment formatting.
        """
        fixed = []
        for par in pars:
            oldpar = par
            # Fix comments like '*(Part of* comment)'
            par = par.replace('(*', '*(')
            par = re.sub(r'\*\(([^\)]*)\*([^\)]*)\)', '*(\\1\\2)*', par)
            # Add missing comments
            par = re.sub(fr'\. \(([{UPPER}][^0-9)]*)\)\.?(\s|$|\*)',
                         lambda m: '. *(' + m.group(1).replace('*', '') + ')* ', par)
            # Fix comments marked as new speaker
            not_speaker = re.match(r'\*\*\((.*)\)\.?\s*\*\*$', par)
            if not_speaker:
                par = '*(' + not_speaker.group(1) + ')*'
            self._debug('comments', [par], oldpar != par)
            fixed.append(par)
        return fixed

    def _fix_senate(self, pars):
        fixed = []
        header = False
        for par in pars:
            if not header and not par.startswith('**'):
                continue
            header = True
            if par.startswith('[]'):
                continue
            if re.match(r'\[\d+\]?$', par):
                continue
            if re.match(r'\*\(Początek posiedzenia o godzinie \d+ minut \d+\)\*', par):
                continue
            if re.match(r'\d+\. posiedzenie [\wąćęłńóśżźŹŻŚŁ\s,]+$', par):
                continue
            if re.match(r'\d+$', par):
                continue
            if re.match(r'w dniu \d+ \S+ [0-9]{4} r\.$', par):
                continue
            if re.match(r'\d+\. posiedzenie .*\*', par):
                fixed.append(re.sub(r'^[^*]*(\*.*)$', '\\1', par))
            else:
                fixed.append(par)
                if '*(Koniec posiedzenia' in par:
                    break
        return fixed

    def _can_be_person(self, person):
        'Check if given fragment can be a person. Used to remove spurious bold around some titles'
        if not 3 < len(person) < 151:
            return False
        for prefix in ['(', 'Obywatel', 'Wysoka', 'Proszę', 'Polski', 'Panie', 'Dziękuję',
                       'Sprawozdanie', 'Pan ', 'Pani ', 'Przystępuj']:
            if person.startswith(prefix):
                return False
        for suffix in ['ego']:
            if person.endswith(suffix):
                return False
        for infix in ['II', 'Sejm', 'rzystępujemy', 'nterpelacj', 'Warszawa']:
            if infix in person:
                return False
        if re.search('[0-9]', person):
            return False
        return True

    def _simplify_formatting(self, pars):
        'Remove repeated or nested formatting'
        text = []
        for par in pars:
            par = re.sub(r'\*\*[^\*]*\*\*', '', par)
            par = re.sub(r'\*[^\*]*\*', '', par)
            par = par.strip()
            if par:
                text.append(par)
        return text

    def _merge_pars(self, pars):
        """Heuristic merging of paragraphs. Adds weights suggesting it IS a new paragraph.
        If the total weight is below the threshold, the paragraph is merged.
        """
        merged = pars[:1]
        for par in pars[1:]:
            if not par:
                continue
            previous = merged[-1]
            eol = 0
            if previous.endswith(('!', '?', '.', ':', '"', ';')):
                eol += 14
            if '*' in previous:
                eol += 5
            if len(previous) < 60:
                eol += 5
            if '**' in par and len(par) < 80:
                eol += 16
            if previous.endswith('**'):
                eol += 17
            if par.startswith('--'):
                eol += 12
            if par[0].isupper():
                eol += 5
            if par.startswith('**') and len(par) > 2 and par[2].isupper():
                eol += 17
            if re.search(r' [a-z] [a-z] [a-z] [a-z] ', previous):
                eol += 11
            if eol >= 10:
                self._debug('paragraphs', [
                    f'paragraphs not merged (weight {eol})', merged[-1], par],
                    par[0].islower())
                merged.append(par)
            else:
                self._debug(
                    'paragraphs', [f'paragraphs merged (weight {eol})', merged[-1], par])
                if merged[-1].endswith('-'):  # Remove trailing hyphen when merging
                    merged[-1] = merged[-1][:-1] + par
                else:
                    merged[-1] = merged[-1] + ' ' + par
        return merged

    def _trim_interpelations(self, pars):
        'Remove interpelations'
        trimmed = []
        ignored = False
        for par in pars:
            if par.replace('*', '').startswith(u'Odpowiedź'):  # and not self.interpellations
                ignored = True
                self._debug('interpellations', 'interpellation detected')
            if not ignored:
                trimmed.append(par)
            elif self.verbose == 'interpellations':
                self._debug('interpellations', ['interpellation ignored', par])
        return trimmed

    @ staticmethod
    def save(pars, path):
        'Write the content to given file'
        with open(path, mode='w') as out:
            for par in pars:
                out.write(par)
                out.write(u'\n\n')

    # def extractSpeakers(self, filename, pars):
        # 'Prepare a list of unique speaker names for checking'
        # self.speakers.write(r'[{}]\n'.format(filename))
        # speakers = set()
        # for par in pars:
        #     match = re.match(r'^\*\*([^\*:]*)', par)
        #     if match:
        #         speakers.add(match.group(1))
        # speakers = list(speakers)
        # speakers.sort()
        # for speaker in speakers:
        #     self.speakers.write(speaker + u'\n')
        # self.speakers.write(u'\n')

    def _debug(self, category, message, condition=True):
        if condition and self.verbose == category:
            if not isinstance(message, list):
                message = [message]
            print(f'{category.upper()}: {message[0]}')
            for line in message[1:]:
                print(f' • {line.rstrip()}')


# Read options
# parser = argparse.ArgumentParser()
# parser.add_argument('-o', '--output', help='output directory', default='out')
# parser.add_argument('-i', '--interpellations',
#                     help='include interpellations', action='store_true')
# parser.add_argument('-x', '--speakers',
#                     help='extract speakers', action='store_true')
# parser.add_argument('-t', '--text', help='remove metatext',
#                     action='store_true')
# parser.add_argument('-v', '--verbose', help='show messages', default='none',
#                     choices=['none', 'line', 'markdown', 'merge', 'spaceout',
#                              'interpellations', 'speakers', 'comments'])
# parser.add_argument('-m', '--mode', help='special mode', default='default',
#                     choices=['default', 'senate'])
# parser.add_argument('filename', help='file to process', nargs='+')
# args = parser.parse_args()