monitor_users.py
6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import time
import sys
import json
import datetime
import oauth2
from pymongo import MongoClient
def load_users(user_file, tweetsDB):
    """Map every user id listed in *user_file* to the newest stored tweet id.

    Each line of *user_file* is stripped and split on tabs; only the first
    field (the user id) is used, and lines whose first field is empty (e.g.
    blank lines) are skipped. For every user, *tweetsDB* is queried for the
    stored tweet of that user with the latest ``created_at_mongo`` and its
    ``id_str`` is recorded. Users with no stored tweet, or whose lookup
    raises, keep the default ``"1"``.

    :param user_file: path of the tab-separated user list.
    :param tweetsDB: collection of previously saved tweets (must support
        ``find_one(filter, sort=[...])``).
    :return: dict ``{user_id: last_tweet_id_str}``.
    """
    user_id2last_tweet_id = {}
    with open(user_file, 'r') as f:
        for line in f:
            user_id = line.strip().split("\t")[0]
            if not user_id:
                # Blank line: would otherwise register an empty user id.
                continue
            user_id2last_tweet_id[user_id] = "1"
            try:
                last_tweet = tweetsDB.find_one(
                    {"user.id_str": user_id},
                    sort=[("created_at_mongo", -1)])
                if last_tweet is not None:
                    user_id2last_tweet_id[user_id] = last_tweet['id_str']
            except Exception as ex:
                # Best effort: fall back to "1" and keep going.
                print(ex)
                print("Unable to find last downloaded tweet id for user %s"
                      % user_id)
    return user_id2last_tweet_id
def load_tokens(path):
    """Read Twitter OAuth credentials from a comma-separated token file.

    Blank lines and lines starting with ``#`` are skipped. The first
    remaining line must contain exactly four comma-separated fields:
    consumer key, consumer secret, access token key, access token secret.
    Any later lines are ignored.

    :param path: path of the token file.
    :return: dict with keys ``CLIENT_KEY``, ``CLIENT_SECRET``,
        ``ATOKEN_KEY`` and ``ATOKEN_SECRET``.
    :raises ValueError: if the file has no token line, or the first token
        line does not have exactly four fields.
    """
    # ``with`` guarantees the file is closed (the original leaked the handle).
    with open(path, 'r') as f:
        for line in f:
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            parts = [x.strip() for x in stripped.split(",")]
            if len(parts) != 4:
                raise ValueError(
                    "Expected 4 comma-separated fields in %s, got %d"
                    % (path, len(parts)))
            consumer_key, consumer_secret, auth_key, auth_secret = parts
            # Only the first token line is used.
            return {
                "CLIENT_KEY": consumer_key,
                "CLIENT_SECRET": consumer_secret,
                "ATOKEN_KEY": auth_key,
                "ATOKEN_SECRET": auth_secret,
            }
    raise ValueError("No token line found in %s" % path)
def request(url):
    """Issue an OAuth1-signed GET to *url* with the module-level ``tokens``.

    :param url: fully built Twitter API URL.
    :return: ``(resp, content)`` from ``oauth2.Client.request``. If building
        the client or performing the request raises, the error is printed and
        a synthetic ``({'status': 503}, '[]')`` is returned. 503 is used
        instead of 404 so that a transport failure lands in the caller's
        generic retry path rather than being mistaken for a permanent
        "page not found".
    """
    try:
        consumer = oauth2.Consumer(key=tokens["CLIENT_KEY"],
                                   secret=tokens["CLIENT_SECRET"])
        token = oauth2.Token(key=tokens["ATOKEN_KEY"],
                             secret=tokens["ATOKEN_SECRET"])
        client = oauth2.Client(consumer, token)
        resp, content = client.request(url, method="GET")
    except Exception as ex:
        # Keep the best-effort contract (never raise) but don't hide the cause.
        print('\tRequest to %s failed: %s' % (url, ex))
        resp = {'status': 503}
        content = '[]'
    return resp, content
