RQ can be used to set up queues for executing long-running tasks on local or remote machines. The steps below cover installing and getting started with RQ.
Installation
Create a virtual environment, then install the following components:
- Redis-server
- RQ
- RQ-scheduler
Install Redis from source using the following
wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
Run ‘make test’ to make sure things are working properly, followed by ‘sudo make install’ to complete the installation.
Now install ‘RQ’ using the following
pip install rq
Install the RQ scheduler using
pip install rq-scheduler
Running
- Start the Redis server by typing the following command
redis-server
You may want to daemonize this; see the Redis documentation for instructions on running it as a daemon.
- Start an RQ worker using
rq worker
- If you want to schedule jobs with rq-scheduler, start it as well with
rqscheduler
Now you are ready to start queuing jobs. You may want to use supervisor to manage the rq worker and rqscheduler processes. On a Linux machine you can install supervisor using one of the following commands, depending on your distribution
sudo yum install supervisor
or
sudo apt-get install -y supervisor
This installs a configuration file at /etc/supervisord.conf that determines which jobs are started and how they are managed. I chose to install RQ under a conda environment so as to keep it isolated. Add the following section, which shows how to run RQ as a service from a conda environment, to the end of the file (you will need sudo privileges to edit this file), as shown on the RQ documentation page.
[program:myworker]
; Point the command to the specific rq command you want to run.
; For conda virtual environments, install RQ into your env.
; Also, you probably want to include a settings module to configure this
; worker. For more info on that, see http://python-rq.org/docs/workers/
environment=PATH='/opt/conda/envs/myenv/bin'
command=/opt/conda/envs/myenv/bin/rq worker -c mysettings high normal low
; process_num is required if you specify >1 numprocs
process_name=%(program_name)s-%(process_num)s
; If you want to run more than one worker instance, increase this
numprocs=1
; This is the directory from which RQ is ran. Be sure to point this to the
; directory where your source code is importable from
directory=/path/to
; RQ requires the TERM signal to perform a warm shutdown. If RQ does not die
; within 10 seconds, supervisor will forcefully kill it
stopsignal=TERM
; These are up to you
autostart=true
autorestart=true
Feel free to edit ‘process_name’ if you need to, and set the paths for your conda environment in ‘environment’ and ‘command’ above. The path given by ‘directory’ is where your code will be executed, so provide an appropriate path so RQ can find your files. Now start the service using
sudo service supervisord start
You can check the status of supervisor using
sudo service supervisord status
Also, if you run ‘sudo supervisorctl’ it will display whether each of your processes has started; this is a great way to debug whether your service is running. Now you can do the same for ‘rqscheduler’ by adding another section, like the one shown above, to the supervisord.conf file to start ‘rqscheduler’.
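For example, the rqscheduler section might look like the following. This is a sketch that assumes the same conda environment and working directory as the worker section above; adjust the paths for your setup.

```ini
[program:myscheduler]
; Run rqscheduler from the same conda environment as the worker.
environment=PATH='/opt/conda/envs/myenv/bin'
command=/opt/conda/envs/myenv/bin/rqscheduler
process_name=%(program_name)s-%(process_num)s
numprocs=1
directory=/path/to
stopsignal=TERM
autostart=true
autorestart=true
```

After editing the file, run ‘sudo supervisorctl reread’ followed by ‘sudo supervisorctl update’ so supervisor picks up the new section.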
Testing
Now create the following files. The first file below creates the RQ connection and enqueues jobs:
#function.py
from redis import Redis
from rq import Queue
from my_module import wrapper, wrapper_na

q = Queue(connection=Redis())
job = q.enqueue(wrapper, 'http://nvie.com', ttl=400)
job = q.enqueue(wrapper_na)
print("Main function", job)
This file contains the worker functions to execute:
#my_module.py
import requests
import datetime
from rq import get_current_job

def count_words_at_url(url):
    """Just an example function that's called async."""
    resp = requests.get(url)
    return len(resp.text.split())

def wrapper(url):
    dt = datetime.datetime.now()
    f = open('redis_test.out' + str(dt), 'w')
    res = count_words_at_url(url)
    print("From module", res)
    job = get_current_job()
    print('Current job: %s' % (job.id,))
    f.write("Results are " + str(res))
    f.close()
    return res

def wrapper_na():
    # No-argument variant, enqueued by function.py above.
    return wrapper('http://nvie.com')
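The heart of count_words_at_url is just a whitespace split on the fetched page text; the same logic can be exercised locally without a network call. A small sketch (no RQ involved):

```python
# Sketch of the word-counting logic from count_words_at_url,
# applied to a local string instead of a fetched page.
def count_words(text):
    return len(text.split())

print(count_words("the quick brown fox"))  # 4
```

Testing the core logic this way, before enqueueing it, makes it easier to tell RQ problems apart from bugs in the function itself.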
Sometimes jobs need to be repeated periodically, which can be done using the rq-scheduler as shown below:
# test_scheduler.py
from redis import Redis
from rq_scheduler import Scheduler
from datetime import datetime
from my_module import wrapper

scheduler = Scheduler(connection=Redis())  # Get a scheduler for the "default" queue
job = scheduler.schedule(scheduled_time=datetime.utcnow(), func=wrapper, args=["http://nvie.com"], repeat=2, interval=60)
print("Enqueued job", job)
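With interval=60 the job fires once per minute. The expected fire times can be sketched with plain datetime arithmetic; this assumes ‘repeat’ gives the total number of runs, so check the rq-scheduler documentation for the exact semantics in your version:

```python
from datetime import datetime, timedelta

def expected_run_times(start, interval_seconds, repeat):
    # Fire times for a job scheduled at `start` with the given interval,
    # assuming `repeat` is the total number of runs.
    return [start + timedelta(seconds=interval_seconds * k) for k in range(repeat)]

start = datetime(2021, 1, 1, 0, 0, 0)
for t in expected_run_times(start, 60, 2):
    print(t)
# 2021-01-01 00:00:00
# 2021-01-01 00:01:00
```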
Debugging issues with RQ
As mentioned above, you can check the status of supervisor with
sudo supervisorctl
which should give you a list of your services and their status, for example
myscheduler:RQscheduler RUNNING pid 32688, uptime 225 days, 21:17:14
myworker:RQworker RUNNING pid 32687, uptime 225 days, 21:17:14
You can start or stop a service using
supervisorctl stop RQscheduler
supervisorctl start RQscheduler
Log files can usually be found at a location such as
/var/log/supervisor/supervisord.log