update_ppc_docs.py
5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
from django.core.management.base import BaseCommand, CommandError
from django.db.models import Q
from collector import settings
from datetime import datetime
from loaders import ppc_tei
from writers import tei, tei_anno_and_mtas
from storage.models import Document, Participant
from projects.ppc.ppo import PPO
from projects.ppc.models import Function
class Command(BaseCommand):
    """Reload selected PPC documents and tag their participants with ontology functions.

    The command reads document ids from a text file (one id per line),
    reloads and re-processes every matching document, links participants to
    the public functions found in the ontology loaded from
    ``settings.PPO_PATH`` and finally rewrites the TEI header of each document.
    """

    help = 'Add political ontology to PPC data.'

    # strptime layouts keyed by the number of '-'-separated parts of a date
    # string; any other number of parts is parsed as a bare year ('%Y').
    _DATE_FORMATS = {3: '%d-%m-%Y', 2: '%m-%Y'}

    def add_arguments(self, parser):
        """Register the required ``-i/--input`` option (path to the ids file)."""
        parser.add_argument('-i',
                            '--input',
                            action='store',
                            dest='input',
                            required=True,
                            type=str,
                            help='path to file with documents ids to update')

    def handle(self, *args, **options):
        """Validate the input path and update every document listed in it.

        Raises:
            CommandError: if ``options['input']`` is not an existing file.
        """
        input_path = options['input']
        if not os.path.isfile(input_path):
            raise CommandError('Input must be a file!')
        self._update_docs(self._get_docs_to_update(input_path))

    def _get_docs_to_update(self, ids_file_path):
        """Return a queryset of the documents whose ids are listed in the file.

        Every line is stripped of surrounding whitespace; blank lines are
        ignored.
        """
        with open(ids_file_path, 'r') as ids_file:
            stripped_lines = (line.strip() for line in ids_file)
            doc_ids = [doc_id for doc_id in stripped_lines if doc_id]
        return Document.objects.filter(id__in=doc_ids)

    def _update_docs(self, docs2update):
        """Reload, re-annotate and re-index the documents, then add the ontology.

        Each document is flagged as changed / not indexed and saved before the
        pipeline runs, and saved again with ``changed`` cleared once indexing
        has finished. TEI headers are written only after the ontology
        functions have been attached.
        """
        for document in docs2update.all():
            ppc_tei.reload_document(document)
            document.indexed = False
            document.changed = True
            document.save()
            document.pipeline.annotate(document)
            document.pipeline.add_terminology(document)
            tei_anno_and_mtas.write(document)
            document.pipeline.index(document)
            document.changed = False
            document.save()

        self._add_political_ontology(docs2update)

        for document in docs2update.all():
            tei.write_header(document)

    def _add_political_ontology(self, documents):
        """Attach every function of every ontology person to matching participants."""
        ppo = PPO(settings.PPO_PATH)
        # Politicians are processed first, then the remaining persons; each
        # getter is called only when its turn comes.
        for get_persons in (ppo.get_politicians, ppo.get_other_persons):
            for person in get_persons():
                for public_function in person.hasFunction:
                    self._add_function(ppo, documents, person, public_function)

    def _add_function(self, ppo, documents, politician, public_function):
        """Link ``public_function`` to the participants that match ``politician``.

        The documents are first narrowed to the period in which the function
        was held:

        * when ``public_function.position`` is set, both a start and an end
          date are required; without them the function is skipped,
        * otherwise the documents are narrowed by the ``system``, ``house``
          and ``termNo`` metadata and by whichever of the term dates exist.

        Participants of type ``person`` in those documents whose name contains
        the first name and one of the last names (in either order, optionally
        separated by other words) get the function added.
        """
        if public_function.position:
            start_date, end_date = self._get_function_term_of_office(public_function)
            if not (start_date and end_date):
                self.stdout.write('%s No start/end date!!' % (public_function,))
                return
            documents = documents.filter(publication_date__gte=start_date,
                                         publication_date__lte=end_date)
        else:
            start_date, end_date = self._get_house_term_of_office(public_function)
            election = str(public_function.occursWith[0]).split('.')[-1]
            system = ppo.get_system(election)
            house, term = self._get_house_and_term(public_function)
            # Deliberately three separate filter() calls: every name/value
            # pair has to be matched on its own (presumably `metadata` is a
            # multi-valued relation, where one combined filter() would demand
            # a single row satisfying all three pairs - TODO confirm).
            documents = documents.filter(metadata__name='system', metadata__value=system)
            documents = documents.filter(metadata__name='house', metadata__value=house)
            documents = documents.filter(metadata__name='termNo', metadata__value=term)
            if start_date:
                documents = documents.filter(publication_date__gte=start_date)
            if end_date:
                documents = documents.filter(publication_date__lte=end_date)

        first_names = politician.firstName
        last_names = politician.lastName
        if not first_names or not last_names:
            # Without a last name the name condition would stay an empty Q(),
            # which matches EVERY person in the selected documents; without a
            # first name there is nothing to build the pattern from.
            self.stdout.write('%s No first/last name!!' % (public_function,))
            return

        first_name = first_names[0]
        # NOTE(review): names are interpolated into the pattern unescaped;
        # this assumes they contain no regex metacharacters - TODO confirm.
        q_names = Q()
        for last_name in last_names:
            q_names |= Q(name__regex=r'(^|\s)((%s\s(.+\s)?%s)|(%s\s(.+\s)?%s))(\s|$)' % (
                first_name, last_name, last_name, first_name))

        name_surname_match = Participant.objects.filter(
            Q(type='person') & Q(document__in=documents) & q_names).distinct()
        self.stdout.write('%s full match: %d' % (public_function, name_surname_match.count()))

        function_obj, _ = Function.objects.get_or_create(iri=public_function.iri)
        for participant in name_surname_match.all():
            participant.functions.add(function_obj)

    @staticmethod
    def _first_value(values):
        """Return the first element of ``values`` or ``None`` when it is empty."""
        return values[0] if values else None

    @classmethod
    def _parse_partial_date(cls, value):
        """Parse ``DD-MM-YYYY``, ``MM-YYYY`` or ``YYYY``; return ``None`` for no value.

        Missing day/month default to 1, as ``datetime.strptime`` does.
        """
        if not value:
            return None
        date_format = cls._DATE_FORMATS.get(len(value.split('-')), '%Y')
        return datetime.strptime(value, date_format)

    def _get_function_term_of_office(self, public_function):
        """Return ``(start, end)`` datetimes of a function, ``None`` where unknown.

        The dates come from the first entries of ``startTime`` / ``stopTime``;
        an empty list or an empty first entry yields ``None``.
        """
        # NOTE(review): a year-only or month-only stop date resolves to the
        # first day of that period - confirm this is the intended upper bound.
        start = self._parse_partial_date(self._first_value(public_function.startTime))
        end = self._parse_partial_date(self._first_value(public_function.stopTime))
        return start, end

    def _get_house_term_of_office(self, public_function):
        """Return ``(start, end)`` datetimes of a house term, ``None`` where unknown.

        ``dateFrom`` / ``dateTo`` are read as strings whose part before ``'T'``
        has the ``YYYY-MM-DD`` layout.
        """
        date_from = public_function.dateFrom
        date_to = public_function.dateTo
        start = datetime.strptime(date_from.split('T')[0], '%Y-%m-%d') if date_from else None
        end = datetime.strptime(date_to.split('T')[0], '%Y-%m-%d') if date_to else None
        return start, end

    def _get_house_and_term(self, public_function):
        """Return the house name and the term label of ``public_function``.

        The house is ``'Sejm'`` when the first ``isLowerHouse`` entry is truthy
        and ``'Senat'`` otherwise. The term is what remains of the text after
        the last ``'_'`` in ``str(public_function)`` once the house name and
        any leading ``'-'`` are removed.
        """
        house = 'Sejm' if public_function.isLowerHouse[0] else 'Senat'
        term = str(public_function).split('_')[-1].replace(house, '').lstrip('-')
        return house, term