Celery is a distributed task queue module written in Python, so it is convenient for systems that are themselves built largely in Python. The current release line is Celery 4.x, which only supports Django 1.8 and above; Celery 3.1 is the last series that supports Django versions below 1.8.
Celery website: http://www.celeryproject.org/
Celery docs and support: http://www.celeryproject.org/docs-and-support/
Advantages:

- Simple: the calling interface is easy to use.
- Highly available: client and worker connections retry automatically, and the broker itself supports HA.
- Fast: a single Celery process can handle millions of tasks per minute.
- Flexible: every part of Celery can be extended, used as-is, or replaced with your own implementation.

Before use, the Celery library must be initialized; the resulting instance is called an "application" (or "app"). Applications are thread-safe, and multiple applications with different configurations, components, and result backends can coexist in the same process.
```python
from celery import Celery

app = Celery('mytask')
print(app)

@app.task
def add(x, y):
    return x + y

print(add.name)  # mytask.add
print(add)
print(app.tasks)
print(app.conf)
print(*list(app.conf.items()), sep='\n')
```

By default Celery connects to a local AMQP broker, amqp://guest:**@localhost:5672//; this article uses Redis instead. Note: after the code above runs, use add.delay() and friends to push tasks into Redis, then start the celery worker command to consume the tasks from Redis, execute them, and write the results back to Redis.
Common methods for submitting tasks
```python
T.delay(arg, kwarg=value)  # shortcut for submitting a task
```

- T: the function decorated with @app.task
- args: positional arguments
- kwargs: keyword arguments

```python
T.apply_async((args,), {'kwarg': value}, countdown=?, expires=?)
```

- args: positional arguments, passed as a tuple
- kwargs: keyword arguments, passed as a dict
- countdown: how long to delay execution, in seconds
- expires: how long until the task expires, in seconds
Running the tasks: if the following problem appears on Linux, apply this configuration.

```python
from celery import platforms

# On Linux, the root user is not allowed to start celery by default;
# this setting overrides that restriction.
platforms.C_FORCE_ROOT = True
```

Then use the following command to consume the tasks in Redis (here run from the PyCharm console):
```
$ celery -A test1 worker --loglevel=INFO --concurrency=5 -n worker1@%n
```
- `-A APP, --app APP`: the app to use; APP is the module name
- `--loglevel`: logging level
- `-n`: worker name; `%n` expands to the hostname
- `--concurrency`: number of worker processes; defaults to the number of CPUs

On Windows, the following error may occur:
```
[2019-07-25 17:09:29,477: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
Traceback (most recent call last):
  File "d:\mypythonuse\mygitpythonthree\venv\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "d:\mypythonuse\mygitpythonthree\venv\lib\site-packages\celery\app\trace.py", line 544, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
```

Installing eventlet fixes this: `pip install eventlet`
This includes Windows 10: install eventlet, then re-run the worker:
```
celery -A test1 worker -P eventlet --loglevel=INFO --concurrency=5 -n worker1@%n
```
- `-P, --pool`: pool implementation; the default is prefork, but on Windows use eventlet.

The run log looks like this:

