Intro

As previously mentioned, UpStatsBot is part of UpStats. It's a notification system in the form of a bot. You can get notifications about jobs while you're on the street or in a cafe.

This bot has been running for about 7 months now, and it has been running quite well, except for one thing: the Postgres logs were growing a lot. Not long ago, I analyzed one week of PostgreSQL logs using pgbadger. The results were startling:

As the image shows, the bot was responsible for more than 40% of all queries to that database.

The bot would run queries to poll for new items every 10 minutes; those queries would run regardless of whether the collector had brought in new data since the previous run.

This blog post describes how I fixed that issue using PostgreSQL's LISTEN/NOTIFY feature 1, 2.

The main advantage of LISTEN/NOTIFY is the ability to receive near-realtime notifications with a fraction of the queries.

Overview of the UpStats project

The diagram above summarizes how UpStats and UpStatsBot work and how the system is composed:

  • A data collector
  • A PostgreSQL db
  • A web app
  • Metrics (that are recomputed periodically)
  • A notification system

Users can access the system from the Telegram bot or from the web app. The data is collected from the UpWork API and placed in a PostgreSQL database. Metrics are computed (via more complex SQL queries), and notifications are dispatched using the Telegram API.

We aim to move the logic responsible for computing notifications from the bot into the collector in order to achieve near-real-time dispatch (i.e. notifications go out whenever new data becomes available).

Tables involved in notifications

The relevant tables here are:

  • odesk_job
  • odesk_search_job
  • odesk_telegram

In summary, we use search keywords from odesk_telegram and search for them in odesk_job via odesk_search_job. The odesk_search_job table holds full-text indexes for the jobs. The odesk_telegram table holds search keywords and active search streams for each subscribed user.

PostgreSQL's LISTEN/NOTIFY in Python

To offer some background, here are some use cases of LISTEN/NOTIFY:

  • using it in conjunction with websockets to build chat systems 3
  • building asynchronous and trigger-based replication systems 4
  • keeping caches in sync with a PostgreSQL database 5

We're using the psycopg2 connector 6 for PostgreSQL. The connector talks to the Postgres database server over a socket, and that socket has a file descriptor. The descriptor is passed to the select call, which checks whether it is ready for reading 7.
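The select() call works on any object that exposes a fileno() method, and psycopg2 connection objects do. As a minimal, database-free sketch of the readiness check, the snippet below uses a local socket pair as a stand-in for the database socket; wait_readable is a hypothetical helper, not part of psycopg2.

```python
import select
import socket


def wait_readable(sock, timeout):
    """Return True if `sock` has data to read within `timeout` seconds.

    select() accepts anything with a fileno() method; a psycopg2
    connection could be passed here instead of a socket.
    """
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


# A connected pair of local sockets simulates the client/server link.
server_side, client_side = socket.socketpair()

print(wait_readable(client_side, 0.1))  # -> False (nothing sent yet)
server_side.sendall(b"NOTIFY")
print(wait_readable(client_side, 0.1))  # -> True (data is pending)

server_side.close()
client_side.close()
```

In the real loop, a timeout simply means no notification arrived; when select() reports the connection as readable, conn.poll() is what actually consumes the pending data.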

To illustrate this, we'll write a simple Python class that lets us send and listen for notifications 8.

"""
This module provides an easy-to-use queue mechanism, built on
PostgreSQL's LISTEN/NOTIFY, where you can send and listen for messages.
"""

import json
import select
import time

import psycopg2
import psycopg2.extensions
from psycopg2 import sql


class PgQueue:
    dbuser = None
    dbpass = None
    dbname = None

    conn = None
    curs = None
    channel = None

    continue_recv = True

    def __init__(self, channel, dbname=None, dbuser=None, dbpass=None):
        """
        Connect to the database.
        If any of dbname, dbuser or dbpass is not provided,
        the calling code is responsible for creating a connection
        and setting it on this object (self.conn).

        Otherwise, this will create a connection to the database.
        """
        if not channel:
            raise ValueError('No channel provided')

        self.dbname = dbname
        self.dbuser = dbuser
        self.dbpass = dbpass
        self.channel = channel

        if dbname and dbuser and dbpass:
            # store connection
            self.conn = psycopg2.connect(
                dbname=dbname, user=dbuser, password=dbpass, host='127.0.0.1')
            # autocommit is required mostly by the NOTIFY statement, because
            # a notification is only delivered once its transaction commits
            self.conn.set_isolation_level(
                psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    def recvLoop(self):
        """
        Loop that's concerned with receiving notifications
        """
        self.curs = self.conn.cursor()
        # quote the channel name as an identifier instead of pasting it in raw
        self.curs.execute(
            sql.SQL("LISTEN {};").format(sql.Identifier(self.channel)))

        conn = self.conn
        while self.continue_recv:
            # wait up to 6 seconds for the connection's socket to become readable
            if select.select([conn], [], [], 6) == ([], [], []):
                print("consumer: timeout")
            else:
                # read pending data from the socket into conn.notifies
                conn.poll()
                print("consumer: received messages")
                while conn.notifies:
                    notif = conn.notifies.pop(0)
                    # print("Got NOTIFY:", notif.pid, notif.channel, notif.payload)
                    self.recvCallback(notif)

    def recvCallback(self, notification):
        """
        Needs to be implemented with notification handling logic
        """
        pass

    def send(self, data):
        """
        Send a notification
        """
        curs = self.conn.cursor()

        print("producer: sending..")
        # equip the message object with a timestamp
        message = {'time': time.time(), 'data': data}
        messageJson = json.dumps(message)

        # pg_notify() takes the channel and payload as ordinary parameters,
        # so psycopg2 takes care of quoting them
        curs.execute("SELECT pg_notify(%s, %s);", (self.channel, messageJson))

Now that we've implemented the class, we can use it. The producer will be quite simple, while the consumer needs to either patch the recvCallback method or subclass PgQueue and override that method. We'll do the former and patch the method. We'll run the producer and the consumer in separate threads.

def sample_producer_thread():
    q = PgQueue('botchan', dbname='dbname', dbuser='username', dbpass='password')

    while True:
        time.sleep(0.4)
        message = {'test': "value"}
        q.send(message)

def sample_consumer_thread():
    q = PgQueue('botchan', dbname='dbname', dbuser='username', dbpass='password')

    def newCallback(m):
        if m.payload:
            print("callback: ", m.payload)

    # replace the receiver callback; as an instance attribute it is
    # called without `self`, so it only receives the notification
    q.recvCallback = newCallback
    q.recvLoop()

if __name__ == '__main__':
    from threading import Thread

    thread_producer = Thread(target=sample_producer_thread)
    thread_consumer = Thread(target=sample_consumer_thread)
    thread_producer.start()
    thread_consumer.start()
    thread_producer.join()
    thread_consumer.join()
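send() wraps every payload in a small JSON envelope holding a timestamp and the data, so a callback could decode that envelope and, for example, measure how long delivery took. A minimal sketch, assuming the payload was produced the same way as in send(); encode_message and decode_message are hypothetical helper names, and the namedtuple only imitates the pid, channel and payload attributes of psycopg2's Notify objects.

```python
import json
import time
from collections import namedtuple


def encode_message(data):
    # same envelope that PgQueue.send() builds: a timestamp plus the data
    return json.dumps({'time': time.time(), 'data': data})


def decode_message(payload, now=None):
    """Return (data, delay_in_seconds) for an envelope built by encode_message()."""
    message = json.loads(payload)
    if now is None:
        now = time.time()
    return message['data'], now - message['time']


# Stand-in for a psycopg2 Notify object (only `payload` is used below)
FakeNotify = namedtuple('FakeNotify', ['pid', 'channel', 'payload'])


def newCallback(m):
    if m.payload:
        data, delay = decode_message(m.payload)
        print("callback:", data, "delivered after {:.3f}s".format(delay))


newCallback(FakeNotify(0, 'botchan', encode_message({'test': 'value'})))
```

The delay is only meaningful when producer and consumer clocks agree, as they do when both threads run on the same host.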

Putting together user notifications

The regular expressions in the query below turn each user's search keywords into tsquery-compatible strings, which are then used to run full-text searches on the job table. This way we can build notifications for each user and for each of their active search streams.
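The chain of trim, regexp_replace and LOWER calls in the query can be mirrored in Python to see what a search string turns into. This is only an illustrative sketch (to_fts_query is a hypothetical name), and it assumes Python's re module treats these simple patterns the same way PostgreSQL's regex engine does.

```python
import re


def to_fts_query(search):
    """Turn a comma/space separated keyword list into a tsquery string."""
    s = search.strip(' ')                     # rtrim(ltrim(search, ' '), ' ')
    s = re.sub(r'\s+', ' ', s)                # collapse runs of whitespace
    s = s.lower()                             # LOWER(...)
    return re.sub(r'\s*,\s*|\s', ' & ', s)    # commas and spaces become AND


print(to_fts_query("  PHP,  WordPress "))  # -> php & wordpress
```

Every separator becomes `&`, so a search stream only matches jobs that contain all of its keywords.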

The last_job_ts column is used to limit each search to the new data.

We use CTEs (common table expressions, i.e. WITH queries) because they're easy to work with and allow us to gradually refine the results of previous queries until the desired data can be extracted.

Near the end of the query, we neatly pack all the data using PostgreSQL's JSON functions (json_build_object and json_agg).

WITH user_notifs AS (
    SELECT
    id,
    last_job_ts,
    search,
    chat_id,
    regexp_replace(
           LOWER(
               regexp_replace(
                   rtrim(ltrim(search,' '),' '),
                   '\s+',' ','g'
               )
           ),
        '(\s*,\s*|\s)' , ' & ', 'g'
    )
    AS fts_query
    FROM odesk_telegram
    WHERE paused = false AND deleted = false
), jobs AS (
    SELECT A.job_id, A.tsv_basic, B.job_title, B.date_created
    FROM odesk_search_job A
    JOIN odesk_job B ON A.job_id = B.job_id
    WHERE B.date_created > EXTRACT(epoch FROM (NOW() - INTERVAL '6 HOURS'))::int
), new AS (
    SELECT
    A.id, A.chat_id, A.search, B.job_id, B.job_title, B.date_created
    FROM user_notifs AS A
    JOIN jobs B ON (
        B.tsv_basic @@ to_tsquery('english', A.fts_query) AND
        B.date_created > A.last_job_ts
    )
), json_packed AS (
    SELECT
    A.id,
    A.search,
    A.chat_id,
    json_agg(
    json_build_object (
        'job_id', A.job_id,
        'job_title', A.job_title,
        'date_created', A.date_created
    )) AS j
    FROM new A
    GROUP BY A.id, A.search, A.chat_id
)
SELECT * FROM json_packed;
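Each row returned by the query carries a stream id, its search string, the Telegram chat_id and the aggregated jobs. psycopg2 decodes json columns into Python lists and dicts by default, so turning rows into per-chat messages could look like the sketch below. build_notifications and the message format are hypothetical, and actually sending the text through the Telegram API is left out.

```python
def build_notifications(rows):
    """Build one text message per Telegram chat from json_packed rows.

    Each row is (id, search, chat_id, jobs), where jobs is the decoded
    json_agg list of {'job_id', 'job_title', 'date_created'} objects.
    """
    messages = {}
    for _stream_id, search, chat_id, jobs in rows:
        lines = ['New jobs for "{}":'.format(search)]
        # oldest first, so the message reads in posting order
        for job in sorted(jobs, key=lambda j: j['date_created']):
            lines.append('- {} ({})'.format(job['job_title'], job['job_id']))
        # one chat can own several active search streams: join their sections
        messages.setdefault(chat_id, []).append('\n'.join(lines))
    return {chat_id: '\n\n'.join(parts) for chat_id, parts in messages.items()}


rows = [
    (1, 'php, wordpress', 42, [
        {'job_id': 'a', 'job_title': 'WP site', 'date_created': 200},
        {'job_id': 'b', 'job_title': 'WP plugin', 'date_created': 100},
    ]),
    (2, 'python', 42, [
        {'job_id': 'c', 'job_title': 'Django app', 'date_created': 150},
    ]),
]
print(build_notifications(rows)[42])
```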

Tightening the constraints

Near the end of the collector program, we compute the notifications, which requires running an expensive search query to find out what to send and to whom.

However, every time this query runs, we can store the latest timestamp for that search stream, so that the next time we can tighten the search constraints and only search after that timestamp.

In order to do this, the search streams' last_job_ts needs to be updated:

UPDATE odesk_telegram SET last_job_ts = %(new_ts)s WHERE id = %(id)s;

For the active search streams that had new jobs, the latest (most recent) date_created among those jobs can be passed as the parameter to this query.

Even for the active search streams that have seen no new jobs, we still have to tighten the search by updating their last_job_ts to the time when the collector started (we can't go any further than that: a later timestamp might make us miss jobs that were posted while the collector was running).
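Following the description above, the UPDATE parameters could be computed as sketched below: streams with new jobs advance to the newest date_created they were notified about (the "latest timestamp" mentioned earlier), and the remaining active streams advance to the collector's start time. last_job_ts_updates and apply_updates are hypothetical helpers; executemany() is the standard DB-API cursor method that psycopg2 provides, and the SQL uses %(name)s placeholders, psycopg2's style for named parameters.

```python
UPDATE_SQL = "UPDATE odesk_telegram SET last_job_ts = %(new_ts)s WHERE id = %(id)s;"


def last_job_ts_updates(rows, active_stream_ids, collector_start_ts):
    """Compute one parameter dict per active search stream.

    rows: json_packed rows (id, search, chat_id, jobs).
    """
    params = []
    matched = set()
    for stream_id, _search, _chat_id, jobs in rows:
        # streams with new jobs: newest date_created that was notified
        newest = max(job['date_created'] for job in jobs)
        params.append({'id': stream_id, 'new_ts': newest})
        matched.add(stream_id)
    for stream_id in active_stream_ids:
        if stream_id not in matched:
            # no new jobs: advance only as far as the collector's start time
            params.append({'id': stream_id, 'new_ts': collector_start_ts})
    return params


def apply_updates(cursor, params):
    # executemany() runs the UPDATE once per parameter dict
    cursor.executemany(UPDATE_SQL, params)
```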

Optimizing the search streams

If enough data and enough users are present, we could craft a better query for this. For example, the search keywords could be organized in a tree structure.

This particular tree would order keywords by the number of search queries (search streams) they appear in: the more search streams a keyword appears in, the closer to the root it will be.

A search stream corresponds to a path in this tree.

For example, in the tree below, php, wordpress is a search stream and user3 has registered for it. Accordingly, user3 will receive job ads that match the words php and wordpress. Given the logic described above, php will match more jobs than wordpress.9
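The tree described above resembles a trie keyed on keywords. A minimal sketch, assuming each search stream is a list of keywords and that a keyword's rank is the number of streams it appears in; build_keyword_tree and the dict-based node layout are hypothetical.

```python
from collections import Counter


def build_keyword_tree(streams):
    """Build a keyword tree from {user: [keyword, ...]} search streams.

    Keywords shared by more streams sit closer to the root, so every
    stream becomes a path from the root to the node holding its user.
    Each node is {'children': {...}, 'users': [...]}.
    """
    freq = Counter(kw for kws in streams.values() for kw in set(kws))
    root = {'children': {}, 'users': []}
    for user, kws in streams.items():
        node = root
        # most widely shared keyword first; ties broken alphabetically
        for kw in sorted(set(kws), key=lambda k: (-freq[k], k)):
            node = node['children'].setdefault(kw, {'children': {}, 'users': []})
        node['users'].append(user)
    return root


tree = build_keyword_tree({'user3': ['php', 'wordpress'], 'user2': ['php', 'mysql']})
print(sorted(tree['children']['php']['children']))  # -> ['mysql', 'wordpress']
```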

A temporary table can be created for the high-volume top-level search streams. To get to the more specific search streams, a JOIN on this table followed by the conditions for the lower-volume keywords would be enough.

For example, suppose there are two search streams: php,wordpress (for user3) and php,mysql (for user2). We could cache the ids of the items to be notified (the search results) for the broader stream php and then refine them to get the two streams we're interested in.
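The caching idea can be sketched with plain Python sets: the broad search for the shared keyword plays the role of the temporary table, and each stream only filters that smaller result. Jobs are modeled here as sets of words, a simplification standing in for the tsvector/tsquery matching done in PostgreSQL; both function names are hypothetical.

```python
def match_stream(jobs, keywords):
    """Ids of jobs whose word set contains all `keywords` (one full search)."""
    return {jid for jid, words in jobs.items() if set(keywords) <= words}


def match_with_cached_prefix(jobs, streams, prefix):
    """Resolve several streams that share `prefix` with one broad search."""
    cached = match_stream(jobs, prefix)  # e.g. every 'php' job, computed once
    results = {}
    for user, keywords in streams.items():
        rest = set(keywords) - set(prefix)
        # only the (smaller) cached set is scanned for the remaining keywords
        results[user] = {jid for jid in cached if rest <= jobs[jid]}
    return results


jobs = {1: {'php', 'wordpress'}, 2: {'php', 'mysql'}, 3: {'python'},
        4: {'php', 'mysql', 'wordpress'}}
streams = {'user3': ['php', 'wordpress'], 'user2': ['php', 'mysql']}
print(match_with_cached_prefix(jobs, streams, ['php']))
```

The results are identical to running each stream's full search separately; the saving comes from scanning the large job set only once per cached prefix.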

This would be particularly useful with a large number of subscribed users and many active search streams (and the tree should have an average branching factor greater than one, so that there are at least some nodes whose search results can be cached).

Conclusion

This blog post describes the redesign of a notification system and some ideas about improving its performance.

Footnotes:

1

Notifications in PostgreSQL have been around for a long time, but only since version 9.0 can they carry a payload.

2

The message passing described in this blog post is simple (compared to other scenarios) and similar to the pubsub pattern.

Some applications need more reliable message delivery than the solution described in this post can provide.

For that purpose, this article provides an improved solution that handles delivery failures. It uses a queue table and two triggers. The queue table is used to persist the messages that are sent.

One trigger is placed on the table whose changes are of interest.

The other trigger is placed on the queue table, so the notification is the "cascading" result of modifying the actual table. The advantage here is persistence (among other advantages that you can read about in that article).

What's more, for the first trigger, the function row_to_json offers a way to serialize the changes in a structure-agnostic way.

Here's another article describing a queue-table-centric approach (without any LISTEN/NOTIFY), in which each item is consumed exactly once. It puts an emphasis on locking and on updating the 'processed' state of each item in the queue table, and discusses different approaches for doing so.

3

See, for example, this presentation, in which the author explains how Postgres sends notifications to a Python backend via LISTEN/NOTIFY, which are then forwarded to the browser via websockets. The presentation is also available on YouTube here.

5

This article describes a scenario where a PostgreSQL database updates a cache by broadcasting changes to it.

6

Although the code here is in Python, you may certainly use PostgreSQL's notifications in other languages (it's a well-supported feature) including the following:

7

More details about the conn.poll() statement: the poll() method comes from psycopg2 (it's called conn_poll in the C code) and reads all pending notifications from the connection's socket (the one libpq exposes through PQsocket), for the channels we have issued a LISTEN statement for. There are 5 functions involved:

  • conn_poll (which in turn calls _conn_poll_query)
  • _conn_poll_query (in turn, calls pq_is_busy)
  • pq_is_busy (which calls conn_notifies_process and conn_notice_process)
  • conn_notifies_process (collects the pending notifications from libpq and turns them into the Python Notify objects found in conn.notifies)
  • conn_notice_process (does the same for server notices, which end up in conn.notices)

8

Keep in mind that, in PostgreSQL's default configuration, the payload must be shorter than 8000 bytes.
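Since send() serializes the message to JSON before notifying, an oversized message could be rejected early on the client side. A minimal sketch, assuming the default 8000-byte limit and a UTF-8 server encoding; check_payload is a hypothetical helper.

```python
import json

# Default limit: the NOTIFY payload must be shorter than 8000 bytes
MAX_PAYLOAD_BYTES = 8000


def check_payload(message):
    """Serialize `message` to JSON and fail early if it is too large for NOTIFY."""
    payload = json.dumps(message)
    size = len(payload.encode('utf-8'))
    if size >= MAX_PAYLOAD_BYTES:
        raise ValueError('payload is {} bytes; NOTIFY needs fewer than {}'.format(
            size, MAX_PAYLOAD_BYTES))
    return payload


print(check_payload({'a': 1}))  # -> {"a": 1}
```

For larger data, a common alternative is to store it in a table and send only the row's key in the notification.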

9

This section about optimizing and caching search results is just an idea at this point. Details such as how to keep the tree updated, which search streams should be cached in the temporary table, and how to represent it have not been worked out yet. It's not implemented yet, but it will probably be used later on.