How to Build an Asynchronous Task Processor with Redis

November 3rd 2015  | Tags: python, redis


If a task's result is not needed immediately, you can put the task in a queue and let a background processor handle it, which also makes it asynchronous. This article uses Python's rq module to show how to build an asynchronous task processor on top of Redis.

Official site: http://python-rq.org/
Github: https://github.com/nvie/rq

This article covers the following topics:

  • Installing rq
  • How the command-line tools are implemented
  • How rq stores data in Redis
  • rq's design
  • About the rq source code
  • Python and Redis basics

Installing rq

Method 1:

Extract the source and add the rq directory to the PYTHONPATH variable:

export PYTHONPATH=$PYTHONPATH:/home/letian/Desktop/rq/

Method 2:

$ sudo python setup.py install

Method 3:

This installs rq into the current user's home directory; it is also the method I used:

$ python setup.py install --user
...
Installed /home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg
Processing dependencies for rq==0.5.6
...

In addition, rq installs a few command-line tools (rq, rqinfo, rqworker) into $HOME/.local/bin, so you need to:

$ export PATH=$PATH:$HOME/.local/bin

Note that rq depends on the click package; if installing rq fails, try installing click first.

Related resources

Installing Python Modules
What is the simplest way to do a user-local install of a python package?
How to install python modules without root access?

How the command-line tools are implemented

Let's look at how the command-line tools work.

First, rq's setup.py contains the following:

entry_points={
    'console_scripts': [
        'rq = rq.cli:main',

        # NOTE: rqworker/rqinfo are kept for backward-compatibility,
        # remove eventually (TODO)
        'rqinfo = rq.cli:info',
        'rqworker = rq.cli:worker',
    ],
},

This is easy to read: the rq command runs the main function in the rq.cli module, and the other two entries work the same way.

Now look at the code of rqworker:

$ cd ~/.local/bin
letian@myhost:~/.local/bin

$ cat rqworker 
#!/usr/bin/python
# EASY-INSTALL-ENTRY-SCRIPT: 'rq==0.5.6','console_scripts','rqworker'
__requires__ = 'rq==0.5.6'
import sys
from pkg_resources import load_entry_point

if __name__ == '__main__':
    sys.exit(
        load_entry_point('rq==0.5.6', 'console_scripts', 'rqworker')()
    )

Here rq==0.5.6 refers to the package site-packages/rq-0.5.6-py2.7.egg; in this case the actual location is ~/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg. Enter that directory and look at the contents of EGG-INFO/entry_points.txt:

$ cat EGG-INFO/entry_points.txt 
[console_scripts]
rq = rq.cli:main
rqinfo = rq.cli:info
rqworker = rq.cli:worker

Related resources:
pkg_resources — providing extension points for programs with Entry Points
Package Discovery and Resource Access using pkg_resources

How rq stores data in Redis

Writing the code

Let's write some code following the example on the rq website.

Create a project directory python-code with the following structure:

$ tree python-code/
python-code/
├── my_module.py
└── test.py

my_module.py contains:

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

test.py contains:

from redis import Redis
from rq import Queue
from my_module import count_words_at_url

q = Queue(connection=Redis())
result = q.enqueue(count_words_at_url, 'http://www.baidu.com')
print result

Run test.py:

$ python test.py 
<Job ca82af29-5744-4d62-afe0-3cba45b2d31d: my_module.count_words_at_url('http://www.baidu.com')>

Looking at Redis

Now let's look at what is in Redis:

$ redis-cli 
127.0.0.1:6379> KEYS *
1) "rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d"
2) "rq:queue:default"
3) "rq:queues"

Three keys have appeared.

Let's see what rq:queues holds:

127.0.0.1:6379> TYPE rq:queues
set
127.0.0.1:6379> SMEMBERS rq:queues
1) "rq:queue:default"

rq:queues is a set recording which queues exist.

rq:queue:default is used as the queue itself:

127.0.0.1:6379> TYPE rq:queue:default
list
127.0.0.1:6379> LRANGE rq:queue:default 0 12
1) "ca82af29-5744-4d62-afe0-3cba45b2d31d"

The default queue contains one job whose id is ca82af29-5744-4d62-afe0-3cba45b2d31d, matching the key rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d.

Next, let's see what is inside this job:

127.0.0.1:6379> TYPE rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d
hash
127.0.0.1:6379> HGETALL rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d
 1) "origin"
 2) "default"
 3) "status"
 4) "queued"
 5) "created_at"
 6) "2015-11-03T02:42:35Z"
 7) "enqueued_at"
 8) "2015-11-03T02:42:35Z"
 9) "data"
10) "\x80\x02(X\x1c\x00\x00\x00my_module.count_words_at_urlq\x01NU\x14http://www.baidu.comq\x02\x85q\x03}q\x04tq\x05."
11) "description"
12) "my_module.count_words_at_url('http://www.baidu.com')"
13) "timeout"
14) "180"

The job's information is stored as a hash: the odd lines above are the keys and the even lines the values. Let's decode the data field:

$ ipython
>>> aa = '\x80\x02(X\x1c\x00\x00\x00my_module.count_words_at_urlq\x01NU\x14http://www.baidu.comq\x02\x85q\x03}q\x04tq\x05.'
>>> import pickle
>>> pickle.loads(aa)
(u'my_module.count_words_at_url', None, ('http://www.baidu.com',), {})

Running rqworker outside the project directory

$ rqworker 
13:38:17 RQ worker u'rq:worker:myhost.10179' started, version 0.5.6
13:38:17 
13:38:17 *** Listening on default...
13:38:17 default: my_module.count_words_at_url('http://www.baidu.com') (ca82af29-5744-4d62-afe0-3cba45b2d31d)
13:38:17 ImportError: No module named my_module
Traceback (most recent call last):
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/worker.py", line 568, in perform_job
    rv = job.perform()
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 495, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 206, in func
    return import_attribute(self.func_name)
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/utils.py", line 150, in import_attribute
    module = importlib.import_module(module_name)
  File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
ImportError: No module named my_module
Traceback (most recent call last):
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/worker.py", line 568, in perform_job
    rv = job.perform()
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 495, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py", line 206, in func
    return import_attribute(self.func_name)
  File "/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/utils.py", line 150, in import_attribute
    module = importlib.import_module(module_name)
  File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
ImportError: No module named my_module
13:38:17 Moving job to u'failed' queue
13:38:17 
13:38:17 *** Listening on default...

The job failed because of No module named my_module.

Note that rq:worker:myhost.10179 in the first line of the output is this worker's identifier.

Let's see how the contents of Redis changed:

127.0.0.1:6379> KEYS *
1) "rq:queue:failed"
2) "rq:workers"
3) "rq:queues"
4) "rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d"
5) "rq:worker:myhost.10179"

A queue rq:queue:failed and a set rq:workers have been added.

Let's check how rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d changed:

127.0.0.1:6379> HGETALL rq:job:ca82af29-5744-4d62-afe0-3cba45b2d31d
 1) "ttl"
 2) "-1"
 3) "created_at"
 4) "2015-11-03T02:42:35Z"
 5) "ended_at"
 6) "2015-11-03T05:38:17Z"
 7) "origin"
 8) "default"
 9) "enqueued_at"
10) "2015-11-03T02:42:35Z"
11) "exc_info"
12) "Traceback (most recent call last):\n  File \"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/worker.py\", line 568, in perform_job\n    rv = job.perform()\n  File \"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py\", line 495, in perform\n    self._result = self.func(*self.args, **self.kwargs)\n  File \"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/job.py\", line 206, in func\n    return import_attribute(self.func_name)\n  File \"/home/letian/.local/lib/python2.7/site-packages/rq-0.5.6-py2.7.egg/rq/utils.py\", line 150, in import_attribute\n    module = importlib.import_module(module_name)\n  File \"/usr/lib/python2.7/importlib/__init__.py\", line 37, in import_module\n    __import__(name)\nImportError: No module named my_module\n"
13) "data"
14) "\x80\x02(X\x1c\x00\x00\x00my_module.count_words_at_urlq\x01NU\x14http://www.baidu.comq\x02\x85q\x03}q\x04tq\x05."
15) "description"
16) "my_module.count_words_at_url('http://www.baidu.com')"
17) "timeout"
18) "180"
19) "status"
20) "failed"

New fields such as ended_at and exc_info have appeared, and the value of status has changed from queued to failed.

Look at the contents of rq:workers:

127.0.0.1:6379> TYPE rq:workers
set
127.0.0.1:6379> SMEMBERS rq:workers
1) "rq:worker:myhost.10179"

There is one worker; myhost is the hostname and 10179 is the worker's pid. Inspect its information:

127.0.0.1:6379> TYPE rq:worker:myhost.10179
hash

127.0.0.1:6379> HGETALL rq:worker:myhost.10179
1) "birth"
2) "2015-11-03T05:38:17Z"
3) "queues"
4) "default"
5) "state"
6) "idle"
7) "current_job"
8) "ca82af29-5744-4d62-afe0

Check the contents of queue rq:queue:default:

127.0.0.1:6379> LRANGE rq:queue:default 0 12
(empty list or set)

Check the contents of queue rq:queue:failed:

127.0.0.1:6379> LRANGE rq:queue:failed 0 12
1) "ca82af29-5744-4d62-afe0-3cba45b2d31d"

The right way to run it

Stop the rqworker started above and flush Redis:

127.0.0.1:6379> FLUSHALL
OK

Add the job to Redis again:

$ python test.py 
<Job 4122eed3-d521-4738-8cf6-20d9501d0ab0: my_module.count_words_at_url('http://www.baidu.com')>

Then run rqworker from the project directory:

$ rqworker 
20:17:21 RQ worker u'rq:worker:myhost.27283' started, version 0.5.6
20:17:21 
20:17:21 *** Listening on default...
20:17:21 default: my_module.count_words_at_url('http://www.baidu.com') (4122eed3-d521-4738-8cf6-20d9501d0ab0)
20:17:22 Job OK

Check Redis:

# before rqworker ran
127.0.0.1:6379> KEYS *
1) "rq:job:4122eed3-d521-4738-8cf6-20d9501d0ab0"
2) "rq:queue:default"
3) "rq:queues"

# after rqworker ran
127.0.0.1:6379> KEYS *
1) "rq:worker:myhost.27283"
2) "rq:workers"
3) "rq:finished:default"
4) "rq:queues"
5) "rq:job:4122eed3-d521-4738-8cf6-20d9501d0ab0"

127.0.0.1:6379> HGETALL rq:job:4122eed3-d521-4738-8cf6-20d9501d0ab0
 1) "ttl"
 2) "-1"
 3) "created_at"
 4) "2015-11-03T12:17:12Z"
 5) "ended_at"
 6) "2015-11-03T12:17:22Z"
 7) "origin"
 8) "default"
 9) "enqueued_at"
10) "2015-11-03T12:17:12Z"
11) "result"
12) "\x80\x02K-."
13) "data"
14) "\x80\x02(X\x1c\x00\x00\x00my_module.count_words_at_urlq\x01NU\x14http://www.baidu.comq\x02\x85q\x03}q\x04tq\x05."
15) "timeout"
16) "180"
17) "description"
18) "my_module.count_words_at_url('http://www.baidu.com')"
19) "status"
20) "finished"

# zset is a sorted set
127.0.0.1:6379> TYPE rq:finished:default
zset

127.0.0.1:6379> ZRANGE rq:finished:default 0 -1
1) "4122eed3-d521-4738-8cf6-20d9501d0ab0"

The job's status is now finished and its result is \x80\x02K-.; decode that value with pickle:

>>> import pickle
>>> pickle.loads("\x80\x02K-.")
45

Note that you can run multiple rqworker processes at the same time.

rq's design

How a job is stored in Redis

A job here is simply a function or other callable.

The job's id is generated with the uuid module; prefixed with rq:job:, it becomes the job's key in Redis. The value is a hash whose fields describe the job: the enqueue time, the name of the job's function (or callable) and of the module it belongs to, the function's (or callable's) arguments, the status (queued, finished, failed, etc.), the result, and so on. Some fields are stored directly; others are serialized with pickle first.
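As an illustration, such a job record could be assembled roughly like this. This is a sketch, not rq's actual code; the field names simply mirror the hash we inspected in redis-cli earlier:

```python
# Sketch only: field names mirror the job hash observed in redis-cli,
# not rq's real implementation.
import pickle
import uuid
from datetime import datetime

def make_job(func_name, args):
    job_id = str(uuid.uuid4())          # rq also generates job ids with uuid
    key = 'rq:job:' + job_id            # fixed prefix + id = the redis key
    fields = {
        'created_at': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
        'status': 'queued',
        'description': '%s%r' % (func_name, args),
        # (callable name, instance, positional args, keyword args),
        # pickled like the 'data' field we decoded above
        'data': pickle.dumps((func_name, None, args, {})),
    }
    return key, fields

key, fields = make_job('my_module.count_words_at_url', ('http://www.baidu.com',))
```

Storing the fields under the key with a hash write would then reproduce the layout seen earlier.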

How a job is added to a queue

Assume there is a single pending-job queue, the default rq:queue:default. The queue name is also its key in Redis; the value is a list holding job ids.

If a job fails, its id is recorded in the failed queue (rq:queue:failed). If it succeeds, it is recorded in the sorted set keyed by rq:finished:default.
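In Redis terms, enqueueing reduces to a couple of commands. Here is a sketch that only builds the command tuples (no connection is made; the key names follow the layout observed above):

```python
# Sketch: the redis commands an enqueue roughly corresponds to.
def enqueue_commands(job_id, queue='default'):
    queue_key = 'rq:queue:' + queue
    return [
        ('SADD', 'rq:queues', queue_key),  # register the queue in the set
        ('RPUSH', queue_key, job_id),      # append the job id to the list
    ]

cmds = enqueue_commands('ca82af29-5744-4d62-afe0-3cba45b2d31d')
print(cmds[1])  # ('RPUSH', 'rq:queue:default', 'ca82af29-5744-4d62-afe0-3cba45b2d31d')
```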

How the rqworker command works

rqworker takes a job from the given queue and extracts its information, the most important parts being:

  • the name of the module containing the job's function (or callable)
  • the name of the function (or callable) itself
  • the arguments of the function (or callable)

Since jobs are usually functions we wrote ourselves, living in some module of the current project, rqworker imports that module with importlib, fetches the function (or callable) from it with getattr, and then runs the task. This is also why the rqworker command must be run from the project directory.
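The import step can be sketched as follows (modeled on the idea behind rq's import_attribute; the function here is my own simplification, demonstrated with the standard json module):

```python
import importlib

def import_attribute(name):
    # 'package.module.attr' -> import the module, then fetch the attribute
    module_name, attr = name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, attr)

loads = import_attribute('json.loads')
print(loads('[1, 2]'))  # [1, 2]
```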

After processing a job, rqworker moves it into the finished or failed queue.

To execute a job, rqworker forks a child process to do the work, but the parent waits for the child to finish, so a single rqworker runs jobs serially. To speed up processing, run several rqworkers.
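The fork-and-wait pattern can be sketched like this (a minimal Unix-only illustration, not rq's actual worker loop; since the child is a separate process, its work is observed through a temporary file):

```python
import os
import tempfile

def perform_in_child(job):
    pid = os.fork()
    if pid == 0:            # child: run the job, then exit immediately
        job()
        os._exit(0)
    os.waitpid(pid, 0)      # parent: block until the child is done -> serial

fd, path = tempfile.mkstemp()
os.close(fd)

def job():
    with open(path, 'w') as f:
        f.write('done')

perform_in_child(job)
print(open(path).read())  # done
```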

If rqworker is run on several hosts to process a single task queue, the project source must be deployed on each of those hosts.

Job dependencies

Suppose job B must run only after job A has finished. Then:

  • Job A stores job B's id, i.e. the dependency is recorded in Redis.
  • If job A is already finished, job B is put directly into the pending queue.
  • If job A is not finished yet, job B's status is set to deferred.
  • After rqworker finishes job A, it discovers job B's dependency on job A and puts job B into the pending queue.

Some of these operations are implemented in rq using Redis transactions.
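The deferral logic above can be modeled with plain dictionaries (my own in-memory simplification; rq keeps this state in Redis and guards the critical steps with a transaction):

```python
# In-memory model of the deferral logic (illustrative only).
jobs = {}        # job_id -> {'status': ..., 'dependents': [...]}
pending = []     # stands in for the rq:queue:default list

def enqueue(job_id, depends_on=None):
    jobs[job_id] = {'status': 'queued', 'dependents': []}
    if depends_on is not None and jobs[depends_on]['status'] != 'finished':
        jobs[job_id]['status'] = 'deferred'            # parent not done yet
        jobs[depends_on]['dependents'].append(job_id)  # record the dependency
    else:
        pending.append(job_id)

def finish(job_id):
    jobs[job_id]['status'] = 'finished'
    for dep in jobs[job_id]['dependents']:             # release dependents
        jobs[dep]['status'] = 'queued'
        pending.append(dep)

enqueue('A')
enqueue('B', depends_on='A')
print(jobs['B']['status'])   # deferred
finish('A')
print(jobs['B']['status'])   # queued
print(pending)               # ['A', 'B']
```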

About the rq source code

You can download the rq source from GitHub. The most important files are queue.py, worker.py, and job.py. When reading them, don't start with the details; work top-down. The code in local.py is also interesting. I have annotated part of the code at https://github.com/letiantian/rq-source

Python and Redis basics

Below are some basic concepts I encountered while reading the source. They are elementary, but I was not familiar with them before.

Operating Redis from Python

$ sudo pip install redis

import redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
r.set('foo', 'bar')
r.get('foo')

Example 2:

import redis
r = redis.StrictRedis(host='localhost', port=6300, db=0)
r.set(r, 'bar')
print r
print r.get(r)

The output is:

StrictRedis<ConnectionPool<Connection<host=localhost,port=6300,db=0>>>
bar

Look at Redis:

$ redis-cli -h 127.0.0.1 -p 6300
127.0.0.1:6300> KEYS *
1) "StrictRedis<ConnectionPool<Connection<host=localhost,port=6300,db=0>>>"
2) "name"
127.0.0.1:6300> exit

python __slots__

By default every instance of a class has a __dict__ that holds all of the instance's attributes. Defining __slots__ prevents a __dict__ from being allocated for instances of the class. See http://blog.csdn.net/tianqio/article/details/2374086 for details.
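A quick illustration:

```python
class Point(object):
    __slots__ = ('x', 'y')    # instances get no per-instance __dict__

    def __init__(self, x, y):
        self.x = x
        self.y = y

p = Point(1, 2)
print(hasattr(p, '__dict__'))  # False
try:
    p.z = 3                    # fails: 'z' is not listed in __slots__
except AttributeError as e:
    print('AttributeError:', e)
```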

The get_ident() function in Python's thread module

Returns the identifier of the current thread, a nonzero integer.

Return the ‘thread identifier’ of the current thread. This is a nonzero integer. Its value has no direct meaning; it is intended as a magic cookie to be used e.g. to index a dictionary of thread-specific data. Thread identifiers may be recycled when a thread exits and another thread is created.
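A small demonstration (note: in Python 3 the function moved to threading.get_ident):

```python
import threading

ids = []

def record():
    ids.append(threading.get_ident())   # identifier of the worker thread

t = threading.Thread(target=record)
t.start()
t.join()

main_id = threading.get_ident()         # identifier of the main thread
print(main_id != ids[0])                # True: each thread has its own id
```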

python property

Excerpted from http://blog.csdn.net/yatere/article/details/6658457

property defines a managed attribute on a class.

class C(object):
    def __init__(self): self._x = None
    def getx(self): print "get x";return self._x
    def setx(self, value): print "set x"; self._x = value
    def delx(self): print "del x";del self._x
    x = property(getx, setx, delx, "I'm the 'x' property.")

Usage:

>>> t=C()
>>> t.x
get x
>>> t.x="en"
set x
>>> print t.x
get x
en
>>> del t.x
del x
>>> t.x
get x

python property decorator

class C(object):
    def __init__(self):
        self.__x = 10

    @property
    def x(self):
        return self.__x

if __name__ == "__main__":
    c = C()
    print c.x   # 10
    c.x = 10    # raises AttributeError: can't set attribute

python all

>>> print all.__doc__
all(iterable) -> bool

Return True if bool(x) is True for all values x in the iterable.
If the iterable is empty, return True.
>>> all([1==1, 'a' != 'b'])
True

functools.partial

https://docs.python.org/2/library/functools.html#functools.partial

For example, first look at how the int function works:

>>> print int.__doc__
int(x=0) -> int or long
int(x, base=10) -> int or long

Convert a number or string to an integer, or return 0 if no arguments
are given.  If x is floating point, the conversion truncates towards zero.
If x is outside the integer range, the function returns a long instead.

If x is not a number or if base is given, then x must be a string or
Unicode object representing an integer literal in the given base.  The
literal can be preceded by '+' or '-' and be surrounded by whitespace.
The base defaults to 10.  Valid bases are 0 and 2-36.  Base 0 means to
interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
>>> from functools import partial
>>> basetwo = partial(int, base=2)
>>> basetwo.__doc__ = 'Convert base 2 string to an int.'
>>> basetwo('10010')
18

functools.total_ordering

https://docs.python.org/2/library/functools.html#functools.total_ordering

A class decorator: given __eq__ and one ordering method (such as __lt__), it supplies the remaining rich comparison operators.
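For example: define __eq__ plus one ordering method, and the decorator derives the rest:

```python
from functools import total_ordering

@total_ordering
class Version(object):
    def __init__(self, n):
        self.n = n

    def __eq__(self, other):
        return self.n == other.n

    def __lt__(self, other):
        return self.n < other.n

# <=, >, >= are synthesized from __eq__ and __lt__
print(Version(1) <= Version(2))  # True
print(Version(3) >= Version(2))  # True
```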

python relative and absolute imports

Python Relative and Absolute Import

python signal

SIG_DFL: the default signal handler.
SIG_IGN: a handler that ignores the signal.
SIGALRM is the signal sent to a process when a timer expires.
The signal.alarm function, from the signal documentation (signal — Set handlers for asynchronous events):

signal.alarm(time)
If time is non-zero, this function requests that a SIGALRM signal be sent to the process in time seconds. Any previously scheduled alarm is canceled (only one alarm can be scheduled at any time). The returned value is then the number of seconds before any previously set alarm was to have been delivered. If time is zero, no alarm is scheduled, and any scheduled alarm is canceled. If the return value is zero, no alarm is currently scheduled. (See the Unix man page alarm(2).) Availability: Unix.

python from contextlib import contextmanager

Usage of with and contextlib in Python

Example:

from contextlib import contextmanager

@contextmanager
def make_context() :
    print 'enter'
    try :
        yield {}
    except RuntimeError, err :
        print 'error' , err
    finally :
        print 'exit'

with make_context() as value :
    print value

Output:

enter
{}
exit

python __all__

__all__ restricts what a wildcard import brings in, as in:
from module import *
If the imported module defines __all__, only the attributes, functions, and classes listed in __all__ are imported;
if it does not, every name that does not start with an underscore is imported.

http://leequery.blog.163.com/blog/static/16842209620117184345180/
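A self-contained demonstration; it first writes a throwaway module (a name I made up) to a temporary directory:

```python
import os
import sys
import tempfile

# Create a throwaway module that defines __all__
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, 'demo_all_mod.py'), 'w') as f:
    f.write("__all__ = ['visible']\n"
            "visible = 1\n"
            "hidden = 2\n")
sys.path.insert(0, tmp)

ns = {}
exec('from demo_all_mod import *', ns)
print('visible' in ns, 'hidden' in ns)  # True False
```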

python logging.config.dictConfig

https://docs.python.org/2/library/logging.config.html#logging.config.dictConfig

python: complete example of dict for logging.config.dictConfig?

python classmethod and staticmethod

Class methods and static methods in Python
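A short example (the class and names are mine, loosely echoing rq's key scheme): a classmethod receives the class as its implicit first argument, while a staticmethod receives nothing implicitly:

```python
class Job(object):
    prefix = 'rq:job:'

    @classmethod
    def create(cls, name):
        job = cls()            # cls is the class itself, so subclasses work too
        job.name = name
        return job

    @staticmethod
    def key_for(job_id):       # no implicit self/cls argument
        return Job.prefix + job_id

j = Job.create('demo')
print(j.name)                # demo
print(Job.key_for('abc'))    # rq:job:abc
```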

python importlib

Imports a module whose name is given as a string.
importlib – Convenience wrappers for __import__()

python __name__ and __module__

>>> import pickle as pk
>>> pk.load.__name__
'load'
>>> pk.load.__module__
'pickle'
>>> int.__module__
'__builtin__'
>>> import __builtin__
>>> __builtin__.int(1.23)
1

python inspect

inspect — Inspect live objects
Why is there a difference between inspect.ismethod and inspect.isfunction from python 2 -> 3?
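A quick look at the difference; note the Python 3 behavior (a method accessed on the class is a plain function, which is what the second link above discusses):

```python
import inspect

def f():
    pass

class C(object):
    def m(self):
        pass

print(inspect.isfunction(f))     # True
print(inspect.ismethod(C().m))   # True: a bound method
print(inspect.isfunction(C.m))   # True in Python 3; Python 2 had
                                 # unbound methods here instead
```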


redis pipeline

Redis new feature: pipeline
Redis programming practice: pipelines and transactions

redis transactions

Redis programming practice: pipelines and transactions

(End)