# duckduckgo.py
import codecs
import itertools
import os
from tempfile import mkdtemp, mkstemp
from subprocess import check_call
import settings
class DuckDuckGo:
    """Check how many DuckDuckGo hits a phrase gets, via the ``ddgr`` CLI.

    Every public method runs ``ddgr -n <min_hits> --json <query...>`` as a
    subprocess and derives a hit count from the number of lines the command
    prints on stdout.

    NOTE(review): the hit count assumes ``ddgr --json`` prints five lines
    per hit (see ``_LINES_PER_HIT``) -- confirm against the installed ddgr
    version.
    """

    # Number of stdout lines attributed to a single hit when converting the
    # line count of the ddgr output into a hit count.
    _LINES_PER_HIT = 5

    def __init__(self, min_hits=settings.DUCKDUCKGO_MIN_HITS):
        """Store the hit threshold.

        :param min_hits: value passed to ``ddgr -n`` and used as the
            threshold by the ``check_*`` methods.
        """
        self.min_hits = min_hits

    def check_expression(self, expression, new_segments=None):
        """Return True when the quoted expression gets enough hits.

        :param expression: object whose ``segments.order_by(...)`` yields
            segments with ``orth``, ``ctag`` and ``has_nps`` attributes.
        :param new_segments: optional replacement orthographic forms, paired
            positionally with the expression's segments; ``None`` entries
            drop the corresponding segment.
        :raises RuntimeError: if ddgr wrote anything to stderr.
        """
        query = self.__get_expression_query(expression, new_segments)
        return self._has_enough_hits(self._count_response_lines([query]))

    def expression_responses_count(self, expression, new_segments=None):
        """Return the estimated number of hits for the quoted expression.

        Parameters and errors are the same as for ``check_expression``.
        """
        query = self.__get_expression_query(expression, new_segments)
        return self._hit_count(self._count_response_lines([query]))

    def check_entry(self, catchword, definition):
        """Return True when catchword and definition together get enough hits.

        Both arguments are turned into separate quoted phrases and passed to
        ddgr as two query arguments.

        :raises RuntimeError: if ddgr wrote anything to stderr.
        """
        queries = self._entry_queries(catchword, definition)
        return self._has_enough_hits(self._count_response_lines(queries))

    def entry_responses_count(self, catchword, definition):
        """Return the estimated number of hits for catchword plus definition.

        :raises RuntimeError: if ddgr wrote anything to stderr.
        """
        queries = self._entry_queries(catchword, definition)
        return self._hit_count(self._count_response_lines(queries))

    def _entry_queries(self, catchword, definition):
        """Build the two quoted query phrases for a dictionary entry."""
        return [self.__get_expression_query(catchword),
                self.__get_expression_query(definition)]

    def _has_enough_hits(self, line_count):
        """Tell whether ``line_count`` output lines exceed the threshold."""
        return line_count > self._LINES_PER_HIT * self.min_hits

    def _hit_count(self, line_count):
        """Convert a number of output lines into a number of hits."""
        # Floor division gives the same result on Python 2 and 3 without
        # a detour through float division.
        return line_count // self._LINES_PER_HIT

    def _count_response_lines(self, query_terms):
        """Run ddgr for ``query_terms`` and count the lines it prints.

        The output is captured in temporary files inside a private
        temporary directory; the directory and its files are always removed
        before returning, also when the call fails.

        :param query_terms: list of query strings appended to the ddgr
            command line.
        :returns: number of lines ddgr wrote to stdout.
        :raises RuntimeError: if ddgr wrote anything to stderr. The message
            is always ``'Error: 403'``, whatever ddgr actually reported.
        :raises subprocess.CalledProcessError: if ddgr exits with a
            non-zero status (propagated from ``check_call``).
        """
        tmp_folder = mkdtemp()
        try:
            response_fd, response_path = mkstemp(dir=tmp_folder)
            try:
                error_fd, error_path = mkstemp(dir=tmp_folder)
                try:
                    check_call(
                        ['ddgr', '-n', str(self.min_hits), '--json']
                        + list(query_terms),
                        stdout=response_fd, stderr=error_fd)
                finally:
                    os.close(error_fd)
            finally:
                os.close(response_fd)

            # Any stderr output at all is treated as a failed request.
            if os.path.getsize(error_path) > 0:
                raise RuntimeError('Error: 403')

            # Mode 'r' rather than 'rt': codecs.open appends 'b' itself,
            # and 'rtb' is rejected on Python 3.
            with codecs.open(response_path, 'r',
                             encoding='utf-8') as response_reader:
                return sum(1 for _ in response_reader)
        finally:
            # mkdtemp/mkstemp never clean up after themselves.
            for name in os.listdir(tmp_folder):
                os.remove(os.path.join(tmp_folder, name))
            os.rmdir(tmp_folder)

    def __get_expression_query(self, expression, new_segments=None):
        """Build a double-quoted phrase query from an expression's segments.

        Segments are taken in ``position_in_expr`` order. Each one is
        prefixed with a space unless its ``has_nps`` flag is set, and
        segments tagged ``interp`` are backslash-escaped unless the original
        segment is a double quote.

        :param expression: object whose ``segments.order_by(...)`` yields
            segments with ``orth``, ``ctag`` and ``has_nps`` attributes.
        :param new_segments: optional replacement forms paired positionally
            with the segments; when empty or ``None`` the segments' own
            ``orth`` values are used. ``None`` entries are skipped.
        :returns: the phrase wrapped in double quotes.
        """
        expr_segments = expression.segments.order_by('position_in_expr')
        if not new_segments:
            new_segments = [seg.orth for seg in expr_segments]

        parts = []
        # zip stops at the shorter sequence, exactly like itertools.izip,
        # and exists on both Python 2 and Python 3.
        for expr_seg, new_seg in zip(expr_segments, new_segments):
            if new_seg is None:
                continue
            orth = new_seg
            if expr_seg.ctag == 'interp' and expr_seg.orth != '"':
                orth = u'\\%s' % orth
            parts.append(orth if expr_seg.has_nps else ' %s' % orth)
        return u'\"' + ''.join(parts).lstrip() + u'\"'