create_text_walenty.py
7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#-*- coding:utf-8 -*-
import datetime
import os
import tarfile
from collections import Counter
from django.core.management.base import BaseCommand
from optparse import make_option
from accounts.models import User
from dictionary.ajax_vocabulary_management import create_text_walenty, \
get_stats, write_stats, \
update_walenty_stats
from dictionary.ajax_argument_realizations import create_realizations_file
from dictionary.models import Frame_Opinion, Lemma, Vocabulary, POS, \
get_checked_statuses, get_ready_statuses
from settings import WALENTY_PATH
class Command(BaseCommand):
args = '<dict dict ...>'
help = 'Get Walenty in text format.'
option_list = BaseCommand.option_list + (
make_option('--start_date',
action='store',
type='string',
dest='start_date',
default='all',
help='Status change start date (format: YYYY-MM-DD).'),
)
def handle(self, *args, **options):
now = datetime.datetime.now().strftime('%Y%m%d')
vocab_names = list(args)
vocab_names.sort()
if vocab_names:
filename_base = '%s_%s_%s' % ('walenty', '+'.join(vocab_names), now)
else:
filename_base = '%s_%s' % ('walenty', now)
realizations_path = os.path.join(WALENTY_PATH,
'%s_%s.txt' % ('phrase_types_expand', now))
checked_stats_path = os.path.join(WALENTY_PATH, u'%s_%s.txt' % (filename_base.replace('walenty', 'stats'),
'verified'))
ready_stats_path = os.path.join(WALENTY_PATH, u'%s_%s.txt' % (filename_base.replace('walenty', 'stats'),
'all'))
vocabularies = Vocabulary.objects.none()
if vocab_names:
vocabularies = Vocabulary.objects.filter(name__in=vocab_names)
try:
all_stats = Counter({})
verified_stats = Counter({})
base_path = os.path.join(WALENTY_PATH, filename_base)
archive = tarfile.open(base_path + '-text.tar.gz', 'w:gz')
os.chdir(WALENTY_PATH)
for pos in POS.objects.exclude(tag=u'unk').order_by('priority'):
pos_stats = create_pos_archive_and_get_stats(archive, pos, vocabularies, options['start_date'], filename_base)
all_stats = all_stats + Counter(pos_stats['all'])
verified_stats = verified_stats + Counter(pos_stats['verified'])
create_realizations_file(realizations_path)
archive.add(os.path.basename(realizations_path))
if not vocab_names and options['start_date'] != 'all':
write_stats(checked_stats_path, verified_stats)
archive.add(os.path.basename(checked_stats_path))
write_stats(ready_stats_path, all_stats)
archive.add(os.path.basename(ready_stats_path))
update_walenty_stats(all_stats)
finally:
archive.close()
os.remove(realizations_path)
if not vocab_names and options['start_date'] != 'all':
os.remove(checked_stats_path)
os.remove(ready_stats_path)
def create_pos_archive_and_get_stats(archive, pos, vocabularies, start_date, filename_base):
all_stats = {}
checked_stats = {}
try:
checked_statuses = get_checked_statuses()
ready_statuses = get_ready_statuses()
lemmas = Lemma.objects.filter(old=False, entry_obj__pos=pos)
ready_lemmas = lemmas.filter(status__in=ready_statuses)
if start_date != 'all':
ready_lemmas = filter_lemmas_by_status_change(ready_lemmas, checked_statuses, start_date)
ready_lemmas = ready_lemmas.order_by('entry_obj__name')
all_path = os.path.join(WALENTY_PATH, u'%s_%ss_%s' % (filename_base, pos.tag, 'all'))
walenty_path_ready = create_text_walenty(file_name=all_path,
lemmas=ready_lemmas,
vocabularies=vocabularies,
frame_opinions=Frame_Opinion.objects.none(),
lemma_statuses=ready_statuses,
owners=User.objects.none(),
poss=POS.objects.filter(pk=pos.pk),
add_frame_opinions=True)
all_filename = os.path.basename(walenty_path_ready)
archive.add(name=all_filename, arcname=os.path.join(u'%ss' % pos.tag, all_filename))
checked_lemmas = lemmas.filter(status__in=checked_statuses)
if start_date != 'all':
checked_lemmas = filter_lemmas_by_status_change(checked_lemmas, checked_statuses, start_date)
checked_lemmas = checked_lemmas.order_by('entry_obj__name')
checked_path = os.path.join(WALENTY_PATH, u'%s_%ss_%s' % (filename_base, pos.tag, 'verified'))
walenty_path_checked = create_text_walenty(file_name=checked_path,
lemmas=checked_lemmas,
vocabularies=vocabularies,
frame_opinions=Frame_Opinion.objects.none(),
lemma_statuses=checked_statuses,
owners=User.objects.none(),
poss=POS.objects.filter(pk=pos.pk),
add_frame_opinions=True)
checked_filename = os.path.basename(walenty_path_checked)
archive.add(name=checked_filename, arcname=os.path.join(u'%ss' % pos.tag, checked_filename))
if not vocabularies.exists() and start_date != 'all':
all_stats = get_stats(ready_statuses, pos.tag)
all_stats_path = os.path.join(WALENTY_PATH, u'%s_%ss_%s.txt' % (filename_base.replace('walenty', 'stats'),
pos.tag, 'all'))
write_stats(all_stats_path, all_stats)
all_stats_filename = os.path.basename(all_stats_path)
archive.add(name=all_stats_filename, arcname=os.path.join(u'%ss' % pos.tag, all_stats_filename))
checked_stats = get_stats(checked_statuses, pos.tag)
checked_stats_path = os.path.join(WALENTY_PATH, u'%s_%ss_%s.txt' % (filename_base.replace('walenty', 'stats'),
pos.tag, 'verified'))
write_stats(checked_stats_path, checked_stats)
checked_stats_filename = os.path.basename(checked_stats_path)
archive.add(name=checked_stats_filename, arcname=os.path.join(u'%ss' % pos.tag, checked_stats_filename))
finally:
os.remove(walenty_path_ready)
os.remove(walenty_path_checked)
if not vocabularies.exists():
os.remove(all_stats_path)
os.remove(checked_stats_path)
return {'all': all_stats,
'verified': checked_stats}
def filter_lemmas_by_status_change(lemmas, statuses, start_date_str):
start_date = parse_date(start_date_str)
filtered_lemmas_pks = []
for lemma in lemmas:
if lemma.status_history.filter(status=statuses[0], date__gte=start_date).exists():
filtered_lemmas_pks.append(lemma.pk)
return lemmas.filter(pk__in=filtered_lemmas_pks)
def parse_date(date_str):
date_parts = date_str.split('-')
year = int(date_parts[0])
month = int(date_parts[1].lstrip('0'))
day = int(date_parts[2].lstrip('0'))
date = datetime.datetime(year, month, day, 00, 00)
return date