# update_ppc_docs.py 5.9 KB
import os

from django.core.management.base import BaseCommand, CommandError
from django.db.models import Q

from collector import settings
from datetime import datetime
from loaders import ppc_tei
from writers import tei, tei_anno_and_mtas
from storage.models import Document, Participant
from projects.ppc.ppo import PPO
from projects.ppc.models import Function


class Command(BaseCommand):
    help = 'Add political ontology to PPC data.'

    def add_arguments(self, parser):
        """Register the required ``-i``/``--input`` command-line option.

        The option value is stored as a string under ``options['input']`` and
        is later treated as the path of the file listing document ids.
        """
        input_option = {
            'action': 'store',
            'dest': 'input',
            'required': True,
            'type': str,
            'help': 'path to file with documents ids to update',
        }
        parser.add_argument('-i', '--input', **input_option)

    def handle(self, *args, **options):
        """Entry point of the command.

        Validates that ``options['input']`` points to an existing file, loads
        the documents listed in it and runs the update over them.

        Raises:
            CommandError: when the input path is not a regular file.
        """
        ids_file_path = options['input']
        if not os.path.isfile(ids_file_path):
            raise CommandError('Input must be a file!')

        self._update_docs(self._get_docs_to_update(ids_file_path))

    def _get_docs_to_update(self, ids_file_path):
        """Return the ``Document`` queryset for the ids listed in a file.

        The file is read line by line; surrounding whitespace is stripped and
        blank lines are ignored. The remaining values are used as-is in an
        ``id__in`` lookup.
        """
        with open(ids_file_path, 'r') as ids_file:
            stripped_lines = (raw_line.strip() for raw_line in ids_file)
            wanted_ids = [entry for entry in stripped_lines if entry]
        return Document.objects.filter(id__in=wanted_ids)

    def _update_docs(self, docs2update):
        """Reprocess every document of ``docs2update`` and refresh its outputs.

        For each document, in this order: reload it through
        ``ppc_tei.reload_document``, flag it as changed and not indexed, save,
        run the pipeline's ``annotate`` and ``add_terminology`` steps, write it
        with ``tei_anno_and_mtas.write``, run the pipeline's ``index`` step and
        finally clear the ``changed`` flag and save again.

        Once all documents are processed, the political ontology is applied to
        the whole set and the TEI header of each document is rewritten.
        """
        for doc in docs2update.all():
            ppc_tei.reload_document(doc)

            # Persist the "changed / not indexed" state before the pipeline runs.
            doc.indexed, doc.changed = False, True
            doc.save()

            doc.pipeline.annotate(doc)
            doc.pipeline.add_terminology(doc)
            tei_anno_and_mtas.write(doc)
            doc.pipeline.index(doc)

            # Processing finished: drop the "changed" marker.
            doc.changed = False
            doc.save()

        self._add_political_ontology(docs2update)

        # Headers are written only after the ontology step has run.
        for doc in docs2update.all():
            tei.write_header(doc)

    def _add_political_ontology(self, documents):
        """Attach ontology functions to participants of ``documents``.

        Loads the ontology from ``settings.PPO_PATH`` and, first for all
        politicians and then for all other persons, registers each of their
        ``hasFunction`` entries through ``_add_function``.
        """
        ppo = PPO(settings.PPO_PATH)

        # Bound methods (not results) are listed so that the second group is
        # only fetched after the first one has been fully processed.
        for fetch_people in (ppo.get_politicians, ppo.get_other_persons):
            for individual in fetch_people():
                for held_function in individual.hasFunction:
                    self._add_function(ppo, documents, individual, held_function)

    def _add_function(self, ppo, documents, politician, public_function):
        """Link ``public_function`` to the participants who held it.

        The candidate documents are first narrowed to the period of the
        function. Then every ``person`` participant of those documents whose
        name contains the politician's first name and one of the last names
        (in either order, optionally with other words in between) gets the
        function attached.

        Args:
            ppo: ontology wrapper; only ``get_system`` is used here.
            documents: queryset of documents to search in.
            politician: ontology individual exposing ``firstName`` and
                ``lastName`` sequences.
            public_function: ontology individual describing the function.

        Returns:
            None. Entries without usable dates (positional functions) or
            without a first/last name are reported on stdout and skipped.
        """
        if public_function.position:
            start_date, end_date = self._get_function_term_of_office(public_function)

            # A positional function needs both bounds to be matched reliably.
            if not (start_date and end_date):
                print(public_function, 'No start/end date!!')
                return

            documents = documents.filter(publication_date__gte=start_date)
            documents = documents.filter(publication_date__lte=end_date)
        else:
            start_date, end_date = self._get_house_term_of_office(public_function)

            election = str(public_function.occursWith[0]).split('.')[-1]
            system = ppo.get_system(election)

            house, term = self._get_house_and_term(public_function)
            # Deliberately three separate filter() calls: each name/value pair
            # has to be satisfied by its own metadata entry, which a single
            # combined filter() over a multi-valued relation would not express.
            documents = documents.filter(metadata__name='system', metadata__value=system)
            documents = documents.filter(metadata__name='house', metadata__value=house)
            documents = documents.filter(metadata__name='termNo', metadata__value=term)
            if start_date:
                documents = documents.filter(publication_date__gte=start_date)
            if end_date:
                documents = documents.filter(publication_date__lte=end_date)

        # Without a last name the name condition would stay an empty Q(), which
        # filters nothing and would attach the function to every person in the
        # selected documents; without a first name the pattern cannot be built.
        if not politician.lastName or not politician.firstName:
            print(public_function, 'No first/last name!!')
            return

        first_name = politician.firstName[0]
        q_names = Q()
        for last_name in politician.lastName:
            q_names |= Q(name__regex=r'(^|\s)((%s\s(.+\s)?%s)|(%s\s(.+\s)?%s))(\s|$)' % (
                first_name, last_name, last_name, first_name))

        name_surname_match = Participant.objects.filter(Q(type='person') & Q(document__in=documents) &
                                                        q_names).distinct()

        print(public_function, 'full match: %d' % (name_surname_match.count()))
        function_obj, _ = Function.objects.get_or_create(iri=public_function.iri)

        for participant in name_surname_match.all():
            participant.functions.add(function_obj)

    def _get_function_term_of_office(self, public_function):
        start = None
        end = None
        if public_function.startTime[0]:
            date_parts = public_function.startTime[0].split('-')
            if len(date_parts) == 3:
                start = datetime.strptime(public_function.startTime[0], '%d-%m-%Y')
            elif len(date_parts) == 2:
                start = datetime.strptime(public_function.startTime[0], '%m-%Y')
            else:
                start = datetime.strptime(public_function.startTime[0], '%Y')
        if public_function.stopTime[0]:
            date_parts = public_function.stopTime[0].split('-')
            if len(date_parts) == 3:
                end = datetime.strptime(public_function.stopTime[0], '%d-%m-%Y')
            elif len(date_parts) == 2:
                end = datetime.strptime(public_function.stopTime[0], '%m-%Y')
            else:
                end = datetime.strptime(public_function.stopTime[0], '%Y')
        return start, end

    def _get_house_term_of_office(self, public_function):
        start = None
        end = None
        if public_function.dateFrom:
            start = datetime.strptime(public_function.dateFrom.split('T')[0], '%Y-%m-%d')
        if public_function.dateTo:
            end = datetime.strptime(public_function.dateTo.split('T')[0], '%Y-%m-%d')
        return start, end

    def _get_house_and_term(self, public_function):
        house = 'Senat'
        if public_function.isLowerHouse[0]:
            house = 'Sejm'
        term = str(public_function).split('_')[-1].replace(house, '').lstrip('-')
        return house, term