dify-run / api /extensions /ext_celery.py
Severian's picture
initial commit
a8b3f00
raw
history blame contribute delete
2.71 kB
from datetime import timedelta
from celery import Celery, Task
from celery.schedules import crontab
from flask import Flask
from configs import dify_config
def init_app(app: Flask) -> Celery:
    """Create the Celery application for ``app`` and register it as an extension.

    Broker, backend, Sentinel and SSL settings are read from ``dify_config``.
    The resulting instance is made the default Celery app, stored under
    ``app.extensions["celery"]`` and configured with the periodic (beat)
    schedule defined at the end of this function.

    Args:
        app: Flask application whose app context wraps every task invocation.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that executes ``run`` inside the Flask app context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # Sentinel-specific transport options are only supplied when enabled;
    # otherwise the broker gets an empty options mapping.
    if dify_config.CELERY_USE_SENTINEL:
        transport_options = {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            },
        }
    else:
        transport_options = {}

    worker = Celery(
        app.name,
        task_cls=FlaskTask,
        broker=dify_config.CELERY_BROKER_URL,
        backend=dify_config.CELERY_BACKEND,
        task_ignore_result=True,
    )

    worker.conf.update(
        result_backend=dify_config.CELERY_RESULT_BACKEND,
        broker_transport_options=transport_options,
        broker_connection_retry_on_startup=True,
    )

    if dify_config.BROKER_USE_SSL:
        # Attach the SSL options to the broker configuration; every option
        # is passed through with a value of None.
        worker.conf.update(
            broker_use_ssl={
                "ssl_cert_reqs": None,
                "ssl_ca_certs": None,
                "ssl_certfile": None,
                "ssl_keyfile": None,
            },
        )

    worker.set_default()
    app.extensions["celery"] = worker

    # Modules Celery must import so the scheduled tasks are registered.
    task_modules = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
        "schedule.create_tidb_serverless_task",
        "schedule.update_tidb_serverless_status_task",
    ]

    # Both cleanup jobs share the same configurable interval, in days.
    cleanup_interval = timedelta(days=dify_config.CELERY_BEAT_SCHEDULER_TIME)
    periodic_jobs = {
        "clean_embedding_cache_task": {
            "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
            "schedule": cleanup_interval,
        },
        "clean_unused_datasets_task": {
            "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
            "schedule": cleanup_interval,
        },
        # Hourly, on the hour.
        "create_tidb_serverless_task": {
            "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
            "schedule": crontab(minute="0", hour="*"),
        },
        # Hourly, at half past.
        "update_tidb_serverless_status_task": {
            "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
            "schedule": crontab(minute="30", hour="*"),
        },
    }
    worker.conf.update(beat_schedule=periodic_jobs, imports=task_modules)

    return worker