monitor_users.py 6.57 KB
import time
import sys
import json
import datetime
import oauth2
from pymongo import MongoClient


def load_users(user_file, tweetsDB):
    user_id2last_tweet_id = {}

    with open(user_file, 'r') as f:
        for line in f:
            spl = line.strip().split("\t")
            user_id = spl[0]
            user_id2last_tweet_id[user_id] = "1"

            try:
                last_tweet = tweetsDB.find({"user.id_str": user_id}).sort("created_at_mongo", -1).limit(1)
                if last_tweet.count(True) == 1:
                    user_id2last_tweet_id[user_id] = last_tweet[0]['id_str']

            except Exception as ex:
                print ex
                print "Unable to find last downloaded tweet id for user", user_id

    return user_id2last_tweet_id


def load_tokens(path):
    f = file(path, 'r')
    for line in f.readlines():
        if line.startswith("#"):
            continue
        parts = [x.strip() for x in line.split(",")]
        (consumer_key, consumer_secret, auth_key, auth_secret) = parts
        tokens = dict()
        tokens["CLIENT_KEY"] = consumer_key
        tokens["CLIENT_SECRET"] = consumer_secret
        tokens["ATOKEN_KEY"] = auth_key
        tokens["ATOKEN_SECRET"] = auth_secret
        break  # assume first token line needed

    return tokens


def request(url):
    try:
        consumer = oauth2.Consumer(key=tokens["CLIENT_KEY"], secret=tokens["CLIENT_SECRET"])
        token = oauth2.Token(key=tokens["ATOKEN_KEY"], secret=tokens["ATOKEN_SECRET"])
        client = oauth2.Client(consumer, token)
        resp, content = client.request(url, method="GET")
    except Exception:
        resp = {}
        resp['status'] = 404
        content = '[]'
    return resp, content


def fetch_tweets(user_id, sid, max_id):
    end = False
    tweets = []
    error_count = 0
    while error_count < MAX_ERRORS_PER_REQUEST:

        if max_id is not None:
            r, c = request(
                "https://api.twitter.com/1.1/statuses/user_timeline.json?user_id={0}&count={1}&since_id={2}&max_id={3}"
                .format(user_id, str(STEP), sid, max_id))
        else:
            r, c = request(
                'https://api.twitter.com/1.1/statuses/user_timeline.json?user_id={0}&count={1}&since_id={2}'
                .format(user_id, str(STEP), sid))

        status = int(r['status'])
        if status == 200:
            tweets = json.loads(c)
            break

        elif status == 401:
            print '\tUser %s protects his tweets' % user_id
            end = True
            break

        elif status == 404:
            print '\tPage not found for user %s' % user_id
            end = True
            break

        elif status == 429:
            wait_time = 60
            r, c = request('https://api.twitter.com/1.1/application/rate_limit_status.json')
            try:
                rstatus = json.loads(c)
                remaining = int(rstatus['resources']['statuses']['/statuses/user_timeline']['remaining'])
                if remaining == 0:
                    now = time.time()
                    reset = int(rstatus['resources']['statuses']['/statuses/user_timeline']['reset'])
                    wait_time = reset - now + 1
            except:
                pass
            print '\tRate limit reached, waiting %i seconds' % wait_time
            time.sleep(wait_time)

        elif status == 400:
            print '\tError %i, waiting 60 seconds' % status
            time.sleep(60)
            error_count += 1

        else:
            print '\tError %i, waiting %i seconds' % (status, WAIT_PERIOD)
            time.sleep(WAIT_PERIOD)
            error_count += 1

    result = []
    for tweet in tweets:
        if "created_at" not in tweet \
                or "user" not in tweet \
                or "id_str" not in tweet \
                or "id_str" not in tweet["user"] \
                or "screen_name" not in tweet["user"]:
            continue
        if tweet["user"]["id_str"] != user_id:
            print "\tDowloaded tweet user id different than requested! Skipping user id."
            result = []
            break

        result.append(tweet)

    if len(result) == 0:
        end = True
    else:
        print '\t%s has %i new tweets' % (user_id, len(result))

    return end, result

# ###################### MAIN PROGRAM ###########################
if len(sys.argv) != 4:
    print "Wrong number of arguments! Try: python", sys.argv[0], "user_file", "twittertokens", "mongodb_params"
    sys.exit(1)

USER_FILE = sys.argv[1]
TWITTERTOKENS = sys.argv[2]
client = MongoClient()
spl = sys.argv[3].split(":")
tweetsDB = client[spl[0]][spl[1]]

tokens = load_tokens(TWITTERTOKENS)
user_id2last_tweet_id = load_users(USER_FILE, tweetsDB)

WAIT_PERIOD = 2  # time until retry for a failed Twitter API call
STEP = 200  # number of tweets retrieved per call; should always be 200 (maximum)
MAX_ERRORS_PER_REQUEST = 5

start = datetime.datetime.now()
print start, "Starting collecting tweets"

i = 1
total_new = 0
not_saved_in_db = 0
exceptions = []
for user_id in user_id2last_tweet_id:
    print "User", i, "out of", len(user_id2last_tweet_id), ". User id:", user_id
    i += 1

    newest_saved_sid = user_id2last_tweet_id[user_id]
    end, tweets = fetch_tweets(user_id, newest_saved_sid, None)
    if end:
        continue

    tlist = tweets
    oldest_current_id = tweets[len(tweets) - 1]['id_str']
    while long(oldest_current_id) > long(newest_saved_sid):
        end, tweets = fetch_tweets(user_id, newest_saved_sid, long(oldest_current_id) - 1)
        if end:
            break
        oldest_current_id = tweets[len(tweets) - 1]['id_str']
        tlist += tweets

    # process the tweets we got in reverse order so that we maintain the #order of timestamps
    tlist.reverse()
    for tweet in tlist:
        try:
            created_at = tweet["created_at"]
            dt = datetime.datetime.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
            tweet["created_at_mongo"] = dt

            if "id" in tweet:
                del tweet["id"]
            if "id" in tweet["user"]:
                del tweet["user"]["id"]

            tweetsDB.insert(tweet)
        except Exception as ex:
            not_saved_in_db += 1
            exceptions.append(ex)

    total_new += len(tlist)

end = datetime.datetime.now()
print end, "Done collecting", total_new, "tweets."
print "Took:", end - start
print not_saved_in_db, "not saved in db:"
for ex in exceptions:
    print ex