扫码关注官方订阅号
走同样的路,发现不同的人生
# coding: utf-8 from celery import Celery from flask import Flask def make_celery(app): celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery app = Flask(__name__) app.config.update( CELERY_BROKER_URL='amqp://guest@localhost//', CELERY_RESULT_BACKEND='amqp://guest@localhost//' ) celery = make_celery(app) @celery.task(name='add_together') def add_together(a, b): return a + b if __name__ == '__main__': result = add_together.delay(23, 42) result.wait()
由于你没有提供错误信息,只能改对之后告诉你了 调用celery -A test.celery worker --loglevel=info启动celery,然后用 python test.py调用脚本
celery -A test.celery worker --loglevel=info
celery
python test.py
微信扫码关注PHP中文网服务号
QQ扫码加入技术交流群
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号
PHP学习
技术支持
返回顶部
由于你没有提供错误信息,只能改对之后告诉你了
调用
celery -A test.celery worker --loglevel=info启动celery,然后用python test.py调用脚本