#最顶头加上from __future__ import absolute_import# celery settingsimport djcelerydjcelery.setup_loader()BROKER_URL = 'redis://localhost:6379'# BROKER_URL = 'redis://:密码@主机地址:端口号/数据库号'CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'CELERY_RESULT_BACKEND = 'redis://localhost:6379'CELERY_ACCEPT_CONTENT = ['application/json']CELERY_TASK_SERIALIZER = 'json'CELERY_RESULT_SERIALIZER = 'json'CELERYD_MAX_TASKS_PER_CHILD = 40CELERY_TIMEZONE = 'Asia/Shanghai'INSTALLED_APPS = [ 'djcelery',# 添加djcelery]
from __future__ import absolute_import, unicode_literalsfrom djcelery.models import ( TaskState, WorkerState, PeriodicTask, IntervalSchedule, CrontabSchedule,)from xadmin.sites import sitesite.register(IntervalSchedule) # 存储循环任务设置的时间site.register(CrontabSchedule) # 存储定时任务设置的时间site.register(PeriodicTask) # 存储任务site.register(TaskState) # 存储任务执行状态site.register(WorkerState) # 存储执行任务的worker
# __init__.pyfrom __future__ import absolute_importfrom .celery import app as celery_app# celery.pyfrom __future__ import absolute_importimport osfrom celery import Celery, platformsfrom django.conf import settings# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hermes.settings')# hermes主应用名app = Celery('hermes')platforms.C_FORCE_ROOT = Trueapp.config_from_object('django.conf:settings')app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)@app.task(bind=True)def debug_task(self): print('Request: {0!r}'.format(self.request))
from __future__ import absolute_importfrom celery import taskimport timefrom .channels import Cache_data_to_redis# 更新指定日期数据到sms_organizationcount@taskdef readAndWrite(begin,end): begin = str(begin)[:4] + '-' + str(begin)[4:6] + '-' + str(begin)[6:8] end = str(end)[:4] + '-' + str(end)[4:6] + '-' + str(end)[6:8] i = 0 begin_time = time.time() read = Cache_data_to_redis().connection Rcursor = read.cursor() query = "SELECT id from sms_organizationcount WHERE alia_date_time between '" query += begin query += "' and '" query += end query += "'" readSql = "SELECT alia_month_time, alia_date_time, count(*) as total_nums, count(t.`status`=2 or null) as error_nums, name FROM / (select *, DATE_FORMAT(req_time,'%Y-%m') as alia_month_time, DATE_FORMAT(req_time,'%Y-%m-%d') as alia_date_time, / LEFT(body,LOCATE('】',body)) as name from sms_smslog where LOCATE('】',body) >0 / and LEFT(body,1)='【' and DATE_FORMAT(req_time,'%Y-%m-%d') between '" readSql += begin readSql += "' and '" readSql += end readSql += "')" readSql += " as t GROUP BY alia_date_time , name;" Rcursor.execute(readSql) readResult = Rcursor.fetchall() Rcursor.execute(query) query_result = Rcursor.fetchall() deleteSql = "delete from sms_organizationcount where alia_date_time between '%s' and '%s'" % (begin,end) if query_result: delete_record = Cache_data_to_redis().connection Dcursor = delete_record.cursor() Dcursor.execute(deleteSql) delete_record.commit() delete_record.close() for value in readResult: write = Cache_data_to_redis().connection Wcursor = write.cursor() writeSql = "INSERT into sms_organizationcount (alia_month_time, alia_date_time, total_nums, error_nums, `name`) " / " VALUES ('%s', '%s', '%s', '%s', '%s' )" %/ (value['alia_month_time'], value['alia_date_time'], value['total_nums'], value['error_nums'], value['name']) try: Wcursor.execute(writeSql) i += 1 write.commit() except: write.rollback() write.close() read.close() end_time = time.time() pass_time = end_time - begin_time return i, pass_time