```
(venv) D:\MyPythonUse\MyGitPythonThree>celery -A text worker -P eventlet --loglevel=INFO --concurrency=4 -n worker1%n

 -------------- celery@worker1gdy v4.4.0rc2 (cliffs)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0 2019-07-25 17:39:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         mytask:0x1a93ee6cc18
- ** ---------- .> transport:   redis://192.168.61.109:6379/0
- ** ---------- .> results:     redis://192.168.61.109:6379/1
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . my_add

[2019-07-25 17:39:37,834: INFO/MainProcess] Connected to redis://192.168.61.109:6379/0
[2019-07-25 17:39:37,859: INFO/MainProcess] mingle: searching for neighbors
[2019-07-25 17:39:38,909: INFO/MainProcess] mingle: all alone
[2019-07-25 17:39:38,963: INFO/MainProcess] celery@worker1gdy ready.
[2019-07-25 17:39:38,981: INFO/MainProcess] pidbox: Connected to redis://192.168.61.109:6379/0.
[2019-07-25 17:39:39,067: INFO/MainProcess] Received task: my_add[c3e8cfe7-e9d9-49c6-94ff-e6cf5f6a8d83]
[2019-07-25 17:39:39,069: WARNING/MainProcess] start run add x=4,y=5
[2019-07-25 17:39:39,076: INFO/MainProcess] Received task: my_add[ac9bd504-c027-4c9a-9ab2-13c4e5c791f1]  ETA:[2019-07-25 17:39:58.654164+08:00]
[2019-07-25 17:39:44,070: WARNING/MainProcess] end run ret = 9
[2019-07-25 17:39:44,077: INFO/MainProcess] Task my_add[c3e8cfe7-e9d9-49c6-94ff-e6cf5f6a8d83] succeeded in 5.0s: 9
[2019-07-25 17:39:58,663: WARNING/MainProcess] start run add x=10,y=20
[2019-07-25 17:40:03,669: WARNING/MainProcess] end run ret = 30
[2019-07-25 17:40:03,672: INFO/MainProcess] Task my_add[ac9bd504-c027-4c9a-9ab2-13c4e5c791f1] succeeded in 5.0s: 30
```

In the Django project package (the directory containing settings.py),
define a celery.py file:
""" author:xdd date:2019-07-25 20:21 """ from __future__ import absolute_import, unicode_literals import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blog.settings') #必须修改模块名 app = Celery("xdd") app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() app.conf.update( enable_utc=True, timezone="Asia/Shanghai" ) # 配置redis派发任务,任务存放地址 app.conf.broker_url = 'redis://192.168.61.109:6379/0' # 配置redis任务状态返回值数据库 app.conf.result_backend = 'redis://192.168.61.109:6379/1' # 如果超过visibility_timeout,Celery会认为此任务失败 # 会重分配其他worker执行该任务,这样会造成重复执行。visibility_timeout这个值大一些 # 注意,如果慢任务执行时长超过visibility_timeout依然会多执行 app.conf.broker_transport_options = {"visibility_timeout": 3600 * 12} # 12 hours修改__init__.py文件
Next, modify the __init__.py in the same package so the app is loaded when Django starts:

```python
from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ('celery_app',)
```

Then, in the user application,
create a tasks.py file and define the task:
""" author:xdd date:2019-07-25 20:32 """ from __future__ import absolute_import, unicode_literals from blog.celery import app from django.core.mail import send_mail from blog import settings import datetime @app.task(name="server_email") def email(active_url,email=[]): """发送邮件""" print("开始发送") send_mail( "激活邮件", "Here is the message", settings.EMAIL_HOST_USER, #服务器的发件箱 email,#目标,收件箱 fail_silently = False, html_message="点击此链接激活邮件<a href='{0}'>{0}</a> 时间:{1:%Y-%m-%d %H:%M:%S}".format(active_url,datetime.datetime.now()) ) print("发送完成")views.py视图函数中调用
```python
import logging

from django.http import HttpResponse, JsonResponse

from .tasks import email


# test view
def text(request):
    try:
        print("aa------- --------")
        email.delay("www.baidu.com", ["1263351411@qq.com"])
        return HttpResponse("sent", status=201)
    except Exception as e:
        logging.info(e)
        return JsonResponse({"error": "failed to send the email"}, status=400)
```

In a real application, this call would live in the registration view.
Note: when deploying on Python 3.7, sending mail may currently raise a `wrap_socket()` error; Python 3.6 can be used instead.
urls.py routes to the view:
""" author:xdd date:2019-07-17 22:09 """ from django.conf.urls import url from .views import reg,login,text,logout from django.http import HttpResponse from user.models import User urlpatterns = [ url(r'^$',reg), #用户注册 url(r'^login$',login), #用户登录 url(r'^logout$',logout), #用户登出 url(r'^text$',text), ]访问http://127.0.0.1:8000/users/text会调用text视图函数,执行email.delay(),会在redis中增加任务。
To execute the tasks, the command below pulls them from Redis and runs them:
```
celery -A blog -P eventlet worker --loglevel=INFO --concurrency=4 -n worker@%n
```
