Simple job queue in Bash using a FIFO
Intro
I was looking for a simple example of a producer-consumer bash script that would distribute tasks to multiple consumers through a job queue. I found a few incomplete examples and decided to try my hand at writing my own.
So this blog post will focus on a few ways of implementing a queue like that: first using fifos, then Redis, and finally SysV message queues (from Perl, and from Bash via ipcmd).
Here are a few use-cases:
- enhancing the speed of document indexing (for example, the maff_table_index function described in this blog post)
- writing a parallel (or distributed, if you use a distributed message queue) crawler
- writing a parallel (or distributed) RSS/Atom news feed fetcher
- speeding up test-suites
Each version starts with a description of how that variant of the queue works and the problems I encountered using it, followed by a flowchart and then the producer and consumer implementations.
Testing
The fact that messages flow from the producer to the consumers is not enough. It's also important that everything produced is consumed. To this end, we run a simple test: we write the output of the consumers to a file and then compute some simple statistics on it to see how many messages got dropped, how many were delivered successfully, and how many there were in total.
```bash
seq_stats() {
    AWK_PROG='
    BEGIN { missing_list = ""; }
    {
        if(min==""){min=max=$1};
        if($1>max){max=$1};
        if($1<min){min=$1};
        sum+=$1; observed+=1;
        ## if there is a gap, store the missing numbers
        if($1-prev>1) {
            for(i=prev+1;i<$1;i++) {
                missing_list = missing_list "," i;
            };
        };
        prev=$1
    }
    END {
        expected=(max-min+1);
        missing=(expected-observed);
        printf "observed: %d\nmin: %d\nmax: %d\nmissing: %d\nmissing_p: %.5f\n", observed,min,max,missing,(100.0*(missing/expected));
        printf "missing_list: %s\n",missing_list;
    }'
    sort -g | awk "$AWK_PROG"
}
```
We can run it on all the integers from 1 to 40, with 10 through 20 missing, and see what we get back:

```bash
#!/bin/bash
. $PWD/queue-samples/seq-stats.sh
(seq 1 9; seq 21 40) | seq_stats
```
| observed: | 29 |
| min: | 1 |
| max: | 40 |
| missing: | 11 |
| missing_p: | 27.5 |
| missing_list: | ,10,11,12,13,14,15,16,17,18,19,20 |
FIFO versions
one fifo, multiple consumers, without locking
Here, the producer and the consumers open the fifo anew for every message.
There are certain limitations, such as only being able to write atomically at most 4096 bytes to a fifo 1, 2, 3, and this leads to problems such as consumers reading only parts of the messages sent to them.
There are several places 4 that advise against having multiple readers read from a single fifo; fifos are usually used 5 in one-producer, one-consumer scenarios.
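The atomic-write limit doesn't have to be taken on faith: POSIX exposes it as the PIPE_BUF pathconf variable, so it can be queried directly. A quick sketch (4096 is the usual Linux value; the portable guarantee is only 512 bytes):

```shell
#!/bin/bash
# Query the largest write that is guaranteed to be atomic on a pipe/fifo.
# POSIX only guarantees PIPE_BUF >= 512; on Linux it is typically 4096.
pipe_buf=$(getconf PIPE_BUF /)
echo "PIPE_BUF=$pipe_buf"
```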
In this case, the producer will throw an error: `./producer.sh: line 10: echo: write error: Broken pipe`. This happens when the producer writes to a pipe that was closed by a consumer that exited too early. So this version will drop messages.
Flowchart
Producer
```bash
#!/bin/bash
FIFO=/tmp/fifo1
n=0
mkfifo $FIFO
while true; do
    n=$((n+1))
    MESSAGE="message $n"
    echo "$MESSAGE" > $FIFO
done
```
Consumer
```bash
#!/bin/bash
FIFO=/tmp/fifo1
while true; do
    cat $FIFO
done
```
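The dropped-message failure mode can be reproduced in isolation. The sketch below (file names are made up for the demo) starts a reader that exits after one line; the producer's second write then fails with EPIPE instead of being delivered:

```shell
#!/bin/bash
# get EPIPE as a write error instead of being killed by SIGPIPE
trap '' PIPE
FIFO=/tmp/fifodemo.fifo
rm -f "$FIFO" /tmp/fifodemo.out /tmp/fifodemo.err
mkfifo "$FIFO"
# a consumer that leaves after reading a single line
head -n 1 "$FIFO" > /tmp/fifodemo.out &
exec 3>"$FIFO"             # opening blocks until the reader opens its end
echo "message 1" >&3       # delivered to the reader
wait                       # the reader prints it, then closes the fifo
if ! echo "message 2" >&3 2>/dev/null; then
    # EPIPE: nobody is reading anymore, so this message is lost
    echo "dropped" > /tmp/fifodemo.err
fi
exec 3>&-
rm -f "$FIFO"
```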
one fifo, multiple consumers, with locking
This is very similar to the previous version, except the reads and writes are only performed while holding a lock. One of two things happens here:
- The consumer acquires the lock first, it then starts reading from the pipe but since nobody else has opened the other end for writing, it will block 6. The consumer won't release the lock until it has read something from the fifo. The producer then tries to acquire the lock, but since the consumer has it, it won't be able to.
- The producer acquires the lock first, it starts writing to the pipe, but the write operation blocks 6 until the reading end of the fifo is opened. As in the previous case, the consumer can't acquire the lock anymore.
So this is a typical deadlock: neither the consumer nor the producer can make progress, because of the way they acquire the lock and the blocking fifo operations.
Flowchart
Producer
```bash
#!/bin/bash
FIFO=/tmp/fifo1
LOCK=/var/lock/.prod.ex.lock
N=0
mkfifo $FIFO
# ignore SIGPIPE so a consumer exiting early doesn't kill the producer
trap '' SIGPIPE
trap '{ echo -e "\nlast N=$N"; exit 0; }' EXIT
while true; do
    N=$((N+1))
    (
        flock -w 0.03 -n 200
        if [[ "$?" != 0 ]]; then echo "can't acquire lock!"; fi
        MSG="msg $N"
        echo "$MSG" > $FIFO
    ) 200>$LOCK
done
```
Consumer
```bash
#!/bin/bash
FIFO=/tmp/fifo1
LOCK=/var/lock/.prod.ex.lock

# read messages from the FIFO
while true; do
    (
        flock -w 0.03 -n 200
        if [[ "$?" != 0 ]]; then echo "can't acquire lock!"; fi
        read MSG < $FIFO
        echo "$MSG"
    ) 200>$LOCK
done
```
multiple fifos and dirlocks
This version is a bit different. We create the fifos, start the consumers and record their pids. Afterwards, the producer starts generating messages. Every time the producer needs to send a message (through the fifo) to a consumer, it first acquires a dirlock; there is one dirlock per consumer. So that the producer doesn't block while sending a message, the write to the fifo is put in the background. The dirlock is only released by the consumer, once it has finished consuming the message received through the fifo. The producer keeps cycling through the consumers to find one that is free. The consumer just reads new messages from the fifo and unlocks the dirlock, which allows new messages to be delivered on that fifo. The dirlock 7 is one of the different locks used primarily in bash scripts.
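The dirlock itself is a tiny primitive worth spelling out: mkdir is atomic, so a successful mkdir doubles as a try-lock, and rmdir releases it. A minimal sketch (the lock path is an arbitrary example):

```shell
#!/bin/bash
LOCK=/tmp/dirlock.demo
rmdir "$LOCK" 2>/dev/null    # start from a clean state

lock_try()  { mkdir "$LOCK" 2>/dev/null; }   # exit 0 = acquired, 1 = busy
lock_free() { rmdir "$LOCK" 2>/dev/null; }

lock_try  && FIRST=acquired  || FIRST=busy
lock_try  && SECOND=acquired || SECOND=busy  # still held, so this fails
lock_free
lock_try  && THIRD=acquired  || THIRD=busy   # free again after rmdir
lock_free
echo "$FIRST $SECOND $THIRD"
```

Running this prints `acquired busy acquired`: the second attempt fails because the directory already exists, which is exactly the property the producer relies on.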
Flowchart
Producer
```bash
#!/bin/bash
## In this scenario, we create the consumers, each with their own fifo,
## and then the producer will hand out messages by cycling through
## their fifos and writing to each of them.
FIFO_PREF="/tmp/bgfifo"
LOCK_PREF="/tmp/bgfifo.lock."
# message number
N=1
# number of consumers
C=10

create_fifos() {
    ## create the fifos and remove any stale locks
    for i in $(seq 0 $((C-1))); do
        CONSUMER_FIFO="$FIFO_PREF$i"
        CONSUMER_LOCK="$LOCK_PREF$i"
        mkfifo "$CONSUMER_FIFO"
        rmdir $CONSUMER_LOCK 2>/dev/null
    done
}

# make a table with these columns
# pid_consumer,pid_echo
# this table should be used when checking the status of writes
# or checking which pid has which index
declare -A status

init_consumers() {
    # C rows, 2 columns
    for ((i=0;i<=C-1;i++)); do
        CONSUMER_FIFO="$FIFO_PREF$i"
        # we start the consumer in the background, it will wait for data
        # to be available on its associated fifo
        echo "Creating consumer.."
        nohup ./consumer-back.sh "$i" &
        # consumer pid
        status[$i,0]=$!
        # slot for the pid of the background echo writing to this consumer
        status[$i,1]=0
    done
}

kill_consumers() {
    echo "closing consumers"
    for ((i=0;i<=C-1;i++)); do
        cpid=${status[$i,0]}
        epid=${status[$i,1]}
        if [[ $cpid -gt 0 ]]; then
            echo "closing consumer $cpid"
            kill -9 $cpid 2>/dev/null
        fi
        if [[ $epid -gt 0 ]]; then
            kill -9 $epid 2>/dev/null
        fi
    done
}

producer() {
    ## start generating messages and distribute them
    ## among the consumers
    while (( N <= 5000 )); do
        ## NOTE: We overcome the problems related to blocking writes by
        ## putting every single write in the background
        # produce a message and send it to the first free consumer
        for ((i=0;i<=C-1;i++)); do
            CONSUMER_FIFO="$FIFO_PREF$i"
            CONSUMER_LOCK="$LOCK_PREF$i"
            cpid=${status[$i,0]}
            kill -0 $cpid 2>/dev/null
            ok_cons=$?
            # if the consumer is still alive and we can acquire its dirlock
            # (mkdir is atomic and only succeeds if the lock is free),
            # dispatch a message to it; the consumer releases the lock
            # with rmdir once it has processed the message
            if [[ $ok_cons -eq 0 ]] && mkdir $CONSUMER_LOCK 2>/dev/null; then
                echo "consumer $cpid was free"
                # produce the next message and send it in the background
                echo "message $N" > $CONSUMER_FIFO 2>/dev/null &
                # remember the pid of the background write
                status[$i,1]=$!
                # we could break here instead of scanning the rest
                N=$((N+1))
            else
                echo "consumer $cpid was busy"
            fi
        done
        echo "===================="
    done
}

# kill consumers upon exit
trap '{ kill_consumers; exit 0; }' SIGINT
create_fifos
init_consumers
producer
wait
```
Consumer
```bash
#!/bin/bash
FIFO_PREF="/tmp/bgfifo"
FIFO="$FIFO_PREF$1"
LOCK_PREF="/tmp/bgfifo.lock."
LOCK="$LOCK_PREF$1"
while true; do
    ## if no message was received, skip to the
    ## next iteration
    read line < $FIFO
    if [[ -z $line ]]; then
        continue
    fi
    ## message was received, just print it
    echo "$$ -> $line"
    rmdir $LOCK 2>/dev/null
done
```
Redis & Bash version
Redis provides multiple data structures, and lists are one of them. For lists, it provides blocking and non-blocking operations that act on the head of the list as well as the tail of the list. Using these primitives, we're going to implement the reliable queue pattern 8 according to the Redis documentation. I've also bumped into an error about connections 9 but was able to overcome it.
So in this instance we have two queues, q1 and q2. The producer puts messages on q1; the consumer atomically pops an element off of q1 and puts it onto q2. After the message has been processed by the consumer, it is deleted from q2. The second queue (q2) is used to recover from failures (network problems or consumer crashes). If messages are equipped with a timestamp, then their age can be measured, and if they sit in q2 for too long, they can be transferred back to q1 to be re-processed by a consumer.
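The post doesn't implement the recovery job, but the age check it relies on is easy to sketch: prefix each message with its creation time and compare against a threshold. The message format and the 60-second threshold below are assumptions for illustration, not something Redis prescribes:

```shell
#!/bin/bash
# what the producer would LPUSH onto q1: "<epoch> <payload>"
MSG="$(date +%s) message 42"

# what the recovery job would do for each element of q2:
ts=${MSG%% *}                   # recover the timestamp field
age=$(( $(date +%s) - ts ))     # seconds the message has sat in q2
THRESHOLD=60
if (( age > THRESHOLD )); then
    echo "stale: move back to q1 for re-processing"
else
    echo "fresh: leave on q2"
fi
```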
Flowchart
Producer
```bash
#!/bin/bash
REDIS_CLI="redis-cli -h 127.0.0.1"
n=1
nmax=1000
q1="queue"
q2="processing"

clean() {
    echo "DEL $q1" | $REDIS_CLI
    echo "DEL $q2" | $REDIS_CLI
}

produce() {
    while (($n <= $nmax)); do
        MSG="message $n"
        echo "LPUSH $q1 \"$MSG\"" | $REDIS_CLI
        n=$((n+1))
    done
}

clean
produce
```
Consumer
```bash
#!/bin/bash
REDIS_CLI="redis-cli -h 127.0.0.1"
q1="queue"
q2="processing"

consume() {
    while true; do
        # atomically move the message to the processing queue
        MSG=$(echo "RPOPLPUSH $q1 $q2" | $REDIS_CLI)
        # an empty reply means the queue is empty
        if [[ -z "$MSG" ]]; then
            break
        fi
        # process the message (here we just print it)
        echo "$MSG"
        # remove the message from the processing queue
        echo "LREM $q2 1 \"$MSG\"" | $REDIS_CLI >/dev/null
    done
}

consume
```
Perl version using message queues
In this section we use a SysV message queue. The producer will write to it and the consumers will read from it.
The maximum size 10 of a message in a message queue is 8192 bytes, and the total size of the message queue is 16384 bytes (these are the default values; the limits can be changed).
The advantage is that once a consumer successfully retrieves a message from the message queue, the message is removed from the queue; there's no risk of retrieving just part of a message. In addition, the queue has a maximum capacity 10, and calls to msgsnd will block until there is enough room in the queue to write more messages.
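On Linux, these limits can be inspected (and tuned) through procfs. A quick sketch, using the documented defaults as fallbacks in case the files are unreadable:

```shell
#!/bin/bash
# max bytes in a single message (default 8192)
msgmax=$(cat /proc/sys/kernel/msgmax 2>/dev/null || echo 8192)
# max total bytes in one queue (default 16384)
msgmnb=$(cat /proc/sys/kernel/msgmnb 2>/dev/null || echo 16384)
echo "msgmax=$msgmax msgmnb=$msgmnb"
```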
Producer
```perl
#!/usr/bin/env perl
## This is a basic Producer
use strict;
use warnings;
use Carp;
use IPC::SysV qw( IPC_PRIVATE IPC_RMID IPC_CREAT S_IRWXU S_IRUSR S_IWUSR
                  ftok IPC_STAT MSG_NOERROR );

# flush output
$| = 1;

my $queue_file = "/tmp/queue1";
# create the file (it will be used to generate the IPC key)
open(my $fh,'>',$queue_file) or croak "couldn't create $queue_file";
close($fh);

# use the file to generate an IPC key value
my $msgkey = ftok($queue_file);
# check if the IPC key is defined
if(!defined $msgkey) {
    croak "couldn't generate IPC key value";
};

# create the message queue
my $ipc_id = msgget( $msgkey, IPC_CREAT | S_IRUSR | S_IWUSR );

my $n = 0;
$SIG{INT} = sub {
    print "Last n=$n\n";
    exit 0;
};

# start sending messages
while(1) {
    my $mtype = 1;
    my $buffer_size = 200;
    $n++;
    my $buffer = "message $n";
    my $msg = pack('V V a200', $mtype, $buffer_size, $buffer);
    msgsnd( $ipc_id, $msg, 0);
};

# dispose of the queue file
#unlink $queue_file;
```
Consumer
```perl
#!/usr/bin/env perl
## This is a basic Consumer
use strict;
use warnings;
use Carp;
use IPC::SysV qw( IPC_PRIVATE IPC_RMID IPC_CREAT S_IRWXU S_IRUSR S_IWUSR
                  ftok IPC_STAT MSG_NOERROR IPC_NOWAIT );
use Errno qw(:POSIX);
use Time::HiRes qw(usleep);

# flush output
$| = 1;

my $queue_file = "/tmp/queue1";
# create the file (it will be used to generate the IPC key)
open(my $fh,'>',$queue_file) or croak "couldn't create $queue_file";
close($fh);

# use the file to generate an IPC key value
my $msgkey = ftok($queue_file);
# check if the IPC key is defined
if(!defined $msgkey) {
    croak "couldn't generate IPC key value";
};

# create the message queue
my $ipc_id = msgget( $msgkey, IPC_CREAT | S_IRUSR | S_IWUSR );

my $qempty_tries_max = 1000;
my $qempty_tries = $qempty_tries_max;

# start receiving messages
while(1) {
    my $msg;
    # read a raw message from the queue
    #
    # IPC_NOWAIT will cause msgrcv to not block and return immediately
    # with ENOMSG if there are no messages of that type in the message
    # queue.
    my $bytes_recv = msgrcv($ipc_id, $msg, 208, 0, IPC_NOWAIT);
    if($!{ENOMSG}) {
        $qempty_tries--;
        if($qempty_tries == 0) {
            # exit the loop because we've exhausted the number of tries
            last;
        };
        # sleep 1% of a second (we're basically polling for
        # a new message on the queue. we give up if no message
        # is found and we exhaust the number of tries)
        usleep(1_000_000/100);
    } else {
        # refill the tries if a message was present in the queue
        $qempty_tries = $qempty_tries_max;
    };
    # skip (no bytes received)
    next if $bytes_recv == 0;
    # split the message according to its format
    my ($mtype,$buffer_size,$buffer) = unpack("V V a200", $msg);
    print "$buffer\n";
};
warn '[dbg] Queue is empty';

# dispose of the queue file
#unlink $queue_file;
```
Using ipcmd
This will be very similar to the Perl version above. We're still using a
SysV message queue except now we're using Bash and a command-line interface to SysV.
The interface is not complete, but it does provide msgget, msgrcv, msgsnd, and ftok, which is enough for what's needed here.
The manpage for ipcmd was quite good too, abounding in examples.
Producer
```bash
#!/bin/bash
ipcmd="/home/user/sources/ipcmd/bin/ipcmd"
ftok="$ipcmd ftok"
msgget="$ipcmd msgget"
msgsnd="$ipcmd msgsnd"
msgrcv="$ipcmd msgrcv"
queue_file="/tmp/queue.ipcmd"

producer() {
    touch $queue_file
    msgkey=$($ftok $queue_file)
    ipc_id=$($msgget -Q "$msgkey")
    n=1
    while true; do
        msg="message $n"
        echo "$msg" | $msgsnd -q "$ipc_id"
        n=$((n+1))
    done
}

producer
```
Consumer
```bash
#!/bin/bash
ipcmd="/home/user/sources/ipcmd/bin/ipcmd"
ftok="$ipcmd ftok"
msgget="$ipcmd msgget"
msgsnd="$ipcmd msgsnd"
msgrcv="$ipcmd msgrcv"
queue_file="/tmp/queue.ipcmd"

consumer() {
    touch $queue_file
    msgkey=$($ftok $queue_file)
    echo "$msgkey"
    # using -e will only retrieve the id of the queue
    # instead of attempting to create a new one
    ipc_id=$($msgget -e -Q "$msgkey")
    echo "$ipc_id"
    while true; do
        msg=$($msgrcv -q "$ipc_id")
        echo "$msg"
    done
}

consumer
```
Conclusion
Using a fifo in any situation other than one producer and one consumer will cause problems. Initially I was interested in using a fifo as a cheap, minimalistic message queue; upon realizing that this was not viable, I decided to look at some other options.
We haven't looked at situations with multiple producers. Below there's a table that summarizes the specifics of each version described in this blog post.
| version | makes use of 3rd party software | drops messages | deadlock | distributed | longest message |
|---|---|---|---|---|---|
| fifo v1 | [ ] | [X] | [ ] | [ ] | ≤ 4096b 3 |
| fifo v2 | [ ] | [ ] | [X] | [ ] | ≤ 4096b |
| fifo v3 | [ ] | [ ] | [ ] | [ ] | ≤ 4096b |
| perl & sysv mq | [ ] | [ ] | [ ] | [ ] | ≤ 8192b 10 |
| bash & ipcmd | [X] | [ ] | [ ] | [ ] | ≤ 8192b |
| bash & redis | [X] | [ ] | [ ] | [X] | ≤ 512MB 11 |
Footnotes:
http://stackoverflow.com/a/34016872/827519
`#define PIPE_BUF 4096 /* # bytes in atomic write to a pipe */`
Having multiple readers on a single fifo is also mentioned in Beej's networking tutorial:

> Finally, what happens if you have multiple readers? Well, strange things happen. Sometimes one of the readers get everything. Sometimes it alternates between readers. Why do you want to have multiple readers, anyway?
The fifo manpage describes how it blocks until both ends are opened:

> The kernel maintains exactly one pipe object for each FIFO special file that is opened by at least one process. The FIFO must be opened on both ends (reading and writing) before data can be passed. Normally, opening the FIFO blocks until the other end is opened also.