Reader level: Introductory

RQ can be used to set up queues for executing long-running tasks on local or remote machines. Some steps on how to install and get started with RQ are listed below.

Installation

Create a virtual environment, then install the following components:

  1. Redis-server
  2. RQ
  3. RQ-scheduler

Install Redis using the following

wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make

Run ‘make test’ to make sure things are working properly, followed by ‘sudo make install’ to complete the installation.

Now install ‘RQ’ using the following

pip install rq

Install the RQ scheduler using

pip install rq-scheduler
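
To confirm that both packages landed in the active environment, a quick check like the one below can help. It is a minimal sketch that uses only the standard library; the helper name missing_packages is made up for illustration, while rq and rq_scheduler are the import names of the two packages installed above.

```python
from importlib.util import find_spec


def missing_packages(names):
    """Return the names from `names` that cannot be imported in this environment."""
    return [name for name in names if find_spec(name) is None]


# "rq" and "rq_scheduler" are the import names of the rq and rq-scheduler packages.
missing = missing_packages(["rq", "rq_scheduler"])
print("Missing packages:", missing if missing else "none")
```

If either name is reported as missing, check that the virtual environment is activated and rerun the matching pip command.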

Running

  1. Start the Redis server by typing the following command
  redis-server

You may want to daemonize this; follow the instructions given here to do that.

  2. Start an RQ worker using
   rq worker
  3. If you want to schedule jobs with RQ-scheduler, start that as well with
   rqscheduler
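
Before queuing anything, it can be useful to confirm from Python that the Redis server started in step 1 is reachable. The sketch below assumes Redis is listening on the default localhost:6379; the helper names redis_is_up and check_local_redis are hypothetical and not part of RQ or redis-py.

```python
def redis_is_up(client):
    """Return True if the Redis server behind `client` answers PING."""
    try:
        return bool(client.ping())
    except Exception:  # e.g. a connection error when nothing is listening
        return False


def check_local_redis():
    """Check the default local server (assumed to be localhost:6379)."""
    # Imported here so that redis_is_up itself has no third-party dependency.
    from redis import Redis

    return redis_is_up(Redis())
```

Calling check_local_redis() from a Python prompt should return True once redis-server is running.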

Now you are ready to start queuing jobs. You may want to use supervisor to manage the rq worker and rqscheduler processes. On a Linux machine you can install supervisor using one of the following commands, depending on your distribution:

sudo yum install supervisor

or

sudo apt-get install -y supervisor

This installs a configuration file at /etc/supervisord.conf that determines what jobs are started and how they are managed. I chose to install RQ under a conda environment to keep it isolated. Add the following to the end of that file (you will need sudo privileges to edit it). It is taken from the example on the RQ page and shows how to run RQ as a service from a conda environment.

[program:myworker]
; Point the command to the specific rq command you want to run.
; For conda virtual environments, install RQ into your env.
; Also, you probably want to include a settings module to configure this
; worker.  For more info on that, see http://python-rq.org/docs/workers/
environment=PATH='/opt/conda/envs/myenv/bin'
command=/opt/conda/envs/myenv/bin/rq worker -c mysettings high normal low
; process_num is required if you specify >1 numprocs
process_name=%(program_name)s-%(process_num)s

; If you want to run more than one worker instance, increase this
numprocs=1

; This is the directory from which RQ is run. Be sure to point this to the
; directory where your source code is importable from
directory=/path/to

; RQ requires the TERM signal to perform a warm shutdown. If RQ does not die
; within 10 seconds, supervisor will forcefully kill it
stopsignal=TERM

; These are up to you
autostart=true
autorestart=true

Feel free to edit the ‘process_name’ if you need to, and set the paths for your conda environment in ‘environment’ and ‘command’ above. The path given by ‘directory’ is where your code will be executed, so provide a path from which RQ can import your files. Now start the service using

sudo service supervisord start

You can check the status of supervisor using

sudo service supervisord status

Running ‘sudo supervisorctl’ also shows which of your processes have started and which have not, which is a great way to debug whether your service is running. You can do the same for ‘rqscheduler’ by adding another section like the one above to the supervisord.conf file.
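
The supervisor entry above starts the worker with ‘-c mysettings’, which tells RQ to read its configuration from a Python module named mysettings. A minimal sketch of such a module follows. The Redis URL is an assumed local default, so substitute your own, and place the file where it can be imported from the path given in ‘directory’.

```python
# mysettings.py
# Assumed value for a Redis server running locally on the default port.
REDIS_URL = "redis://localhost:6379/0"

# Queues to listen on when none are named on the command line. The supervisor
# command above already lists "high normal low", so this is only a fallback.
QUEUES = ["high", "normal", "low"]
```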

Testing

Now create the following files. The file below creates the RQ connection and enqueues the jobs:

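# function.py
from redis import Redis
from rq import Queue
from my_module import wrapper, wrapper_na

# Connect to the local Redis server and use the "default" queue.
q = Queue(connection=Redis())

# Enqueue a job with one argument; ttl is how long (in seconds) it may wait in the queue.
job = q.enqueue(wrapper, 'http://nvie.com', ttl=400)
# Enqueue a second job that takes no arguments.
job = q.enqueue(wrapper_na)

print("Main function ", job)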

This file contains the functions that the worker executes:

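# my_module.py
import datetime

import requests
from rq import get_current_job


def count_words_at_url(url):
    """Just an example function that's called async."""
    resp = requests.get(url)
    return len(resp.text.split())


def wrapper(url):
    """Count the words at url and write the result to a timestamped file."""
    dt = datetime.datetime.now()
    # The with block closes the file even if the request or the write fails.
    with open('redis_test.out' + str(dt), 'w') as f:
        res = count_words_at_url(url)
        print("From module ", res)
        job = get_current_job()
        print('Current job: %s' % (job.id,))
        f.write("Results are " + str(res))
    return res


def wrapper_na():
    """Placeholder for the no-argument job imported by function.py.

    The original listing does not show this function; this body is an assumption.
    """
    return wrapper('http://nvie.com')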
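
The enqueue call returns immediately, so the printed job does not yet hold a result. One way to wait for it is to poll the job, as in the sketch below. The helper name wait_for_result and its default timings are assumptions for illustration. It relies on the is_finished, is_failed, and result attributes of RQ job objects, whose exact behavior may vary with your RQ version.

```python
import time


def wait_for_result(job, timeout=30.0, poll_interval=0.5):
    """Poll an RQ job until it finishes, fails, or `timeout` seconds pass.

    Returns the job's return value, raises RuntimeError if the job failed,
    and raises TimeoutError if it is still pending when the timeout expires.
    """
    deadline = time.monotonic() + timeout
    while True:
        if job.is_finished:
            return job.result
        if job.is_failed:
            raise RuntimeError("job %s failed" % job.id)
        if time.monotonic() >= deadline:
            raise TimeoutError("job %s did not finish in %s s" % (job.id, timeout))
        time.sleep(poll_interval)
```

For example, calling wait_for_result(job) right after a q.enqueue(...) call should return the word count once a worker has processed the job.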

Sometimes jobs need to be repeated periodically, which can be done using rq-scheduler as shown below:

# test_scheduler.py
from datetime import datetime

from redis import Redis
from rq_scheduler import Scheduler
from my_module import wrapper

scheduler = Scheduler(connection=Redis())  # Get a scheduler for the "default" queue

# Schedule wrapper starting now (UTC). interval is in seconds; repeat limits
# how many times the job is repeated.
job = scheduler.schedule(
    scheduled_time=datetime.utcnow(),
    func=wrapper,
    args=["http://nvie.com"],
    interval=60,
    repeat=2,
)
print("Enqueued job ", job)
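
While experimenting, scheduled jobs tend to pile up in Redis. The sketch below clears them using the scheduler's get_jobs() and cancel() methods. The helper names cancel_all and cancel_all_default are hypothetical, and the second one assumes the same local Redis connection as the script above.

```python
def cancel_all(scheduler):
    """Cancel every job currently registered with `scheduler`; return their ids."""
    cancelled = []
    # Copy into a list first, since get_jobs() may return a lazy iterator.
    for job in list(scheduler.get_jobs()):
        scheduler.cancel(job)
        cancelled.append(job.id)
    return cancelled


def cancel_all_default():
    """Cancel all scheduled jobs on the local Redis server (assumed defaults)."""
    from redis import Redis
    from rq_scheduler import Scheduler

    return cancel_all(Scheduler(connection=Redis()))
```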

Debugging issues with RQ

As mentioned above, you can check the status of supervisor with

sudo supervisorctl

which should list your services along with their status:

myscheduler:"RQscheduler"        RUNNING   pid 32688, uptime 225 days, 21:17:14
myworker:"RQworker"              RUNNING   pid 32687, uptime 225 days, 21:17:14

You can start or stop a service using

supervisorctl stop RQscheduler
supervisorctl start RQscheduler

Log files can usually be found at a location such as

/var/log/supervisor/supervisord.log
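
Supervisor only reports whether the processes are running. To see why an individual job failed, RQ's own records can be inspected from Python. The sketch below assumes RQ 1.0 or later, where each queue exposes a failed_job_registry. The helper name failed_job_report is made up for illustration.

```python
def failed_job_report(queue):
    """Return a list of (job_id, traceback_text) pairs for failed jobs on `queue`."""
    registry = queue.failed_job_registry
    report = []
    for job_id in registry.get_job_ids():
        job = queue.fetch_job(job_id)
        if job is not None:  # the job data may already have expired
            report.append((job_id, job.exc_info))
    return report
```

With the queue from function.py, failed_job_report(q) would return one entry per failed job, each with the traceback that the worker recorded.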