Intro

I was looking for a simple example of a producer-consumer bash script that would distribute tasks to multiple consumers through a job queue. I found a few incomplete examples and decided to try my hand at writing my own.

So this blog post will focus on a few ways of implementing a queue like that: first using fifos, then Redis, and finally SysV message queues.

Here are a few use-cases:

  • enhancing the speed of document indexing (for example, the maff_table_index function described in this blog post)
  • writing a parallel (or distributed, if you use a distributed message queue) crawler
  • writing a parallel (or distributed) RSS/Atom news feed fetcher
  • speeding up test-suites

The different versions all start with the description of how that version of the message queue works, a description of the problems I encountered using them, then a flowchart, and then the producer and consumer implementations.

Testing

The fact that messages flow from the producer to the consumers is not enough. It's also important that everything produced is consumed. To this end, we run a simple test: we write the output of the consumers to a file, then compute some simple statistics on it to see how many messages were dropped, how many were delivered successfully, and how many messages there were in total.

seq_stats() {
    AWK_PROG='
    BEGIN {
        missing_list = "";
    }
    { 
        if(min==""){min=max=$1};
        if($1>max){max=$1};
        if($1<min){min=$1};
        sum+=$1;
        observed+=1;

        ## if there is a gap, store the missing numbers
        if($1-prev>1) {
            for(i=prev+1;i<$1;i++) {
                missing_list = missing_list "," i;
            };
        };
        prev=$1
    }
    END {
        expected=(max-min+1);
        missing=(expected-observed);
        printf "observed: %d\nmin: %d\nmax: %d\nmissing: %d\nmissing_p: %.5f\n",
        observed,min,max,missing,(100.0*(missing/expected));
        printf "missing_list: %s\n",missing_list;
    }
    '
    sort -g | awk "$AWK_PROG"
}

We can run it with all the integers from 1 to 40, except the integers from 10 to 20, and see what we get back:

#!/bin/bash
. $PWD/queue-samples/seq-stats.sh
(seq 1 9; seq 21 40) | seq_stats
observed: 29
min: 1
max: 40
missing: 11
missing_p: 27.50000
missing_list: ,10,11,12,13,14,15,16,17,18,19,20

FIFO versions

one fifo, multiple consumers, without locking

Here, the producer/consumer open the fifo for every message.

There are certain limitations, such as only being able to write atomically at most 4096 bytes to a fifo 1, 2, 3, and this leads to problems such as consumers reading only parts of the messages sent to them.
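You can query this limit for a given path with getconf (PIPE_BUF is the POSIX name for the atomic-write limit; on Linux the value is typically 4096):

```shell
# PIPE_BUF is the largest write that is guaranteed to be atomic on a
# pipe or fifo; getconf takes a path because the limit can, in theory,
# vary per filesystem
getconf PIPE_BUF /tmp
```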

There are several places 4 that advise against having multiple readers read from a single fifo. Usually fifos are used in one-producer, one-consumer scenarios 5.

In this case, the producer will throw an error like ./producer.sh: line 10: echo: write error: Broken pipe. This happens when the producer writes to a pipe that was closed by a consumer that exited too early. So this version will drop messages.

Flowchart

Producer

#!/bin/bash
FIFO=/tmp/fifo1
n=0
mkfifo $FIFO

while true; do
    n=$((n+1))
    MESSAGE="message $n"
    echo "$MESSAGE" >$FIFO
done

Consumer

#!/bin/bash
FIFO=/tmp/fifo1
while true; do
    cat $FIFO
done

one fifo, multiple consumers, with locking

This is very similar to the previous version, except all the reads/writes are done only if the lock can be acquired. One of two things happens here:

  1. The consumer acquires the lock first, it then starts reading from the pipe but since nobody else has opened the other end for writing, it will block 6. The consumer won't release the lock until it has read something from the fifo. The producer then tries to acquire the lock, but since the consumer has it, it won't be able to.
  2. The producer acquires the lock first, it starts writing to the pipe, but the write operation blocks 6 until the reading end of the fifo is opened. As in the previous case, the consumer can't acquire the lock anymore.

So this is a typical deadlock: neither the consumer nor the producer can make progress, because of the way they acquire the lock combined with the blocking fifo operations.
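The blocking open that drives this deadlock is easy to observe in isolation. A small sketch (using timeout to break out of the blocked open; exit status 124 means the command was killed by the timeout):

```shell
f=$(mktemp -u)
mkfifo "$f"
# opening a fifo for writing blocks until a reader opens the other end;
# since there is no reader here, timeout has to kill the writer
timeout 0.2 bash -c "echo hi > '$f'"
echo "writer exit status: $?"   # prints 124: the open never completed
rm "$f"
```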

Flowchart

Producer

#!/bin/bash
FIFO=/tmp/fifo1
LOCK=/var/lock/.prod.ex.lock
N=0
mkfifo $FIFO

trap '' SIGPIPE  # ignore SIGPIPE so a vanished reader doesn't kill us
trap '{ echo -e "\nlast N=$N"; exit 0; }' EXIT

while true; do
    N=$((N+1))
    (
    if ! flock -w 0.03 200; then
        echo "can't acquire lock!"
        exit 1
    fi
    MSG="msg $N"
    echo "$MSG" > $FIFO
    ) 200>$LOCK
done

Consumer

#!/bin/bash
FIFO=/tmp/fifo1
LOCK=/var/lock/.prod.ex.lock

# read message from FIFO
while true; do
    (
    if ! flock -w 0.03 200; then
        echo "can't acquire lock!"
        exit 1
    fi
    read MSG < $FIFO
    echo "$MSG"
    ) 200>$LOCK
done

multiple fifos and dirlocks

This version is a bit different. We create the fifos, start the consumers, and record their pids. Afterwards, the producer starts generating messages. Every time the producer needs to send a message (through the fifo) to a consumer, it first acquires a dirlock; there is one dirlock for each consumer. So that the producer doesn't block while sending a message, the write operation on the fifo is put in the background.

The dirlock is only released by the consumer, once it has finished consuming the message received through its fifo. The producer keeps cycling through the consumers to find one that is free. The consumer just reads new messages from the fifo and unlocks its dirlock, which allows new messages to be delivered on that fifo. The dirlock 7 is one of the different locks used primarily in bash scripts.
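The dirlock itself is just a directory used as a mutex: mkdir is atomic, so exactly one caller can create it at a time, and rmdir releases it. A minimal sketch (the lock path and messages are illustrative):

```shell
LOCK=/tmp/dirlock.example

if mkdir "$LOCK" 2>/dev/null; then
    # mkdir succeeded, so we own the lock until we rmdir it
    echo "lock acquired"
    # ... critical section ...
    rmdir "$LOCK"
else
    # mkdir failed, so somebody else currently holds the lock
    echo "lock busy"
fi
```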

Flowchart

Producer

#!/bin/bash
## In this scenario, we create a number of consumers, each with its
## own fifo, and then the producer hands out messages by cycling through
## their fifos and writing to each of them.

FIFO_PREF="/tmp/bgfifo"
LOCK_PREF="/tmp/bgfifo.lock."
# message number
N=1
# number of consumers
C=10

create_fifos() {
    ## create the fifos
    for i in $(seq 0 $((C-1))); do
        CONSUMER_FIFO="$FIFO_PREF$i"
        CONSUMER_LOCK="$LOCK_PREF$i"
        mkfifo "$CONSUMER_FIFO"
        rmdir "$CONSUMER_LOCK" 2>/dev/null  # remove any stale lock
    done
}

# make a table with these columns
# pid_consumer,pid_echo
# this table should be used when checking the status of writes
# or checking which pid has which index
declare -A status
init_consumers() {
    # C rows, 2 columns
    for ((i=0;i<=C-1;i++)) do
        CONSUMER_FIFO="$FIFO_PREF$i"
        # we start the consumer in the background, it will wait for data
        # to be available on its associated fifo
        echo "Creating consumer.."
        nohup ./consumer-back.sh "$i" & 
        # consumer pid
        status[$i,0]=$!
        # slot for consumer echo pid
        status[$i,1]=0
    done
}


kill_consumers() {
    # needs to be reimplemented
    echo "closing consumers"
    for ((i=0;i<=C-1;i++)) do
        CONSUMER_LOCK="$LOCK_PREF$i"
        cpid=${status[$i,0]}
        epid=${status[$i,1]}

        if [[ $cpid -gt 0 ]]; then
            echo "closing consumer $cpid"
            kill -9 $cpid 2>/dev/null
        fi

        if [[ $epid -gt 0 ]]; then
            kill -9 $epid 2>/dev/null
        fi

    done
}


producer() {
    ## start generating messages and distribute them
    ## among the consumers
    while (( N <= 5000 )); do
        ## NOTE: We overcome the problems related to blocking writes because
        ## we put in the background every single write we make

        # produce message and send it to the first free consumer
        for ((i=0;i<=C-1;i++)) do
            CONSUMER_FIFO="$FIFO_PREF$i"
            CONSUMER_LOCK="$LOCK_PREF$i"
            cpid=${status[$i,0]}
            epid=${status[$i,1]}

            kill -0 $cpid 2>/dev/null
            ok_cons=$?
            kill -0 $epid 2>/dev/null
            ok_echo=$?


            # if the consumer is still alive and its dirlock can be acquired
            # (mkdir succeeds only when the previous message on this fifo
            # has been consumed), dispatch a message to it
            if [[ $ok_cons -eq 0 && $(mkdir $CONSUMER_LOCK 2>/dev/null; echo $?) == 0 ]]; then
                echo "consumer $cpid was free"
                # produce next message
                # and send it
                echo "message $N" > $CONSUMER_FIFO 2>/dev/null &
                # update epid
                status[$i,1]=$!
                # we could break here and wait for the next iteration
                # find another free consumer
                N=$((N+1))
            else
                echo "consumer $cpid was busy"
            fi
        done
        echo "===================="
    done
}

# kill consumers upon exit
trap '{ kill_consumers; exit 0; }' SIGINT

create_fifos
init_consumers
producer

wait

Consumer

#!/bin/bash
FIFO_PREF="/tmp/bgfifo"
FIFO="$FIFO_PREF$1"
LOCK_PREF="/tmp/bgfifo.lock."
LOCK="$LOCK_PREF$1"
while true; do
    ## if no message was received, skip to the
    ## next iteration
    read line < $FIFO
    if [[ -z $line ]]; then
        continue
    fi
    ## message was received, just print it
    echo "$$ -> $line"
    rmdir $LOCK 2>/dev/null
done

Redis & Bash version

Redis provides multiple data structures, and lists are one of them. For lists, it provides blocking and non-blocking operations that act on the head of the list as well as the tail of the list. Using these primitives, we're going to implement the reliable queue pattern 8 according to the Redis documentation. I've also bumped into an error about connections 9 but was able to overcome it.

So in this instance we have 2 queues, q1 and q2. The producer puts messages on q1; the consumer atomically pops an element off of q1 and pushes it onto q2. After the consumer has processed the message, it is deleted from q2. The second queue (q2) is used to recover from failures (network problems or consumer crashes). If messages are equipped with a timestamp, their age can be measured, and if they sit in q2 for too long, they can be transferred back to q1 to be re-processed by a consumer.
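For example, if every message is prefixed with its creation time, the age check becomes trivial. A sketch (the epoch:payload message format, the msg_is_stale name and the 60-second threshold are all assumptions, not part of the Redis pattern; the actual requeueing would use LREM/LPUSH as in the consumer below):

```shell
# messages are assumed to look like "<epoch seconds>:<payload>"
MAX_AGE=60

msg_is_stale() {
    local ts=${1%%:*}
    local now
    now=$(date +%s)
    (( now - ts > MAX_AGE ))
}

msg_is_stale "$(( $(date +%s) - 120 )):message 7" && echo "stale, move back to q1"
msg_is_stale "$(date +%s):message 8" || echo "fresh, leave on q2"
```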

Flowchart

Producer

#!/bin/bash
REDIS_CLI="redis-cli -h 127.0.0.1"
n=1
nmax=1000
q1="queue"
q2="processing"

clean() {
    echo "DEL $q1" | $REDIS_CLI
    echo "DEL $q2" | $REDIS_CLI
}

produce() {
    while (($n <= $nmax)); do
        MSG="message $n"
        echo "LPUSH $q1 \"$MSG\"" | $REDIS_CLI
        n=$((n+1))
    done
}

clean
produce

Consumer

#!/bin/bash
REDIS_CLI="redis-cli -h 127.0.0.1"
q1="queue"
q2="processing"

consume() {
    while true; do
        # move message to processing queue
        MSG=$(echo "RPOPLPUSH $q1 $q2" | $REDIS_CLI)
        if [[ -z "$MSG" ]]; then
            break
        fi
        # processing message
        echo "$MSG"
        # remove message from processing queue
        echo "LREM $q2 1 \"$MSG\"" | $REDIS_CLI >/dev/null
    done
}

consume

Perl version using message queues

In this section we use a SysV message queue. The producer will write to it and the consumers will read from it.

The maximum size 10 of a message in a message queue is 8192 bytes, and the total size of the message queue is 16384 bytes (these are the default values; both limits can be changed).
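On Linux, these defaults can be inspected (and tuned) through the usual sysctl entries in procfs:

```shell
# maximum size of a single SysV message, in bytes (8192 by default)
cat /proc/sys/kernel/msgmax
# maximum total size of a message queue, in bytes (16384 by default)
cat /proc/sys/kernel/msgmnb
```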

The advantage is that once a consumer successfully retrieves a message from the message queue, the message is removed from the queue. There's no risk of retrieving just part of a message. In addition, the queue has a maximum capacity 10, and calls to msgsnd will block until there is enough room in the queue to write more messages.

Producer

#!/usr/bin/env perl
## This is a basic Producer
use strict;
use warnings;
use Carp;
use IPC::SysV qw(
    IPC_PRIVATE IPC_RMID IPC_CREAT S_IRWXU S_IRUSR
    S_IWUSR ftok IPC_STAT IPC_PRIVATE MSG_NOERROR);

# flush output
$| = 1;

my $queue_file = "/tmp/queue1";

# create file (will be used to generate the IPC key)
open(my $fh,'>',$queue_file);
close($fh);

# use file to generate an IPC key value
my $msgkey = ftok($queue_file);

# check if the IPC key is defined
if(!defined $msgkey) {
    croak "couldn't generate IPC key value";
};

# create the message queue
my $ipc_id = msgget( $msgkey, IPC_CREAT | S_IRUSR | S_IWUSR );

my $n = 0;

$SIG{INT} = sub {
    print "Last n=$n\n";
    exit 0;
};
# start sending messages
while(1) {
    my $mtype = 1;
    my $buffer_size = 200;
    $n++;
    my $buffer = "message $n";
    my $msg = pack('V V a200', $mtype, $buffer_size, $buffer);
    msgsnd( $ipc_id, $msg, 0);
};

#sleep 30;
# dispose of the queue file
#unlink $queue_file;

Consumer

#!/usr/bin/env perl
## This is a basic Consumer
use strict;
use warnings;
use Carp;
use IPC::SysV qw(
    IPC_PRIVATE IPC_RMID IPC_CREAT S_IRWXU S_IRUSR
    S_IWUSR ftok IPC_STAT IPC_PRIVATE MSG_NOERROR IPC_NOWAIT);
use Errno qw(:POSIX);
use Time::HiRes qw(usleep);
# flush output
$| = 1;

my $queue_file = "/tmp/queue1";

# create file (will be used to generate the IPC key)
open(my $fh,'>',$queue_file);
close($fh);

# use file to generate an IPC key value
my $msgkey = ftok($queue_file);

# check if the IPC key is defined
if(!defined $msgkey) {
    croak "couldn't generate IPC key value";
};

# create the message queue
my $ipc_id = msgget( $msgkey, IPC_CREAT | S_IRUSR | S_IWUSR );

my $qempty_tries_max = 1000;
my $qempty_tries = $qempty_tries_max;


# start receiving messages
while(1) {
    my $msg;
    # read raw message from queue
    #
    # IPC_NOWAIT will cause msgrcv to not block and return immediately
    # with ENOMSG if there are no messages of that type in the message
    # queue.
    my $bytes_recv = msgrcv($ipc_id, $msg, 208, 0, IPC_NOWAIT);
    if($!{ENOMSG}) {
        $qempty_tries--;
        if($qempty_tries == 0) {
            # exit loop because we've exhausted the number of tries
            last;
        };
        # sleep 1% of a second (we're basically polling for
        # a new message on the queue. we give up if no message
        # is found and we exhaust the number of tries)
        usleep(1_000_000/100);
    } else {
        # refill tries if a message was present in the queue
        $qempty_tries = $qempty_tries_max;
    };

    # skip (no message received)
    next unless $bytes_recv;

    # split the message according to its format
    my ($mtype,$buffer_size,$buffer) = unpack("V V a200", $msg);

    print "$buffer\n";
};
warn '[dbg] Queue is empty';

# dispose of the queue file
#unlink $queue_file;

Using ipcmd

This will be very similar to the Perl version above. We're still using a SysV message queue, except now we're using Bash and a command-line interface to SysV IPC. The interface is not as complete (though it does provide msgget, msgrcv, msgsnd, and ftok), but it's enough for what's needed here. The manpage for ipcmd is quite good too, abounding in examples.

Producer

#!/bin/bash
ipcmd="/home/user/sources/ipcmd/bin/ipcmd"
ftok="$ipcmd ftok"
msgget="$ipcmd msgget"
msgsnd="$ipcmd msgsnd"
msgrcv="$ipcmd msgrcv"
queue_file="/tmp/queue.ipcmd"

producer() {
    touch $queue_file
    msgkey=$($ftok $queue_file)
    ipc_id=$($msgget -Q "$msgkey")
    n=1
    while true; do
        msg="message $n"
        echo "$msg" | $msgsnd -q "$ipc_id"
        n=$((n+1))
    done
}

producer

Consumer

#!/bin/bash
ipcmd="/home/user/sources/ipcmd/bin/ipcmd"
ftok="$ipcmd ftok"
msgget="$ipcmd msgget"
msgsnd="$ipcmd msgsnd"
msgrcv="$ipcmd msgrcv"
queue_file="/tmp/queue.ipcmd"

consumer() {
    touch $queue_file
    msgkey=$($ftok $queue_file)
    echo $msgkey
    # using -e will only retrieve the id of the queue
    # instead of attempting to create a new one
    ipc_id=$($msgget -e -Q "$msgkey")
    echo $ipc_id
    while true; do
        msg=$($msgrcv -q "$ipc_id")
        echo "$msg"
    done
}

consumer

Conclusion

Using a fifo for anything other than one producer and one consumer will cause problems. Initially I wanted to use a fifo as a cheap, minimalistic message queue, but upon realizing that this was not viable, I decided to look at some other options.

We haven't looked at situations with multiple producers. Below there's a table that summarizes the specifics of each version described in this blog post.

version          3rd party software   drops messages   deadlock   distributed   max message size
fifo v1          [ ]                  [X]              [ ]        [ ]           ≤ 4096b 3
fifo v2          [ ]                  [ ]              [X]        [ ]           ≤ 4096b
fifo v3          [ ]                  [ ]              [ ]        [ ]           ≤ 4096b
perl & sysv mq   [ ]                  [ ]              [ ]        [ ]           ≤ 8192b 10
bash & ipcmd     [X]                  [ ]              [ ]        [ ]           ≤ 8192b
bash & redis     [X]                  [ ]              [ ]        [X]           ≤ 512MB 11

Footnotes:

3

http://stackoverflow.com/a/34016872/827519

#define PIPE_BUF        4096	/* # bytes in atomic write to a pipe */
4

Having multiple readers on a single fifo is also mentioned in Beej's networking tutorial:

Finally, what happens if you have multiple readers? Well, strange things happen. Sometimes one of the readers get everything. Sometimes it alternates between readers. Why do you want to have multiple readers, anyway?

6

The fifo manpage describes how it blocks until both ends are opened:

The kernel maintains exactly one pipe object for each FIFO special file that is opened by at least one process. The FIFO must be opened on both ends (reading and writing) before data can be passed. Normally, opening the FIFO blocks until the other end is opened also.

9

This error is caused by the kernel not cleaning up TCP connections fast enough (see issue 340 in the redis project):

Could not connect to Redis at 127.0.0.1:6379: Cannot assign requested address