def fetch_tweets(user_id, sid, max_id):
    """Fetch one page of *user_id*'s timeline from ``statuses/user_timeline``.

    The request asks for ``STEP`` tweets with ``since_id=sid`` and, when
    *max_id* is not None, ``max_id=max_id``. Status handling:

    * 429 (rate limited): wait, then retry. This does not count as an error.
    * 401 / 404: give up immediately.
    * 400 and any other status: wait, then retry. At most
      ``MAX_ERRORS_PER_REQUEST`` such errors are allowed in total.

    Tweets missing ``created_at``, ``id_str``, ``user``, ``user.id_str`` or
    ``user.screen_name`` are dropped. If any tweet belongs to a user other
    than *user_id*, the whole page is discarded.

    :param user_id: Twitter user id, as a string.
    :param sid: value passed as ``since_id``.
    :param max_id: value passed as ``max_id``, or None to omit it.
    :return: ``(end, tweets)``. *tweets* is the list of accepted tweets, and
        *end* is True exactly when that list is empty (the caller stops
        paging).
    """
    # The URL does not change between retries, so build it once.
    url = ('https://api.twitter.com/1.1/statuses/user_timeline.json'
           '?user_id={0}&count={1}&since_id={2}').format(user_id, STEP, sid)
    if max_id is not None:
        url += '&max_id={0}'.format(max_id)

    tweets = []
    error_count = 0
    while error_count < MAX_ERRORS_PER_REQUEST:
        r, c = request(url)
        status = int(r['status'])
        if status == 200:
            tweets = json.loads(c)
            break
        elif status == 401:
            print('\tUser %s protects his tweets' % user_id)
            break
        elif status == 404:
            print('\tPage not found for user %s' % user_id)
            break
        elif status == 429:
            wait_time = 60
            _, body = request(
                'https://api.twitter.com/1.1/application/rate_limit_status.json')
            try:
                limits = json.loads(body)['resources']['statuses'][
                    '/statuses/user_timeline']
                if int(limits['remaining']) == 0:
                    # Sleep until the window resets. If the reset time has
                    # already passed, the difference is <= 0 and time.sleep
                    # would raise on a negative value, so clamp to 1 second.
                    wait_time = max(int(limits['reset']) - time.time() + 1, 1)
            except (ValueError, KeyError, TypeError):
                # Status unavailable or malformed: keep the 60 s default.
                pass
            print('\tRate limit reached, waiting %i seconds' % wait_time)
            time.sleep(wait_time)
        elif status == 400:
            print('\tError %i, waiting 60 seconds' % status)
            time.sleep(60)
            error_count += 1
        else:
            print('\tError %i, waiting %i seconds' % (status, WAIT_PERIOD))
            time.sleep(WAIT_PERIOD)
            error_count += 1

    result = []
    for tweet in tweets:
        if ("created_at" not in tweet
                or "user" not in tweet
                or "id_str" not in tweet
                or "id_str" not in tweet["user"]
                or "screen_name" not in tweet["user"]):
            continue
        if tweet["user"]["id_str"] != user_id:
            print("\tDowloaded tweet user id different than requested! Skipping user id.")
            result = []
            break
        result.append(tweet)

    if result:
        print('\t%s has %i new tweets' % (user_id, len(result)))
    return not result, result
# ###################### MAIN PROGRAM ###########################
# Usage: python monitor_users.py user_file twittertokens database:collection
if len(sys.argv) != 4:
    print("Wrong number of arguments! Try: python %s user_file twittertokens mongodb_params"
          % sys.argv[0])
    sys.exit(1)
USER_FILE = sys.argv[1]
TWITTERTOKENS = sys.argv[2]
# mongodb_params is "<database>:<collection>"; fail with a usage message
# instead of an IndexError traceback when the separator is missing.
mongo_params = sys.argv[3].split(":")
if len(mongo_params) < 2:
    print("mongodb_params must have the form database:collection, got %r"
          % sys.argv[3])
    sys.exit(1)
client = MongoClient()
tweetsDB = client[mongo_params[0]][mongo_params[1]]
# request() reads this module-level name.
tokens = load_tokens(TWITTERTOKENS)
user_id2last_tweet_id = load_users(USER_FILE, tweetsDB)
# fetch_tweets() reads the following module-level constants.
WAIT_PERIOD = 2  # time until retry for a failed Twitter API call
STEP = 200  # number of tweets retrieved per call; should always be 200 (maximum)
MAX_ERRORS_PER_REQUEST = 5

start = datetime.datetime.now()
print("%s Starting collecting tweets" % start)
total_new = 0
not_saved_in_db = 0
exceptions = []
n_users = len(user_id2last_tweet_id)
for i, user_id in enumerate(user_id2last_tweet_id, 1):
    print("User %s out of %s . User id: %s" % (i, n_users, user_id))
    newest_saved_sid = user_id2last_tweet_id[user_id]
    end, tweets = fetch_tweets(user_id, newest_saved_sid, None)
    if end:
        continue
    tlist = tweets
    oldest_current_id = tweets[-1]['id_str']
    # Page backwards: each request asks for ids up to (oldest seen - 1) and
    # stops when fetch_tweets reports an empty page.
    # int() is used instead of long(); on Python 2 it returns a long when needed.
    while int(oldest_current_id) > int(newest_saved_sid):
        end, tweets = fetch_tweets(user_id, newest_saved_sid,
                                   int(oldest_current_id) - 1)
        if end:
            break
        oldest_current_id = tweets[-1]['id_str']
        tlist += tweets
    # Pages are ordered newest first (the last element is treated as the
    # oldest), so reverse to insert in chronological order.
    tlist.reverse()
    for tweet in tlist:
        try:
            created_at = tweet["created_at"]
            tweet["created_at_mongo"] = datetime.datetime.strptime(
                created_at, '%a %b %d %H:%M:%S +0000 %Y')
            # Only the string ids are stored; the numeric "id" fields are
            # dropped (reason not stated in the original - TODO confirm).
            tweet.pop("id", None)
            tweet["user"].pop("id", None)
            tweetsDB.insert(tweet)
        except Exception as ex:
            # Best effort: record the failure and continue with the next tweet.
            not_saved_in_db += 1
            exceptions.append(ex)
    total_new += len(tlist)

finish = datetime.datetime.now()
print("%s Done collecting %s tweets." % (finish, total_new))
print("Took: %s" % (finish - start))
print("%s not saved in db:" % not_saved_in_db)
for ex in exceptions:
    print(ex)