duckduckgo.py
2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import codecs
import itertools
import os
import shutil
from subprocess import check_call
from tempfile import mkdtemp, mkstemp

import settings
class DuckDuckGo:
    """Check whether phrases return enough hits from the ``ddgr`` command-line tool.

    Each check runs ``ddgr -n <min_hits> --json <query...>`` as a subprocess
    and decides from the number of output lines whether the query returned
    enough results.
    """

    def __init__(self, min_hits=settings.DUCKDUCKGO_MIN_HITS):
        """Store the hit threshold.

        :param min_hits: value passed to ``ddgr -n``; also scales the
            output-line threshold used to accept a query.
        """
        self.min_hits = min_hits

    def check_expression(self, expression, new_segments=None):
        """Return True if the quoted expression yields enough search output.

        :param expression: object exposing ``segments.order_by(...)`` (see
            ``__get_expression_query``).
        :param new_segments: optional replacement orthographic forms, one per
            segment; ``None`` entries drop the corresponding segment. When
            empty or ``None`` the segments' own ``orth`` values are used.
        :raises RuntimeError: if ``ddgr`` wrote anything to stderr.
        :raises subprocess.CalledProcessError: if ``ddgr`` exits non-zero.
        """
        expression_query = self.__get_expression_query(expression, new_segments)
        return self.__has_enough_hits([expression_query])

    def check_entry(self, catchword, definition):
        """Return True if catchword and definition together yield enough output.

        Both arguments are turned into quoted phrases and passed to ``ddgr``
        as two separate query arguments.

        :raises RuntimeError: if ``ddgr`` wrote anything to stderr.
        :raises subprocess.CalledProcessError: if ``ddgr`` exits non-zero.
        """
        catchword_query = self.__get_expression_query(catchword)
        definition_query = self.__get_expression_query(definition)
        return self.__has_enough_hits([catchword_query, definition_query])

    def __has_enough_hits(self, query_terms):
        """Run ``ddgr`` with *query_terms* and evaluate its output.

        stdout and stderr are captured in temporary files inside a private
        temporary directory, which is always removed before returning.

        :param query_terms: list of already-quoted query strings.
        :return: True when stdout has more than ``5 * min_hits`` lines.
        """
        tmp_folder = mkdtemp()
        try:
            response_fd, response_filename = mkstemp(dir=tmp_folder)
            error_fd, error_filename = mkstemp(dir=tmp_folder)
            try:
                check_call(['ddgr',
                            '-n', str(self.min_hits),
                            '--json'] + list(query_terms),
                           stdout=response_fd, stderr=error_fd)
            finally:
                # Close the raw descriptors even when ddgr fails, so they
                # do not leak and the directory can be removed.
                os.close(response_fd)
                os.close(error_fd)

            # Mode 'r' (not 'rt'): codecs.open appends 'b' when an encoding
            # is given, and Python 3 rejects the resulting 'rtb'.
            with codecs.open(error_filename, 'r', encoding='utf-8') as error_reader:
                for _ in error_reader:
                    # Any stderr output is reported as a 403 (message kept
                    # as-is because callers may depend on it).
                    raise RuntimeError('Error: 403')

            with codecs.open(response_filename, 'r', encoding='utf-8') as response_reader:
                linecount = sum(1 for _ in response_reader)

            # Heuristic kept from the original: presumably each JSON result
            # spans about five output lines -- TODO confirm against ddgr.
            return linecount > 5 * self.min_hits
        finally:
            # mkdtemp/mkstemp never clean up after themselves.
            shutil.rmtree(tmp_folder, ignore_errors=True)

    def __get_expression_query(self, expression, new_segments=None):
        """Build a double-quoted query string from an expression's segments.

        :param expression: object whose ``segments.order_by('position_in_expr')``
            yields segments with ``orth``, ``ctag`` and ``has_nps`` attributes.
        :param new_segments: optional sequence of replacement forms aligned
            with the ordered segments; ``None`` entries are skipped. When
            empty or ``None`` each segment's ``orth`` is used.
        :return: the joined segments wrapped in double quotes.
        """
        expr_segments = expression.segments.order_by('position_in_expr')
        if not new_segments:
            new_segments = [seg.orth for seg in expr_segments]

        parts = []
        # Builtin zip instead of itertools.izip: same pairing, and it also
        # exists on Python 3.
        for expr_seg, new_seg in zip(expr_segments, new_segments):
            if new_seg is None:
                continue
            orth = new_seg
            # Backslash-escape 'interp' segments except a literal double quote.
            if expr_seg.ctag == 'interp' and expr_seg.orth != '"':
                orth = u'\\%s' % orth
            # has_nps presumably means "no preceding space" -- such segments
            # are glued to the previous one; all others get a leading space.
            if expr_seg.has_nps:
                parts.append(orth)
            else:
                parts.append(' %s' % orth)
        expr = ''.join(parts)
        return u'\"' + expr.lstrip() + u'\"'