Using Digital Trace Data in the Social Sciences, University of Konstanz (Summer 2018)

Instructor: Andreas Jungherr

Week 8: Loading Twitter Data Into a Database

During this session, we will learn how to load data downloaded from Twitter into a database. Loading tweets into a database creates a little extra work at the beginning, but trust me, this will more than pay off further down the line when you are working with large data sets.

In this session, we will discuss the workings of the file database.py, provided in twitterresearch accompanying the tutorial. The script loads downloaded tweets into a SQLite database and establishes a predefined database structure that supports a set of typical analytical approaches to Twitter data. The script uses peewee to interact with SQLite from Python.

To retrace your steps after the session, have a look at pages 29-41 in the tutorial. Also, have a look at the example code below.

Starting out with SQL can be challenging. So for this part of the course, it is even more advisable than for the other elements to spend some time familiarizing yourself with the language beyond the confines of the course. Make sure to have a look at the introductory courses listed at the end of this page.

Code Examples:

In this session, we will learn how to load downloaded tweets into a database to allow for more flexible analyses later on.

First, let’s point the command line to your working directory for this project:

cd "/Users/().../twitterresearch"

Next, let’s get some data.

Start IPython and collect all available tweets of a user. In our example case, we collect available messages posted by US law professor Lawrence Lessig.

ipython
import examples
examples.save_user_archive_to_database()

Now, you should find a new file in your working directory called tweets.db. The file contains all available tweets posted by Lawrence Lessig in a database-ready format.

Of course, you can change the user or users you want to track by adapting the function save_user_archive_to_database() in the file examples.py to your interests. There you can also change the name of the output file to something of your liking, which is advisable whenever you adapt our scripts. A hypothetical sketch of such an adaptation follows below.
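For illustration, here is a sketch of what such an adapted function might look like. The helper names rest.fetch_user_archive and database.create_tweet_from_dict follow the tutorial's scripts, but check examples.py for the exact code before copying this:

import database
import rest

def save_user_archive_to_database():
    # Hypothetical sketch: fetch all available tweets of one user, page by
    # page, and store each tweet in the database. The username is an example;
    # replace it with the account you want to track.
    for page in rest.fetch_user_archive("lessig"):
        for tweet in page:
            database.create_tweet_from_dict(tweet)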

In the next step, we load the file tweets.db as a SQLite database. We do so by running the script database.py included in our twitterresearch script set.

A Closer Look at database.py

Before we run the script database.py, let’s have a look at its workings in detail.

We start by importing the necessary modules:

import logging
import datetime
from dateutil import parser
from pytz import utc
import peewee
from playhouse.fields import ManyToManyField

In the next step, we load the file tweets.db as a SQLite database into the object db. We will interact with SQLite directly from Python through peewee, which you installed along with the other modules via our file requirements.txt.

For more information on peewee see its documentation.

For more information on SQLite see its documentation.

Back to the script:

db = peewee.SqliteDatabase("tweets.db", threadlocals=True)
db.connect()

After connecting to the database, we have to define the fields in our database or, in other words, define database models. Our script takes care of most of the work for you, but let’s have a close read-through to better understand what happens here:

We start by setting a base model for the database

class BaseModel(peewee.Model):

    """
    Base model for setting the database to use
    """
    class Meta:
        database = db

Now we use a selection of the fields provided by Twitter through its API to create database models that allow us to directly query or summarize the values contained in these fields.

Here we define the model Hashtag:

class Hashtag(BaseModel):

    """
    Hashtag model.
    """
    tag = peewee.CharField(unique=True, primary_key=True)

Here we define the model URL:

class URL(BaseModel):

    """
    URL model.
    """
    url = peewee.CharField(unique=True, primary_key=True)

Here we define the model User:

class User(BaseModel):

    """
    Twitter user model.
    Stores the user's unique ID as a primary key along with the username.
    """
    id = peewee.BigIntegerField(unique=True, primary_key=True)
    username = peewee.CharField(null=True)

    def last_tweet(self):
        return Tweet.select().where(Tweet.user == self).order_by(Tweet.id.desc())[0]

    def first_tweet(self):
        return Tweet.select().where(Tweet.user == self).order_by(Tweet.id.asc())[0]
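
Once the database is populated, these model methods can be called directly. A quick sketch, assuming a user with the screen name "lessig" is already stored:

user = User.get(User.username == "lessig")  # look up a stored user
print(user.last_tweet().text)               # their most recent tweet in the db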

Here we define the model Tweet:

class Tweet(BaseModel):

    """
    Tweet model.
    Stores the tweet's unique ID as a primary key along with the user, text and date.
    """
    id = peewee.BigIntegerField(unique=True, primary_key=True)
    user = peewee.ForeignKeyField(User, related_name='tweets', index=True)
    text = peewee.TextField()
    date = peewee.DateTimeField(index=True)
    tags = ManyToManyField(Hashtag)
    urls = ManyToManyField(URL)
    mentions = ManyToManyField(User)
    reply_to_user = peewee.ForeignKeyField(
        User, null=True, index=True, related_name='replies')
    reply_to_tweet = peewee.BigIntegerField(null=True, index=True)
    retweet = peewee.ForeignKeyField(
        'self', null=True, index=True, related_name='retweets')

For the detailed workings of these database models, make sure to check out the documentation of SQLite and peewee.

Now, we define a series of helper functions that facilitate loading the data into the database:

def deduplicate_lowercase(l):
    """
    Helper function that performs two things:
    - Converts everything in the list to lower case
    - Deduplicates the list by converting it into a set and back to a list
    """
    lowercase = [e.lower() for e in l]
    deduplicated = list(set(lowercase))
    return deduplicated
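
A quick example of what this helper does; note that converting to a set does not preserve the original order:

deduplicate_lowercase(["Trump", "trump", "TRUMP", "Clinton"])
# -> ['clinton', 'trump'] (order may vary)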


def create_user_from_tweet(tweet):
    """
    Function for creating a database entry for
    one user using the information contained within a tweet

    :param tweet:
    :type tweet: dictionary from a parsed tweet
    :returns: database user object
    """
    user, created = User.get_or_create(
        id=tweet['user']['id'],
        defaults={'username': tweet['user']['screen_name']},
    )
    return user
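
The get_or_create pattern used here is what keeps repeated loads from violating the primary-key constraint: a second call with the same ID returns the existing row instead of inserting a duplicate. A minimal sketch with a made-up user ID:

# Assuming ID 12345 is not yet in the database:
u1, created1 = User.get_or_create(id=12345, defaults={'username': 'example'})
u2, created2 = User.get_or_create(id=12345, defaults={'username': 'example'})
# created1 is True (the row was inserted), created2 is False (it already existed)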


def create_hashtags_from_entities(entities):
    """
    Attention: Casts tags into lower case!
    Function for creating database entries for
    hashtags using the information contained within entities

    :param entities:
    :type entities: dictionary from a parsed tweet's "entities" key
    :returns: list of database hashtag objects
    """
    tags = [h["text"] for h in entities["hashtags"]]
    # Deduplicate tags since they may be used multiple times per tweet
    tags = deduplicate_lowercase(tags)
    db_tags = []
    for h in tags:
        tag, created = Hashtag.get_or_create(tag=h)
        db_tags.append(tag)
    return db_tags


def create_urls_from_entities(entities):
    """
    Attention: Casts urls into lower case!
    Function for creating database entries for
    urls using the information contained within entities

    :param entities:
    :type entities: dictionary from a parsed tweet's "entities" key
    :returns: list of database url objects
    """
    urls = [u["expanded_url"] for u in entities["urls"]]
    urls = deduplicate_lowercase(urls)
    db_urls = []
    for u in urls:
        url, created = URL.get_or_create(url=u)
        db_urls.append(url)
    return db_urls


def create_users_from_entities(entities):
    """
    Function for creating database entries for
    users using the information contained within entities

    :param entities:
    :type entities: dictionary from a parsed tweet's "entities" key
    :returns: list of database user objects
    """
    users = [(u["id"], u["screen_name"]) for u in entities["user_mentions"]]
    users = list(set(users))
    db_users = []
    for id, name in users:
        user, created = User.get_or_create(
            id=id,
            defaults={'username': name},
        )
        db_users.append(user)
    return db_users


def create_tweet_from_dict(tweet, user=None):
    """
    Function for creating a tweet and all related information as database entries
    from a dictionary (that's the result of parsed json)
    This does not do any deduplication, i.e. there is no check whether the tweet is
    already present in the database. If it is, there will be a UNIQUE CONSTRAINT exception.

    :param tweet:
    :type tweet: dictionary from a parsed tweet
    :returns: database tweet object, or False on failure
    """
    # If the user isn't stored in the database yet, we
    # need to create it now so that tweets can reference her/him
    try:
        if not user:
            user = create_user_from_tweet(tweet)
        tags = create_hashtags_from_entities(tweet["entities"])
        urls = create_urls_from_entities(tweet["entities"])
        mentions = create_users_from_entities(tweet["entities"])
        # Create new database entry for this tweet
        t = Tweet.create(
            id=tweet['id'],
            user=user,
            text=tweet['text'],
            # We are parsing Twitter's date format using a "magic" parser from the python-dateutil package
            # The resulting datetime object has timezone information attached.
            # However, since SQLite cannot store timezones, that information is stripped away.
            # If you use PostgreSQL instead, please refer to the DateTimeTZField in peewee
            # and remove the "strftime" call here
            date=parser.parse(tweet['created_at']).strftime(
                "%Y-%m-%d %H:%M:%S"),
        )
        if tags:
            t.tags = tags
        if urls:
            t.urls = urls
        if mentions:
            t.mentions = mentions
        if tweet["in_reply_to_user_id"]:
            # Create a mock user dict so we can re-use create_user_from_tweet
            reply_to_user_dict = {"user":
                                  {'id': tweet['in_reply_to_user_id'],
                                   'screen_name': tweet['in_reply_to_screen_name'],
                                   }}
            reply_to_user = create_user_from_tweet(reply_to_user_dict)
            t.reply_to_user = reply_to_user
            t.reply_to_tweet = tweet['in_reply_to_status_id']
        if 'retweeted_status' in tweet:
            retweet = create_tweet_from_dict(tweet['retweeted_status'])
            t.retweet = retweet
        t.save()
        return t
    except peewee.IntegrityError as exc:
        logging.error(exc)
        return False
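
If you have raw tweets stored as line-delimited JSON, you can feed them through this function yourself. A hypothetical sketch; the file name is an assumption, adapt it to your own data:

import json

with open("tweets.jsonl") as f:
    for line in f:
        create_tweet_from_dict(json.loads(line))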

Next come a series of helper functions that allow us to create summary statistics for the information contained in our database fields.

def database_counts():
    """
    Generate counts for objects in the database.

    :returns: dictionary with counts
    """
    return {
        "tweets": Tweet.select().count(),
        "hashtags": Hashtag.select().count(),
        "urls": URL.select().count(),
        "users": User.select().count(),
    }


def mention_counts(start_date, stop_date):
    """
    Perform an SQL query that returns users sorted by mention count.
    Users are returned as database objects in decreasing order.
    The mention count is available as ".count" attribute.
    """
    # First we get the Table that sits between Tweets and Users
    mentions = Tweet.mentions.get_through_model()
    # The query - note the Count statement creating our count variable
    users = (User.select(User, peewee.fn.Count(mentions.id).alias('count'))
             # join in the intermediary table
             .join(mentions)
             # join in the tweets
             .join(Tweet, on=(mentions.tweet == Tweet.id))
             # filter by date
             .where(Tweet.date >= to_utc(start_date), Tweet.date < to_utc(stop_date))
             # group by user to eliminate duplicates
             .group_by(User)
             # sort by tweetcount
             .order_by(
        peewee.fn.Count(mentions.tweet).desc())
    )
    return users


def url_counts(start_date, stop_date):
    """
    Perform an SQL query that returns URLs sorted by mention count.
    URLs are returned as database objects in decreasing order.
    The mention count is available as ".count" attribute.
    """
    urlmentions = Tweet.urls.get_through_model()
    urls = (URL.select(URL, peewee.fn.Count(urlmentions.id).alias('count'))
            .join(urlmentions)
            .join(Tweet, on=(urlmentions.tweet == Tweet.id))
            .where(Tweet.date >= to_utc(start_date), Tweet.date < to_utc(stop_date))
            .group_by(URL)
            .order_by(peewee.fn.Count(urlmentions.tweet).desc())
            )
    return urls


def hashtag_counts(start_date, stop_date):
    """
    Perform an SQL query that returns hashtags sorted by mention count.
    Hashtags are returned as database objects in decreasing order.
    The mention count is available as ".count" attribute.
    """
    hashtagmentions = Tweet.tags.get_through_model()
    hashtags = (Hashtag.select(Hashtag, peewee.fn.Count(hashtagmentions.id).alias('count'))
                .join(hashtagmentions)
                .join(Tweet, on=(hashtagmentions.tweet == Tweet.id))
                .where(Tweet.date >= to_utc(start_date), Tweet.date < to_utc(stop_date))
                .group_by(Hashtag)
                .order_by(peewee.fn.Count(hashtagmentions.tweet).desc())
                )
    return hashtags


def retweet_counts(start_date, stop_date, n=50):
    """
    Find most retweeted users.
    Instead of performing a rather complex SQL query, we do this in more
    readable Python code. It may take a few minutes to complete.

    The baseline query is limited to tweets between start_date and
    stop_date; the n most retweeted users are returned by username.
    """
    from collections import Counter
    rt = Tweet.alias()
    rtu = User.alias()
    baseline = (Tweet.select().
                where(Tweet.date >= to_utc(start_date), Tweet.date < to_utc(stop_date))
                )
    query = (baseline.select(Tweet.id, rt.id, rtu.id)
             .join(rt, on=(Tweet.retweet == rt.id))
             .join(rtu, on=(rt.user == rtu.id))
             )
    c = Counter(
                (tweet.retweet.user.id for tweet in query)
               )
    # We use an ordered dict for the results so that the top results
    # appear first
    from collections import OrderedDict
    results = OrderedDict()
    for k, v in c.most_common(n):
        results[User.get(id=k).username] = v
    return results
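
All three count functions are used the same way. A usage sketch, assuming your database contains tweets from 2016 and that timezone-aware dates are passed in:

import datetime
from pytz import utc

start = utc.localize(datetime.datetime(2016, 1, 1))
stop = utc.localize(datetime.datetime(2017, 1, 1))
# Print the ten most used hashtags in that period
for tag in hashtag_counts(start, stop).limit(10):
    print(tag.tag, tag.count)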

Finally, we have a series of helper functions that support us in querying the data in our database:

def tweetcount_per_user():
    """
    This function executes a query that:
    - joins the User and Tweet tables so we can reason about their Relationship
    - groups by User so we have one result per User
    - counts tweets per user and stores the result in the name "count"
    - orders users by descending tweet count

    The users objects are accessible when looping over the query, such as:
    for user in query:
        print("{0}: {1}".format(user.username, user.count))

    Note that as always, this query can be augmented by appending further operations.
    """
    tweet_ct = peewee.fn.Count(Tweet.id)
    query = (User
             .select(User, tweet_ct.alias('count'))
             .join(Tweet)
             .group_by(User)
             .order_by(tweet_ct.desc(), User.username))
    return query


def first_tweet():
    """
    Find the first Tweet by date
    """
    return Tweet.select().order_by(Tweet.date.asc()).first()


def last_tweet():
    """
    Find the last Tweet by date
    """
    return Tweet.select().order_by(Tweet.date.desc()).first()


def to_utc(dt):
    """
    Helper function to return UTC-based datetime objects.
    If the input datetime object has any timezone information, it
    is converted to UTC. Otherwise, the datetime is taken as-is and
    only the timezone information UTC is added.
    """
    if dt.tzinfo is None:
        logging.warning(
            "You supplied a naive date/time object. Timezone cannot be guessed and is assumed to be UTC. If your date/time is NOT UTC, you will get wrong results!")
        return utc.localize(dt)
    else:
        return utc.normalize(dt)
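
For example, a timezone-aware Berlin timestamp is converted to UTC, while a naive one would merely be labeled as UTC with a warning:

import datetime
from pytz import timezone

berlin = timezone("Europe/Berlin")
aware = berlin.localize(datetime.datetime(2018, 7, 1, 12, 0))
to_utc(aware)  # -> 2018-07-01 10:00:00+00:00 (CEST is UTC+2)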


def objects_by_interval(Obj, date_attr_name="date", interval="day", start_date=None, stop_date=None):
    """
    General helper function that returns objects by date intervals, mainly useful for counting.
    WARNING: If used as-is with SQLite, all date/times in data and queries are UTC-based!
    If you want to use local time for queries, take note that it will be converted correctly
    ONLY if you supply the correct timezone information. In general, as long as you only
    use timezone-aware objects you should be safe.

    :param Obj:
    :type Obj: database model
    :param date_attr_name:
    :type date_attr_name: the name of the field containing date/time information on the model Obj, as string
    :param interval:
    :type interval: day (default), hour, minute as string.
    :param start_date:
    :type start_date: date to start from as datetime object, defaults to a hard-coded example date.
    :param stop_date:
    :type stop_date: date to stop on as datetime object, defaults to a hard-coded example date.
    :returns: generator yielding ((interval_start, interval_stop), query) tuples
    """
    # define intervals, then select the one given as a function argument
    interval = {
        "minute": datetime.timedelta(minutes=1),
        "hour": datetime.timedelta(hours=1),
        "day": datetime.timedelta(days=1),
    }.get(interval)
    date_field = getattr(Obj, date_attr_name)
    # Default to hard-coded dates from the original example data set.
    # Note: MST (that data set's timezone) is not defined in this listing;
    # define it first, e.g. MST = pytz.timezone("MST"), or pass explicit dates.
    start_date = start_date or MST.localize(datetime.datetime(2015, 10, 27, 0))
    stop_date = stop_date or MST.localize(datetime.datetime(2015, 11, 2, 23, 59))
    # If we wanted to use the first and last element instead of the pre-determined dates,
    # this would be the way to do it:
    # getattr(Obj.select().order_by(date_field.asc()).first(), date_attr_name)
    # getattr(Obj.select().order_by(date_field.desc()).first(), date_attr_name)
    # Ensure UTC times
    start_date = to_utc(start_date)
    stop_date = to_utc(stop_date)
    # This iteration code could be shorter, but using two dedicated variables
    # makes it more intuitive.
    interval_start = start_date
    interval_stop = start_date + interval
    # Notice that this loop stops BEFORE the interval extends beyond stop_date
    # This way, we may get intervals that do not reach stop_date, but on the other hand
    # we never get intervals that are not covered by the data.
    while interval_stop <= stop_date:
        query = Obj.select().where(
            date_field >= interval_start, date_field < interval_stop)
        # First yield the results, then step to the next interval
        yield ((interval_start, interval_stop), query)
        interval_start += interval
        interval_stop += interval
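
A usage sketch for counting tweets per day over one week; the dates are arbitrary examples:

import datetime
from pytz import utc

start = utc.localize(datetime.datetime(2016, 1, 1))
stop = utc.localize(datetime.datetime(2016, 1, 8))
# The generator yields one ((start, stop), query) pair per day
for (interval_start, interval_stop), query in objects_by_interval(
        Tweet, interval="day", start_date=start, stop_date=stop):
    print(interval_start.date(), query.count())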

After setting up these basic database models and helper functions, we now have to actually create the database tables. This needs to run at least once before using the database.

try:
    db.create_tables([Hashtag, URL, User, Tweet, Tweet.tags.get_through_model(
    ), Tweet.urls.get_through_model(), Tweet.mentions.get_through_model()])
except Exception as exc:
    logging.debug(
        "Database setup failed, probably already present: {0}".format(exc))

Loading and accessing the database

After examining the database.py script, you are ready to run it and load Lawrence Lessig’s tweets into a database.

You can either call the script directly from the command line by typing

python database.py

Or, if you would rather call it from IPython, start IPython and run the following command:

ipython
run database

After loading the database, let’s check what we can do now.

Let’s examine the field date in the table Tweet:

Tweet.date

Let’s count how many tweets there actually are in our database:

Tweet.select().count()

Let’s count how many users appear in our database. Note that this includes not only users mentioned in Lawrence Lessig’s tweets but also Lessig himself and users he replied to:

User.select().count()

In which tweets did Lessig mention Donald Trump?

for tweet in Tweet.select().where(Tweet.text.contains("Trump")):
    print(tweet.text)

Or let’s do the same more elegantly by defining a query as a variable:

query = Tweet.select().where(Tweet.text.contains("Trump"))

This offers new and interesting ways to interact with the query:

first_tweet = query.get()

first_tweet.text
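
A query object is lazy and reusable: you can count it, iterate it, or refine it further, for example:

query.count()  # number of matching tweets
for tweet in query.order_by(Tweet.date.asc()):
    print(tweet.date, tweet.text)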

If you run into problems with the encoding of characters, try this command before loading data (shown in Windows cmd syntax; use export on macOS/Linux):

set PYTHONIOENCODING=utf-8

Preparation for the next session

Download a set of tweets to work with; see the tutorial pp. 44ff.

Execute the following commands:

cd "/Users/().../twitterresearch"
ipython
import examples
examples.hydrate()

Required Readings:

Background Readings:

Additional Courses:

SQL:

