RQ can be used to set up queues for executing long-running tasks on local or remote machines. Some steps on how to install and get started with RQ are listed below.
Create a virtual environment, then install the following components:
Install Redis using the following
wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
Run ‘make test’ to make sure things are working properly, followed by ‘sudo make install’ to complete the installation.
Now install ‘RQ’ using the following
pip install rq
Install the RQ scheduler using
pip install rq-scheduler
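Before moving on, it can help to confirm that the installs above actually put their executables on your PATH. The following is a minimal sketch using only the standard library; the executable names (redis-server, rq, rqscheduler) are what I would expect from default installs, so adjust the list if yours differ.

```python
import shutil

# Executable names assumed from the installs above; adjust if yours differ.
REQUIRED_TOOLS = ("redis-server", "rq", "rqscheduler")


def missing_tools(tools=REQUIRED_TOOLS):
    """Return the tools that cannot be found on the current PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools()
    if missing:
        print("Not found on PATH:", ", ".join(missing))
    else:
        print("All tools found.")
```

Run it from inside the virtual environment you installed RQ into, since that is where the rq and rqscheduler commands would normally live.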
- Start the Redis server by typing the following command:
redis-server
You may want to daemonize this; see the Redis documentation for instructions.
- Start an RQ worker using:
rq worker
- If we want to schedule jobs using RQ-scheduler, start that as well with:
rqscheduler
Now you are ready to start queuing jobs. You may want to use supervisor to manage the rq worker and rqscheduler processes. On a Linux machine you can install supervisor using one of the following, depending on your distribution:
sudo yum install supervisor
sudo apt-get install -y supervisor
This installs a configuration file at /etc/supervisord.conf that determines which jobs are started and how they are managed. I chose to install RQ under a conda environment to keep it isolated. Add the following to the end of that file (you will need sudo privileges to edit it). It shows how to run RQ as a service from a conda environment, as described on the RQ page.
[program:myworker]
; Point the command to the specific rq command you want to run.
; For conda virtual environments, install RQ into your env.
; Also, you probably want to include a settings module to configure this
; worker. For more info on that, see http://python-rq.org/docs/workers/
environment=PATH='/opt/conda/envs/myenv/bin'
command=/opt/conda/envs/myenv/bin/rq worker -c mysettings high normal low
; process_num is required if you specify >1 numprocs
process_name=%(program_name)s-%(process_num)s
; If you want to run more than one worker instance, increase this
numprocs=1
; This is the directory from which RQ is run. Be sure to point this to the
; directory where your source code is importable from
directory=/path/to
; RQ requires the TERM signal to perform a warm shutdown. If RQ does not die
; within 10 seconds, supervisor will forcefully kill it
stopsignal=TERM
; These are up to you
autostart=true
autorestart=true
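The directory setting above has to point somewhere your job modules can be imported from, and a wrong path is easy to miss until a job fails. A quick sanity check, sketched here with the standard library only, is to ask Python whether it can locate the module in that directory. The helper name importable_from is my own, and '/path/to' and 'my_module' simply mirror the placeholders used in this post.

```python
import importlib
from importlib.machinery import PathFinder


def importable_from(directory, module_name):
    """Return True if module_name can be located directly under directory."""
    importlib.invalidate_caches()  # pick up files created since the last lookup
    return PathFinder.find_spec(module_name, [directory]) is not None


if __name__ == "__main__":
    # '/path/to' and 'my_module' are placeholders; substitute your own values.
    print(importable_from("/path/to", "my_module"))
```

This only checks that the module file can be found; it does not import it, so errors inside the module itself would still surface later.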
Feel free to edit ‘process_name’ if you need to, and set the paths for your conda environment in ‘environment’ and ‘command’ above. The path given by ‘directory’ is where your code will be executed, so provide an appropriate path so RQ can find your files. Now start the service using
sudo service supervisord start
You can check the status of supervisor using
sudo service supervisord status
Running ‘sudo supervisorctl’ also shows whether each of your processes has started, which is a great way to debug a service that did not come up. You can do the same for ‘rqscheduler’ by adding another section, like the one shown above, to the supervisord.conf file.
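As a rough idea of what that second section could contain, here is a small sketch that prints one using Python's configparser. It is not taken from the RQ page: the program name myscheduler is borrowed from the status output later in this post, and I am assuming rqscheduler sits in the same conda environment's bin directory as rq.

```python
import configparser
import io


def scheduler_section(env_bin="/opt/conda/envs/myenv/bin", workdir="/path/to"):
    """Return a [program:myscheduler] supervisor section as text."""
    # interpolation=None keeps any supervisor-style %(...)s values untouched
    config = configparser.ConfigParser(interpolation=None)
    config.optionxform = str  # keep option names exactly as written
    config["program:myscheduler"] = {
        "environment": f"PATH='{env_bin}'",
        # Assumption: rqscheduler is installed next to rq in the same env
        "command": f"{env_bin}/rqscheduler",
        "directory": workdir,
        "stopsignal": "TERM",
        "autostart": "true",
        "autorestart": "true",
    }
    buffer = io.StringIO()
    config.write(buffer, space_around_delimiters=False)
    return buffer.getvalue()


if __name__ == "__main__":
    print(scheduler_section())
```

You can paste the printed text below the worker section in supervisord.conf and adjust the paths to match your setup.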
Now create the following files. The first one creates the RQ connection and enqueues the jobs:
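# function.py
from redis import Redis
from rq import Queue

from my_module import wrapper, wrapper_na

# Connect to the local Redis server and use the default queue
q = Queue(connection=Redis())

# Enqueue a job with one argument; ttl is the maximum time (in seconds)
# the job may wait in the queue before it is discarded
job = q.enqueue(wrapper, 'http://nvie.com', ttl=400)
# Enqueue a job that takes no arguments
job = q.enqueue(wrapper_na)
print("Main function ", job)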
This code contains the worker function to execute:
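# my_module.py
import datetime

import requests
from rq import get_current_job


def count_words_at_url(url):
    """Just an example function that's called async."""
    resp = requests.get(url)
    return len(resp.text.split())


def wrapper(url):
    """Count the words at url and write the result to a timestamped file."""
    dt = datetime.datetime.now()
    # The with-block closes the file even if the job raises an error
    with open('redis_test.out' + str(dt), 'w') as f:
        res = count_words_at_url(url)
        print("From module ", res)
        job = get_current_job()
        print('Current job: %s' % (job.id,))
        f.write("Results are " + str(res))
    return res


def wrapper_na():
    # Placeholder (assumption): function.py imports wrapper_na, but its body
    # is not shown in this post. Replace this with your own no-argument job.
    pass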
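One thing to be aware of: the wrapper above builds its output filename by appending str(datetime.datetime.now()), which gives names containing spaces and colons. If that gets in your way, one option is to format the timestamp explicitly. This is only a suggested variation; the helper name output_filename and the chosen timestamp format are my own.

```python
from datetime import datetime


def output_filename(prefix="redis_test", now=None):
    """Build a name such as 'redis_test_20240131T093005.out'."""
    now = now or datetime.now()
    # strftime-style codes inside the f-string avoid spaces and colons
    return f"{prefix}_{now:%Y%m%dT%H%M%S}.out"


if __name__ == "__main__":
    print(output_filename())
```

Names in this format also sort chronologically, which makes it easier to find the output of the most recent job.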
Sometimes jobs need to be repeated periodically, which can be done using rq-scheduler as shown below:
# test_scheduler.py
from datetime import datetime

from redis import Redis
from rq_scheduler import Scheduler

from my_module import wrapper

# Get a scheduler for the "default" queue
scheduler = Scheduler(connection=Redis())

# Schedule wrapper starting now (rq-scheduler expects UTC times);
# interval is given in seconds
job = scheduler.schedule(
    scheduled_time=datetime.utcnow(),
    func=wrapper,
    args=["http://nvie.com"],
    repeat=2,
    interval=60,
)
print("Enqueued job ", job)
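To get a feel for what an interval of 60 seconds means in practice, the sketch below lists the nominal run times for a given number of executions. It uses only the standard library and does not talk to rq-scheduler at all. The helper name planned_run_times is my own, and I am assuming here that repeat=2 results in two executions in total, so check the rq-scheduler documentation for your version. Actual run times also depend on how often the scheduler polls.

```python
from datetime import datetime, timedelta


def planned_run_times(start, interval_seconds, runs):
    """List nominal times for `runs` executions spaced interval_seconds apart."""
    return [start + timedelta(seconds=interval_seconds * i) for i in range(runs)]


if __name__ == "__main__":
    # Assumed start time, purely for illustration
    for when in planned_run_times(datetime(2024, 1, 1, 12, 0, 0), 60, 2):
        print(when.isoformat())
```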
Debugging issues with RQ
As mentioned above, you can check the status of supervisor with
sudo supervisorctl status
which should give you a list of running services along with their status:
myscheduler:"RQscheduler" RUNNING pid 32688, uptime 225 days, 21:17:14
myworker:"RQworker" RUNNING pid 32687, uptime 225 days, 21:17:14
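If you want to check this from a script, say in a cron job or a health check, status output of this shape is easy to parse. Here is a minimal sketch that assumes each line starts with the process name followed by its state, as in the listing above. The function names are my own, and the sample text is copied from that listing.

```python
def parse_status(output):
    """Map each process name to its state from supervisor status text."""
    states = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) >= 2:
            # First field is the process name, second is its state
            states[fields[0]] = fields[1]
    return states


def not_running(output):
    """Return the names of processes whose state is not RUNNING."""
    return [name for name, state in parse_status(output).items()
            if state != "RUNNING"]


if __name__ == "__main__":
    sample = (
        'myscheduler:"RQscheduler" RUNNING pid 32688, uptime 225 days, 21:17:14\n'
        'myworker:"RQworker" RUNNING pid 32687, uptime 225 days, 21:17:14\n'
    )
    print(not_running(sample))
```

An empty list means every listed process reported RUNNING. Anything else is worth a look in the logs.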
You can start or stop a service using
supervisorctl stop RQscheduler
supervisorctl start RQscheduler
Log files can usually be found at a location such as