The data being used for UpStats comes straight from the UpWork API. In this blog post I'll describe how I recovered some timestamps that were stored with the wrong timezone. There are two main topics here: one is the design of a small data lake, and the other is a data migration.

Among other things, the UpWork API provides job postings with a date_created attribute.

People seem to agree on at least two valid ways of storing time data:

  • keep everything in UTC (for example as UNIX timestamps)
  • keep local time together with its timezone offset

What I did was something in between. I somehow managed to store wrong UNIX timestamps thinking they were UTC when in fact they weren't.

In order to solve this, I planned to rewrite the logic that converts date_created -> UTC UNIX timestamp, in the following steps:

  1. Making sure I have things in place to convert from the string datetime format to a UTC UNIX timestamp
  2. Writing code to convert all date_created values in the DB to UTC by digging into the odesk_debug_req table
  3. Updating the collector with the new logic
  4. Updating the Telegram bot with the new logic
  5. Changing the frontend code for /#search/jobs to interpret the timestamp correctly and convert it to local time

Now, a bit about the odesk_debug_req table:

column name   column type
-----------   ----------------------
id            integer
added_time    integer
sha           character varying(100)
data          bytea
query         character varying(400)

In the data column I would store compressed JSON structures of the UpWork API responses, with their keys sorted (Python ships with the zlib compression module, which I'm using to compress/decompress these). The reason for this was that I also wanted to compute a SHA-1 of each response before putting it in the table. I wanted this SHA-1 to be unique across the table and not be influenced by the order of keys in the JSON data (which is why I needed the keys sorted).
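
The canonicalize-hash-compress step boils down to something like this (a minimal sketch; pack_response is an illustrative helper name, not the collector's actual code):

import hashlib
import json
import zlib

def pack_response(response_dict):
    # serialize with sorted keys so the SHA-1 doesn't depend on key order
    canonical = json.dumps(response_dict, sort_keys=True)
    sha = hashlib.sha1(canonical.encode('utf-8')).hexdigest()
    # compressed bytes that go into the data column
    blob = zlib.compress(canonical.encode('utf-8'))
    return sha, blob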

This would ensure that I could easily check whether I had already received a certain response, consider it a duplicate, and avoid storing its data again. In case of a duplicate I would still store a row in this table, with the SHA-1, but with the data column empty. Because there's also an added_time column (which indicates when the row was placed in the database), I can easily see when each row was added, even for the duplicates that carry no data.

If I bump into a situation where I actually want to see the data but the data column is empty, it simply means the row is a duplicate: I can look for an earlier row with the same SHA-1 that has a non-empty data column and retrieve the data from there.
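
In SQLAlchemy terms, retrieving the payload for a duplicate boils down to a lookup like this (a sketch, assuming the OdeskDebugReq model used in the migration code below exposes the sha, added_time and data columns):

import zlib

def data_for_sha(session, sha):
    # find the earliest row with this SHA-1 that actually carries a payload
    row = session.query(OdeskDebugReq) \
        .filter(OdeskDebugReq.sha == sha) \
        .filter(OdeskDebugReq.data.isnot(None)) \
        .order_by(OdeskDebugReq.added_time) \
        .first()
    return zlib.decompress(row.data) if row else None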

I've used the following Python function to take a timestamp of the form 2015-10-11T00:26:51+0100 and convert it to a UTC UNIX timestamp.

import calendar
import re
import time

def convert_utc_timestamp(s):
    ## this function does the following:
    ## separates the timezone information out,
    ## parses the datetime string, converts it to
    ## a UNIX timestamp, then adjusts the timestamp with the
    ## timezone offset to make it UTC

    # parse the sign, hour and minute of the timezone offset
    m = re.search(r'([\+\-])(\d{2})(\d{2})$', s)
    m = m.groups()

    # discard the timezone
    s = re.sub(r'([\+\-]\d{4})$', '', s)
    t = time.strptime(s, "%Y-%m-%dT%H:%M:%S")

    # get a UNIX timestamp, treating the parsed time as if it were UTC
    ts = int(calendar.timegm(t))

    # apply the timezone offset to get a true UTC UNIX timestamp
    if m[0] == '+':
        a = int(m[1])
        b = int(m[2])
        ts -= (a * 3600) + (b * 60)
    elif m[0] == '-':
        a = int(m[1])
        b = int(m[2])
        ts += (a * 3600) + (b * 60)

    # return a UTC UNIX timestamp
    return ts
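
As a quick sanity check (my own example, not from the original code), the timestamp mentioned above comes out one hour earlier once it's expressed in UTC:

convert_utc_timestamp('2015-10-11T00:26:51+0100')
# 1444519611, i.e. 2015-10-10T23:26:51 UTC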

Then I was able to actually start the data migration, which means: go through the decompressed JSON data, identify the jobs, look them up in the DB, and update them with the correct date_created values.

While doing this, another problem came up: the DigitalOcean droplet I was doing this on only had 500MB of memory. The data migration code seemed to take 2GB of memory when run locally, and I was a bit surprised by such high memory usage. My first guess was that SQLAlchemy was fetching all rows from the odesk_debug_req table and keeping them in memory throughout execution.

Luckily I found SQLAlchemy's yield_per(N) method (which is also mentioned in the performance section of the SQLAlchemy documentation), which retrieves chunks of N rows at a time, and was able to bring memory usage down to a much more manageable 24.5 MB.

import json
import zlib

class DataMigration(object):

    # constructor receives a database session
    def __init__(self, session):
        self.session = session

    # process all jobs present in the JSON passed as a parameter,
    # updating their date_created column along the way
    def process_jobs(self, req_id, j):
        for job in j:
            job_id = job["id"]
            if job_id:
                date_created = job["date_created"]
                db_date_created = convert_utc_timestamp(date_created)
                self.session.query(OdeskJob).filter(OdeskJob.job_id == job_id) \
                        .update({OdeskJob.date_created: db_date_created})

    def update(self):

        results = self.session \
                .query(OdeskDebugReq.id, OdeskDebugReq.data) \
                .join(OdeskJob, OdeskJob.req_id == OdeskDebugReq.id) \
                .distinct(OdeskDebugReq.id) \
                .order_by(OdeskDebugReq.id)

        # fetch rows in small chunks to keep memory usage low
        for r in results.yield_per(5):
            # duplicate rows have an empty data column, skip those
            if not r.data:
                continue

            j_raw = zlib.decompress(r.data)

            try:
                j = json.loads(j_raw)
            except Exception:
                # skip rows whose payload isn't valid JSON
                continue

            req_id = r.id
            self.process_jobs(req_id, j)

        self.session.commit()
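
Running the migration is then just a matter of opening a session and calling update(); something along these lines (the connection string and database name are placeholders, not my actual setup):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://user:password@localhost/upstats')
Session = sessionmaker(bind=engine)
session = Session()

DataMigration(session).update()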

In the web UI, I've used the moment.js JavaScript library, which makes manipulating time data easy. What I did here was: get a UTC UNIX timestamp from the backend, parse it, tell moment.js that it's UTC through the utc() method, convert it to local time with local(), and finally format it as a readable datetime string.

var localDateTime = moment(val[i].date_created,'X').utc().local().format('YYYY-MM-DD HH:mm');

Now, about the UpStatsBot on Telegram: this change has some nice advantages there as well. The main feature of the bot is to send notifications when new jobs pop up. Because the datetime data is now reliable, every time it searches for new jobs it can tighten the range it searches over (the lower bound, to be exact), and therefore run much faster.
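
Concretely, the bot's new-jobs query can now safely use the newest timestamp it has already seen as a lower bound. A rough sketch (new_jobs_since and last_seen_ts are illustrative names, not the bot's actual code):

def new_jobs_since(session, last_seen_ts):
    # only look at jobs created after the newest one already notified about
    return session.query(OdeskJob) \
        .filter(OdeskJob.date_created > last_seen_ts) \
        .order_by(OdeskJob.date_created) \
        .all()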

When I built this, I didn't really know it was called a data lake. I'm pretty sure the term is used for large-scale databases, and I'm not sure mine actually qualifies (it is similar though: it's a data store holding unstructured data). While we're at it, let's throw some numbers in. The database currently has 166,743 job postings with full details, and 370,393 with partial details.

A Data lake is a large storage repository that "holds data until it is needed". [..] As of 2015, data lakes could be described as "one of the more controversial ways to manage big data"

This way of storing data also comes with an advantage in low disk space situations (like mine: only 20GB of disk space on the DigitalOcean droplet).

After some time of collecting data I can do the following:

  • copy some of the older data to a different machine with more space and re-import it into the database there
  • discard the old data on the low-space box but keep the SHA-1s in place
  • run VACUUM FULL to shrink the DB so it only takes up the space it needs
  • let the collection process continue and gather new data

This allows me to keep collecting data and periodically move some of it to a different place, then continue the process.
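
For the pruning part, a minimal sketch of what I have in mind, assuming added_time is itself a UNIX timestamp and using SQLAlchemy to run the raw SQL (the cutoff is arbitrary, and VACUUM FULL has to run outside a transaction block):

from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:password@localhost/upstats')

cutoff = 1420070400  # example cutoff: 2015-01-01 UTC

# drop old payloads but keep the rows (and their SHA-1s) for deduplication
with engine.begin() as conn:
    conn.execute(text("UPDATE odesk_debug_req SET data = NULL "
                      "WHERE added_time < :cutoff"),
                 {"cutoff": cutoff})

# reclaim the disk space; VACUUM FULL cannot run inside a transaction
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    conn.execute(text("VACUUM FULL odesk_debug_req"))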

Another problem I have, which is not solved yet, is replicating the data to another machine. I tried using pgbackrest but, due to my lack of knowledge around WAL and related things, I wasn't able to get it to work. My use case was an incremental backup of the DB. However, what I can do is use only the odesk_debug_req unstructured data table, pull just the new rows from it, and adapt the collector so that it runs on that table (instead of on the API like it does now). This would let me fetch only the new rows and then let the collector put everything into each table as necessary. This is another advantage of having that unstructured data table.
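
The incremental pull itself could be as simple as copying rows with an id higher than what the other side already has. A sketch with two sessions, one bound to each database (remote_session/local_session and the max-id bookkeeping are my own illustration, not existing code, and assume the OdeskDebugReq model maps all five columns):

from sqlalchemy import func

def pull_new_rows(remote_session, local_session):
    # highest id already present locally (0 if the table is empty)
    last_id = local_session.query(func.max(OdeskDebugReq.id)).scalar() or 0

    new_rows = remote_session.query(OdeskDebugReq) \
        .filter(OdeskDebugReq.id > last_id) \
        .order_by(OdeskDebugReq.id)

    for row in new_rows.yield_per(50):
        local_session.merge(OdeskDebugReq(
            id=row.id,
            added_time=row.added_time,
            sha=row.sha,
            data=row.data,
            query=row.query,
        ))

    local_session.commit()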

Stay tuned for more by subscribing to the RSS feed.