Разработчики Django часто сталкиваются с проблемой, когда пользователь нажимает кнопку отправки формы и... ждёт. Секунды растягиваются в минуты, терпение иссякает, а интерфейс приложения замирает. Причина этой неприятной ситуации кроется в природе синхронных операций, которые блокируют основной поток исполнения, пока задача не будет полностью завершена. Обычный Django-проект выполняет все операции последовательно — это означает, что если, например, функция отправки электронной почты занимает 5 секунд, то пользователь вынужден ждать весь этот промежуток времени, прежде чем увидит результат своих действий. В небольших проектах с малым потоком пользователей эта задержка может быть незаметной, но когда приложение масштабируется, проблема становится критичной.
Здесь и может помочь Celery — распределённая очередь задач, которая позволяет выполнять операции асинхронно, вне основного потока Django. Это своего рода рабочий который берёт сложные и долгие задачи и выполняет их параллельно, освобождая основное приложение для обработки новых запросов.Celery решает целый спектр задач:- Отправка писем и сообщений.
- Обработка и трансформация изображений.
- Анализ текста и проверка данных.
- Работа с внешними API и веб-сервисами.
- Обработка и анализ больших массивов данных.
- Генерация отчётов и документов.
Что особенно ценно, Celery позволяет не только выполнять операции в фоновом режиме, но и планировать их на определённое время, создавая периодические задачи. Конечно, Celery не является единственным решением для асинхронных операций в Django. Среди альтернатив можно выделить:
Huey — простая и лёгкая библиотека для организации очередей задач, которая отлично работает на Windows (в отличие от Celery, которая с версии 4 потеряла официальную поддержку Windows).
Django-RQ — очередь задач на основе Redis, предлагающая минималистичный интерфейс и простту настройки.
Dramatiq — относительно новое решение, ориентированное на надёжность и производительность.
Django Channels — хотя это решение создано в первую очередь для WebSockets, оно может использоваться и для асинхронной обработки задач.
Выбор конкретного инструмента зависит от требований проекта, но Celery зарекомендовал себя как зрелое и проверенное временем решение с богатым функционалом и хорошей интеграцией с Django. Особенно полезно это сочетание для проектов, где требуется обрабатывать ресурсоёмкие задачи без ущерба для пользовательского опыта.
Основы работы Celery
Celery представляет собой распределенную систему для обработки больших объемов сообщений, способную обрабатывать миллионы задач в минуту с минимальными задержками.
Архитектура Celery
Архитектура Celery состоит из 4 основных компонентов:
1. Производители задач (Producers) — это части кода, которые создают задачи и отправляют их в очередь. В контексте Django такими производителями обычно выступают представления (views) или другие компоненты приложения.
2. Брокер сообщений (Message Broker) — промежуточное хранилище, которое принимает задачи от производителей и передает их исполнителям. Брокер действует как посредник между вашим Django-приложением и процессами Celery.
3. Исполнители задач (Workers) — процессы, которые постоянно проверяют очереди задач и выполняют поступающие задания. Они работают независимо от основного приложения, могут быть запущены на том же сервере или распределены по нескольким машинам.
4. База результатов (Results Backend) — опциональный компонент, который хранит информацию о выполненных задачах и их результатах. Это особенно полезно, если ваше приложение должно отслеживать статус задач или использовать их результаты.
Такая архитектура обеспечивает высокую степень масштабируемости и отказоустойчивости. Вы можете добавлять новых исполнителей по мере роста нагрузки, а при выходе из строя одного из них задачи перераспределяются между остальными.
Брокеры сообщений и их выбор
Брокер сообщений играет ключевую роль в работе Celery, поэтому его выбор критически важен. Наиболее популярные варианты:
Redis — быстрый хранилище данных в памяти, который часто используется как брокер для Celery благодаря своей скорости и простоте настройки. Redis может выступать одновременно как брокер сообщений и как хранилище результатов, что упрощает инфраструктуру проекта.
Python
Скопировано | 1
2
3
| # Пример настройки Redis как брокера и хранилища результатов
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' |
|
RabbitMQ — специализированный брокер сообщений, реализующий протокол AMQP. Считается наиболее надежным выбором для производственной среды, особенно при высоких нагрузках. RabbitMQ обеспечивает гарантированную доставку сообщений даже в случае сбоев.
Python
Скопировано | 1
2
| # Пример настройки RabbitMQ
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//' |
|
Amazon SQS, Azure Service Bus или Apache Kafka — облачные сервисы и решения корпоративного уровня, которые могут использоваться в качестве брокеров для Celery в крупных распределенных системах. При выборе брокера стоит учитывать:- Объем и характер задач.
- Требования к надежности доставки сообщений.
- Инфраструктурные ограничения.
- Опыт команды с конкретными технологиями.
Для большинства проектов на Django начальный выбор часто падает на Redis из-за его простоты и универсальности.
Установка и базовая настройка
Установка Celery достаточно проста и выполняется через pip:
Для работы с Redis дополнительно потребуется установить клиентскую библиотеку:
После установки пакетов необходимо настроить экземпляр Celery в вашем Django-проекте. Обычно это делается путем создания файла celery.py в директории проекта, рядом с settings.py :
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| # myproject/celery.py
import os
from celery import Celery
# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Создаем экземпляр Celery
app = Celery('myproject')
# Загружаем настройки из settings.py (с префиксом CELERY_)
app.config_from_object('django.conf:settings', namespace='CELERY')
# Автоматически обнаруживаем задачи в приложениях Django
app.autodiscover_tasks() |
|
Затем необходимо обновить файл __init__.py вашего проекта, чтобы гарантировать, что приложение Celery загружается при запуске Django:
Python
Скопировано | 1
2
3
4
| # myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',) |
|
Сериализация данных в Celery: возможности и ограничения
Когда вы передаете задачу в Celery, данные должны быть сериализованы для передачи через брокер сообщений. По умолчанию Celery использует JSON для сериализации, но также поддерживает другие форматы:
JSON — наиболее универсальный формат, но имеет ограничения по типам данных,
Pickle — поддерживает большинство Python-объектов, но менее безопасен,
YAML — альтернатива JSON с более широкими возможностями,
MessagePack — бинарный формат, более компактный, чем JSON.
Выбор сериализатора настраивается в конфигурации Celery:
Python
Скопировано | 1
2
3
4
| # Пример настройки сериализации в settings.py
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' |
|
При использовании Celery важно помнить, что не все типы данных могут быть корректно сериализованы. Особую осторожность следует проявлять с:- Объектами Django ORM (модели, QuerySets).
- Сложными вложенными структурами.
- Функциями и методами.
- Файловыми объектами и дескрипторами.
Чтобы избежать проблем с сериализацией, рекомендуется передавать в задачи только простые типы данных: числа, строки, списки, словари и т.п. Если необходимо работать с моделями Django, лучше передавать их идентификаторы, а не сами объекты.
Python
Скопировано | 1
2
3
4
5
6
7
8
9
| # Неправильно: передача объекта модели
def process_user_task(user):
# ...
# Правильно: передача идентификатора
def process_user_task(user_id):
from myapp.models import User
user = User.objects.get(id=user_id)
# ... |
|
Режимы выполнения задач: eager и асинхронный
Celery предлагает два основных режима выполнения задач, которые определяют поведение системы при обработке заданий:
Асинхронный режим — стандартный способ работы Celery в производственной среде. Задачи отправляются в очередь через брокер сообщений и выполняются отдельными процессами-воркерами. Этот режим обеспечивает подлинную асинхронность и масштабируемость:
Python
Скопировано | 1
2
3
4
5
| # Отправка задачи в асинхронном режиме
from myapp.tasks import process_data
result = process_data.delay(arg1, arg2)
# или с дополнительными параметрами
result = process_data.apply_async(args=[arg1, arg2], countdown=10) |
|
Eager режим (нетерпеливый) — в этом режиме задачи выполняются синхронно в текущем процессе, без использования брокера и воркеров. Особенно полезен для тестирования и отладки:
Python
Скопировано | 1
2
3
| # Настройка eager режима в settings.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True # Распространение исключений |
|
Переключение между режимами может осуществляться глобально через настройки или локально для отдельных задач:
Python
Скопировано | 1
2
3
4
| # Временное включение eager режима для конкретного вызова
with app.conf.eager_propagates_exceptions.override(True):
with app.conf.task_always_eager.override(True):
task.delay() |
|
При разработке приложения типичной практикой является использование eager режима в среде разработки и тестирования, переключаясь на асинхронный режим в продакшн-окружении.
Стратегии ретрая и обработки ошибок в брокерах сообщений
Одно из ключевых преимуществ Celery — возможность настраивать механизмы повторных попыток (retry) при возникновении ошибок. В распределенных системах отказы неизбежны, и грамотная стратегия их обработки критически важна.
Базовая стратегия повторных попыток настраивается при определении задачи:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
import logging
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, payment_id):
try:
# Код обработки платежа
payment_gateway.process(payment_id)
except PaymentGatewayError as exc:
try:
# Логирование ошибки
logging.error(f"Payment error: {exc}")
# Повторная попытка через 60 секунд
raise self.retry(exc=exc)
except MaxRetriesExceededError:
# Действия после исчерпания попыток
notify_admin.delay(f"Payment {payment_id} failed after retries")
raise |
|
В этом примере:
bind=True — делает доступным объект задачи через self ,
max_retries=3 — ограничивает количество повторных попыток,
default_retry_delay=60 — устанавливает задержку между попытками в секундах.
Для более сложных сценариев Celery поддерживает экспоненциальную задержку между попытками:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
| @shared_task(bind=True, max_retries=5)
def fetch_api_data(self, endpoint):
try:
response = requests.get(endpoint, timeout=5)
response.raise_for_status()
return response.json()
except (requests.RequestException, ValueError) as exc:
# Экспоненциальная задержка: 2^retry * базовая_задержка
retry_backoff = 2 ** self.request.retries * 30
# Случайный "джиттер" для предотвращения "эффекта громады"
retry_jitter = random.uniform(0, 10)
retry_delay = retry_backoff + retry_jitter
raise self.retry(exc=exc, countdown=int(retry_delay)) |
|
Такой подход позволяет избежать ситуации, когда после сбоя все воркеры одновременно пытаются выполнить повторы, создавая новую волну нагрузки на систему.
Настройка запуска Celery в production-окружениях
В производственной среде правильная настройка Celery критически важна для обеспечения стабильности и эффективности работы.
Запуск воркеров с Supervisor или Systemd
Для надежного управления процессами Celery рекомендуется использовать менеджеры процессов, такие как Supervisor или Systemd:
Пример конфигурации Supervisor (/etc/supervisor/conf.d/celery.conf ):
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
| [program:celery]
command=/path/to/venv/bin/celery -A myproject worker --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600 |
|
Настройка параллелизма и конкурентности
Celery предлагает различные модели конкурентности:- prefork (процессы, по умолчанию).
- eventlet (сопрограммы).
- gevent (сопрограммы).
- solo (один процесс).
Выбор зависит от характера ваших задач:
Bash
Скопировано | 1
2
3
4
5
| # Запуск с 8 процессами (prefork)
celery -A myproject worker --concurrency=8 --loglevel=INFO
# Запуск с eventlet (для задач с I/O-bound операциями)
celery -A myproject worker --concurrency=100 --pool=eventlet --loglevel=INFO |
|
Управление ресурсами
Для предотвращения перегрузки системы можно настроить ограничения по памяти и времени выполнения:
Python
Скопировано | 1
2
3
| # Ограничения в настройках Celery
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # Рестарт воркера после 100 задач
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000 # 200MB |
|
Безопасность
В производственной среде важно обеспечить безопасность:
Python
Скопировано | 1
2
3
4
5
| # Защита от подделки сообщений
CELERY_BROKER_URL = 'redis://:strong_password@redis.example.com:6379/0'
# Ограничение типов сериализации
CELERY_ACCEPT_CONTENT = ['json'] # Отключаем потенциально опасный pickle |
|
Мониторинг
Для отслеживания состояния системы в production настройте интеграцию с системами мониторинга:
Python
Скопировано | 1
2
3
| # Настройка интеграции с Prometheus
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True |
|
Также рекомендуется использовать инструменты визуализации, такие как Flower или Celery Exporter, для наблюдения за работой очередей и исполнителей в реальном времени.
От Celery к RabbitMQ или от RabbitMQ к Celery ? Всем день добрый.
Отработать данные чисто в Rabbit получилось без труда.
Но, разбирая работу... Django и celery shared_task Как запустить shared_task в асинхронном режиме?
Есть у меня celery таска
from celery import... Celery + Django запуск задач по расписанию Юзаю чистый celery (без django-celery-beat).
Конфиг:
app.conf.beat_schedule = {
... Django+celery получение результатов фоновых задач Добрый день!
Из views.py запускаю ресурсозатратные отчеты, результаты отображаю в html или...
Интеграция с Django
После знакомства с основами Celery пришло время интегрировать его в Django-проект. Интеграция Celery с Django не представляет особой сложности, но требует внимания к деталям и понимания принципов взаимодействия этих технологий.
Настройка Django-проекта для работы с Celery
Первый шаг — добавление необходимых настроек в файл settings.py вашего проекта. После установки Celery и выбранного брокера сообщений (в нашем примере будет использоваться Redis) нужно задать ключевые параметры:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
| [H2]settings.py[/H2]
# Настройки Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0' # URL брокера сообщений
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Хранилище результатов
# Дополнительные настройки (опционально)
CELERY_ACCEPT_CONTENT = ['json'] # Разрешенные форматы сериализации
CELERY_TASK_SERIALIZER = 'json' # Формат сериализации задач
CELERY_RESULT_SERIALIZER = 'json' # Формат сериализации результатов
CELERY_TIMEZONE = 'Europe/Moscow' # Временная зона для планировщика задач |
|
Следующий этап — создание файла celery.py в корневой директории проекта (рядом с файлом settings.py ). Этот файл содержит конфигурацию Celery-приложения:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # myproject/celery.py
import os
from celery import Celery
# Устанавливаем переменную окружения с настройками Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Создаем экземпляр приложения Celery
app = Celery('myproject')
# Загружаем настройки из Django settings (с префиксом CELERY_)
app.config_from_object('django.conf:settings', namespace='CELERY')
# Автоматически находим задачи в приложениях Django
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}') |
|
Чтобы убедиться, что Celery загружается при запуске Django, нужно модифицировать файл __init__.py проекта:
Python
Скопировано | 1
2
3
4
5
6
7
| # myproject/__init__.py
from __future__ import absolute_import, unicode_literals
# Импортируем Celery-приложение
from .celery import app as celery_app
__all__ = ('celery_app',) |
|
Теперь Django будет автоматически искать файлы tasks.py в каждом зарегистрированном приложении. Именно в этих файлах будут определяться асинхронные задачи.
Создание первой асинхронной задачи
Допустим, у нас есть приложение users внутри проекта Django, и мы хотим добавить асинхронную отправку email при регистрации пользователя. Для этого создадим файл tasks.py в директории приложения:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| # users/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from time import sleep
@shared_task
def send_welcome_email(user_id):
from users.models import User # Импортируем здесь для избежания циклических импортов
try:
user = User.objects.get(id=user_id)
# Имитация длительной операции
sleep(5)
send_mail(
'Добро пожаловать на наш сайт!',
f'Здравствуйте, {user.username}! Спасибо за регистрацию.',
'noreply@example.com',
[user.email],
fail_silently=False,
)
return f"Email успешно отправлен пользователю {user.email}"
except User.DoesNotExist:
return f"Пользователь с ID {user_id} не найден" |
|
Обратите внимание на декоратор @shared_task . Он используется вместо @app.task , когда задача определяется вне файла, где создан экземпляр Celery. Это позволяет приложениям Django быть переиспользуемыми, так как они не привязаны к конкретному экземпляру Celery. Теперь чтобы вызвать эту задачу в представлении Django, используем метод .delay() :
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| # users/views.py
from django.shortcuts import render, redirect
from django.contrib import messages
from .forms import RegistrationForm
from .tasks import send_welcome_email
def register(request):
if request.method == 'POST':
form = RegistrationForm(request.POST)
if form.is_valid():
user = form.save()
# Запускаем асинхронную задачу
send_welcome_email.delay(user.id)
messages.success(request, 'Регистрация успешна! Проверьте вашу почту.')
return redirect('home')
else:
form = RegistrationForm()
return render(request, 'users/register.html', {'form': form}) |
|
Метод .delay() — это сокращение для .apply_async() , который позволяет указать дополнительные параметры вызова:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
| # Вариант с apply_async для более гибкой настройки
send_welcome_email.apply_async(
args=[user.id],
countdown=10, # Задержка перед выполнением (сек)
expires=300, # Время, после которого задача аннулируется
retry=True, # Автоматический повтор при ошибке
retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.5,
}
) |
|
Паттерны организации кода для асинхронных задач
При работе с Celery в Django проектах сформировалось несколько подходов к организации кода:
1. Модули задач в каждом приложении:
Самый распространённый подход — создание файла tasks.py в каждом приложении Django, где определяются задачи, связанные с функциональностью этого приложения. Это хорошо согласуется с принципом модульности Django.
2. Отдельное приложение для задач:
В крупных проектах со множеством задач иногда создают специальное приложение (например, tasks или async_jobs ), которое содержит все асинхронные задачи проекта, сгруппированные по категориям:
Python
Скопировано | 1
2
3
4
5
6
| tasks/
__init__.py
emails.py
reports.py
cleanup.py
... |
|
3. Декомпозиция по доменам:
В сложных проектах с сервис-ориентированной архитектурой задачи часто группируются по доменам бизнес-логики, независимо от структуры Django-приложений:
Python
Скопировано | 1
2
3
4
5
6
| # payments/services.py
from myproject.tasks.registry import register_task
@register_task
def process_payment(payment_id):
# Логика обработки платежа |
|
4. Композиция задач:
Для сложных многошаговых процессов эффективно использовать композицию задач через цепочки:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
| from celery import chain
# Создание последовательной цепочки задач
workflow = chain(
validate_order.s(order_id),
process_payment.s(),
send_confirmation.s()
)
# Запуск цепочки
result = workflow.delay() |
|
Независимо от выбранного подхода, рекомендуется следовать этим правилам:- Делайте задачи идемпотентными (повторный запуск не должен вызывать проблем).
- Избегайте передачи в задачи объектов моделей Django напрямую.
- Документируйте входные параметры и возвращаемые значения задач.
- Структурируйте логику так, чтобы она была тестируемой отдельно от Celery.
Интеграция с Django ORM: особенности и подводные камни
Работа с моделями Django в задачах Celery требует особого внимания. Хотя это выглядит простым, здесь скрывается ряд неочевидных проблем, которые могут привести к непредсказуемому поведению приложения.
Первая и самая распространённая ошибка — передача объектов моделей напрямую в задачи:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
| # НЕПРАВИЛЬНО:
@shared_task
def process_article(article):
article.publish() # Вызовет ошибку сериализации
# ПРАВИЛЬНО:
@shared_task
def process_article(article_id):
from blog.models import Article
article = Article.objects.get(id=article_id)
article.publish() |
|
Причина этой проблемы — в механизме сериализации. Celery сериализует аргументы задачи перед отправкой в очередь, но объекты Django не могут быть корректно сериализованы стандартными методами.
Второй важный момент — состояние базы данных может измениться между моментом постановки задачи в очередь и её выполнением:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| @shared_task
def notify_subscribers(article_id):
from blog.models import Article, Subscription
try:
article = Article.objects.get(id=article_id)
except Article.DoesNotExist:
return "Статья больше не существует"
# Используем актуальные данные из БД, а не устаревшие
subscriptions = Subscription.objects.filter(
category=article.category,
is_active=True # Проверяем статус на момент выполнения
)
# Остальной код... |
|
Третья особенность связана с транзакциями. Если вы создаёте задачу внутри транзакции Django, и задача должна работать с объектами, созданными в этой же транзакции, может возникнуть ситуация гонки — задача начнёт выполняться до завершения транзакции:
Python
Скопировано | 1
2
3
4
5
6
| # В представлении или сервисе
@transaction.atomic
def create_order(request):
order = Order.objects.create(user=request.user)
# Задача может запуститься ДО фиксации транзакции
process_order.delay(order.id) # Риск гонки |
|
Решение — использовать on_commit для отложенного вызова задач:
Python
Скопировано | 1
2
3
4
5
6
7
| from django.db import transaction
@transaction.atomic
def create_order(request):
order = Order.objects.create(user=request.user)
# Задача будет поставлена в очередь ТОЛЬКО после успешной фиксации
transaction.on_commit(lambda: process_order.delay(order.id)) |
|
Мониторинг выполнения задач
Отслеживание статуса и производительности асинхронных задач критически важно для выявления проблем и узких мест в работе приложения.
Базовый мониторинг статуса задач можно реализовать через API результатов Celery:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
| from celery.result import AsyncResult
def check_task_status(request, task_id):
task = AsyncResult(task_id)
context = {
'task_id': task_id,
'status': task.status,
'result': task.result if task.successful() else None,
}
return render(request, 'task_status.html', context) |
|
Для более продвинутого мониторинга в реальном времени можно использовать Flower — веб-инструмент для мониторинга и администрирования Celery. Устанавливается просто:
И запускается командой:
Bash
Скопировано | 1
| celery -A myproject flower --port=5555 |
|
Flower предоставляет веб-интерфейс с богатой информацией о воркерах, очередях, выполняющихся и завершённых задачах, с графиками и статистикой.
Конфигурация безопасного доступа к Celery в Django
Безопасность системы асинхронных задач требует внимания к нескольким аспектам:
1. Защита брокера сообщений — использование аутентификации и авторизации:
Python
Скопировано | 1
2
3
4
5
| # Для Redis
CELERY_BROKER_URL = 'redis://:сложный_пароль@redis-сервер:6379/0'
# Для RabbitMQ
CELERY_BROKER_URL = 'amqp://пользователь:пароль@rabbitmq-сервер:5672/vhost' |
|
2. Ограничение доступа к веб-интерфейсу Flower через аутентификацию:
Bash
Скопировано | 1
| celery -A myproject flower --port=5555 --basic-auth=user:pass |
|
3. Защита от инъекций кода через ограничение форматов сериализации:
Python
Скопировано | 1
| CELERY_ACCEPT_CONTENT = ['json'] # Исключаем pickle из соображений безопасности |
|
Управление зависимостями между асинхронными задачами
В сложных системах задачи редко существуют изолированно — часто одна задача зависит от результатов другой или должна выполняться после неё.
Celery предоставляет несколько паттернов для организации взаимодействия задач:
1. Цепочки (Chains) — последовательное выполнение задач:
Python
Скопировано | 1
2
3
4
5
6
7
| from celery import chain
result = chain(
extract_data.s(url), # Первая задача
transform_data.s(), # Вторая получит результат первой
load_data.s(target) # Третья получит результат второй
)() |
|
2. Группы (Groups) — параллельное выполнение набора задач:
Python
Скопировано | 1
2
3
4
| from celery import group
urls = ['http://example1.com', 'http://example2.com', ...]
result = group(fetch_url.s(url) for url in urls)() |
|
3. Хорды (Chords) — выполнение задачи после завершения группы задач:
Python
Скопировано | 1
2
3
4
5
6
| from celery import chord
result = chord(
(process_part.s(data_part) for data_part in partitioned_data),
aggregate_results.s() # Эта задача запустится после выполнения всех частей
)() |
|
4. Сигнатуры с обратными вызовами для более гибкого управления:
Python
Скопировано | 1
2
3
4
5
| task.apply_async(
args=[arg1, arg2],
link=success_callback.s(),
link_error=error_callback.s()
) |
|
Такие инструменты позволяют создавать сложные рабочие процессы с параллельными и последовательными шагами, условной логикой и обработкой ошибок.
Практические примеры применения
Теория без практики мертва, поэтому давайте рассмотрим конкретные сценарии, где Celery действительно спасает положение в Django-приложениях. Каждый из этих примеров — не просто надуманная ситуация, а реальная проблема, с которой сталкиваются разработчики.
Обработка тяжелых вычислений
Представьте, что ваше приложение выполняет сложные математические вычисления, например, анализ статистических данных или обработку финансовых моделей. Выполнение таких операций напрямую в представлении привело бы к "зависанию" запроса.
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| # tasks.py
@shared_task(bind=True, max_retries=3)
def analyze_dataset(self, dataset_id):
from analytics.models import Dataset, AnalysisResult
try:
dataset = Dataset.objects.get(id=dataset_id)
# Имитация ресурсоемких вычислений
results = {}
start_time = time.time()
# Допустим, здесь используется pandas, numpy или другая
# библиотека для анализа данных
import pandas as pd
df = pd.DataFrame(dataset.get_data())
# Несколько тяжелых операций
results['mean'] = df.mean().to_dict()
results['correlation'] = df.corr().to_dict()
results['regression'] = perform_regression_analysis(df)
# Сохраняем результаты
AnalysisResult.objects.create(
dataset=dataset,
results=results,
processing_time=time.time() - start_time
)
return f"Анализ датасета {dataset_id} завершен успешно"
except Dataset.DoesNotExist:
return f"Датасет {dataset_id} не найден"
except Exception as exc:
self.retry(exc=exc, countdown=60*5) # Повторная попытка через 5 минут |
|
В представлении вызов этой задачи будет мгновенным, позволяя пользователю продолжить работу:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
| @login_required
def analyze_view(request, dataset_id):
if request.method == 'POST':
task = analyze_dataset.delay(dataset_id)
request.session['analysis_task_id'] = task.id
messages.success(
request,
"Анализ запущен! Вы получите уведомление, когда он завершится."
)
return redirect('dataset_detail', dataset_id=dataset_id)
return render(request, 'analytics/analyze_form.html', {
'dataset_id': dataset_id
}) |
|
Отправка электронных писем
Мы уже видели пример с отправкой приветственного email, но давайте рассмотрим более сложный случай — массовую рассылку с персонализацией:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| @shared_task(rate_limit='100/m') # Ограничение: 100 писем в минуту
def send_newsletter(campaign_id, recipient_ids=None):
from marketing.models import Campaign, Subscriber
campaign = Campaign.objects.get(id=campaign_id)
# Если список получателей не указан, берем всех активных подписчиков
if not recipient_ids:
recipients = Subscriber.objects.filter(
is_active=True,
subscribed_to_campaigns=True
)
else:
recipients = Subscriber.objects.filter(id__in=recipient_ids)
# Запускаем индивидуальные задачи для каждого получателя
sent_count = 0
for recipient in recipients:
# Используем транзакционную задачу для каждого получателя
send_personalized_email.delay(
campaign_id,
recipient.id
)
sent_count += 1
return f"Запланировано отправок: {sent_count}"
@shared_task(bind=True, max_retries=3)
def send_personalized_email(self, campaign_id, subscriber_id):
from marketing.models import Campaign, Subscriber, EmailLog
try:
campaign = Campaign.objects.get(id=campaign_id)
subscriber = Subscriber.objects.get(id=subscriber_id)
# Персонализация шаблона
context = {
'first_name': subscriber.first_name,
'last_name': subscriber.last_name,
'unsubscribe_url': f"https://example.com/unsubscribe/{subscriber.unsubscribe_token}/"
}
# Рендеринг HTML и текстовой версии
html_content = render_to_string(
'emails/newsletter.html',
{**campaign.template_context, **context}
)
text_content = strip_tags(html_content)
# Отправка сообщения
msg = EmailMultiAlternatives(
campaign.subject,
text_content,
settings.DEFAULT_FROM_EMAIL,
[subscriber.email]
)
msg.attach_alternative(html_content, "text/html")
msg.send()
# Логируем успешную отправку
EmailLog.objects.create(
campaign=campaign,
subscriber=subscriber,
status='sent'
)
return f"Email для {subscriber.email} отправлен"
except (Campaign.DoesNotExist, Subscriber.DoesNotExist) as e:
# Логируем ошибку
return f"Ошибка: {str(e)}"
except Exception as exc:
# Если проблема с отправкой, попробуем повторить
try:
self.retry(exc=exc, countdown=60*30) # Повтор через 30 минут
except self.MaxRetriesExceededError:
# Логируем окончательную неудачу
EmailLog.objects.create(
campaign_id=campaign_id,
subscriber_id=subscriber_id,
status='failed',
error_message=str(exc)
)
return f"Не удалось отправить email после нескольких попыток: {str(exc)}" |
|
Такая структура позволяет:
1. Контролировать скорость отправки, чтобы не перегрузить почтовый сервер.
2. Персонализировать каждое письмо.
3. Повторять отправку при временных сбоях.
4. Вести подробную статистику доставки.
Работа с внешними API
Обращения к внешним API — ещё один отличный кандидат для Celery-задач, особенно если API имеет ограничения по частоте запросов:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| @shared_task(bind=True, max_retries=5, rate_limit='5/m')
def fetch_currency_rates(self):
"""Получает текущие курсы валют из внешнего API и сохраняет в базу"""
import requests
from finance.models import CurrencyRate
API_URL = "https://api.exchangerate-api.com/v4/latest/USD"
try:
response = requests.get(API_URL, timeout=10)
response.raise_for_status() # Вызовет исключение при HTTP-ошибке
data = response.json()
rates = data.get('rates', {})
# Сохраняем курсы в базу
batch = []
timestamp = timezone.now()
for currency, rate in rates.items():
batch.append(CurrencyRate(
currency_code=currency,
rate=Decimal(str(rate)), # Безопасное преобразование float в Decimal
source='exchangerate-api',
timestamp=timestamp
))
# Массовое создание записей
CurrencyRate.objects.bulk_create(batch)
return f"Обновлено {len(batch)} валютных курсов"
except requests.RequestException as exc:
# Экспоненциальная задержка между повторами
retry_backoff = 2 ** self.request.retries
self.retry(exc=exc, countdown=retry_backoff * 60) |
|
В этом примере мы реализовали:- Ограничение частоты запросов (
rate_limit='5/m' ).
- Экспоненциальную задержку между повторами.
- Обработку ошибок сети.
- Пакетное сохранение результатов.
Асинхронная обработка загружаемых пользователем файлов
Обработка загружаемых файлов часто требует значительных ресурсов, особенно если речь идёт о работе с изображениями или большими документами:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| @shared_task
def process_uploaded_image(image_id):
from gallery.models import Image
try:
img = Image.objects.get(id=image_id)
original_path = img.file.path
# Создаем миниатюры разных размеров
from PIL import Image as PILImage
image = PILImage.open(original_path)
# Создаем миниатюру
thumb_path = original_path.replace('.', '_thumb.')
image.copy()
image.thumbnail((200, 200))
image.save(thumb_path, quality=90)
# Создаем версию среднего размера
medium_path = original_path.replace('.', '_medium.')
image.copy()
image.thumbnail((800, 800))
image.save(medium_path, quality=90)
# Оптимизируем оригинал (например, сжимаем JPEG)
if original_path.lower().endswith('.jpg') or original_path.lower().endswith('.jpeg'):
optimized = PILImage.open(original_path)
optimized.save(original_path, quality=85, optimize=True)
# Обновляем метаданные в базе
img.width, img.height = image.size
img.processed = True
img.thumbnail_url = thumb_path.replace(settings.MEDIA_ROOT, settings.MEDIA_URL)
img.medium_url = medium_path.replace(settings.MEDIA_ROOT, settings.MEDIA_URL)
img.save()
return f"Изображение {image_id} обработано успешно"
except Image.DoesNotExist:
return f"Изображение {image_id} не найдено"
except Exception as e:
# Логируем ошибку и обновляем статус
img.processed = False
img.processing_error = str(e)
img.save()
raise |
|
Таким образом, пользователь может загрузить файл и увидеть мгновенный отклик системы, пока обработка происходит в фоне.
Продвинутые техники
После освоения базовых возможностей Celery пора погрузиться в более сложные и мощные техники, которые помогут вывести работу с асинхронными задачами на новый уровень. Эти продвинутые возможности позволяют создавать масштабируемые, надёжные и легко поддерживаемые системы.
Периодические задачи и Celery Beat
Для многих приложений критически важна возможность выполнять задачи по расписанию – будь то ночное резервное копирование, еженедельные отчёты или ежеминутное обновление кэша. Celery Beat – встроенный планировщик, который решает эту задачу.
Настройка периодических задач выполняется в файле settings.py :
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| # settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'ежедневный-отчет': {
'task': 'reports.tasks.generate_daily_report',
'schedule': crontab(hour=7, minute=30), # Каждый день в 7:30
},
'обновление-курсов-валют': {
'task': 'finance.tasks.update_exchange_rates',
'schedule': 3600.0, # Каждый час (в секундах)
'args': (["USD", "EUR", "GBP"],)
},
'очистка-сессий': {
'task': 'core.tasks.clean_expired_sessions',
'schedule': crontab(hour=3, minute=0, day_of_week='mon'), # По понедельникам в 3:00
}
} |
|
Для запуска планировщика нужно выполнить команду:
Bash
Скопировано | 1
| celery -A myproject beat |
|
В производственной среде Beat рекомендуется запускать вместе с воркерами или как отдельный сервис под управлением Supervisor:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
| [program:celerybeat]
command=/path/to/venv/bin/celery -A myproject beat --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true |
|
Если вам нужна более гибкая настройка расписания, можно хранить его в базе данных с помощью django-celery-beat :
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| from django_celery_beat.models import PeriodicTask, IntervalSchedule
# Создаем интервал - каждые 10 минут
schedule, created = IntervalSchedule.objects.get_or_create(
every=10,
period=IntervalSchedule.MINUTES,
)
# Создаем периодическую задачу
PeriodicTask.objects.create(
interval=schedule,
name='Синхронизация данных',
task='data.tasks.sync_external_data',
args=json.dumps(['arg1', 'arg2']),
kwargs=json.dumps({
'param1': 'value1',
}),
expires=datetime.utcnow() + timedelta(days=30),
) |
|
Это позволяет динамически менять расписание через API или административную панель Django без перезапуска сервера.
Обработка ошибок и ретраи
Надёжная система асинхронных задач требует хорошо продуманной стратегии обработки ошибок. Celery предлагает несколько механизмов:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| @shared_task(
bind=True,
max_retries=5,
default_retry_delay=60,
autoretry_for=(RequestException, ConnectionError),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True
)
def process_payment(self, payment_id):
try:
# Основная логика обработки платежа
payment = Payment.objects.get(id=payment_id)
response = requests.post(
'https://payment-gateway.example/process',
json=payment.get_payload(),
timeout=15
)
if response.status_code >= 400:
# Пример специфичной обработки ошибок
error_data = response.json()
if error_data.get('error_code') == 'insufficient_funds':
# Не повторяем, если недостаточно средств
payment.status = 'failed'
payment.error_message = 'Недостаточно средств'
payment.save()
return False
elif error_data.get('error_code') == 'gateway_timeout':
# Специальная задержка для этого типа ошибок
raise self.retry(countdown=120, exc=Exception("Таймаут шлюза"))
else:
# Стандартная обработка других ошибок
raise self.retry(exc=Exception(f"Ошибка платежного шлюза: {error_data}"))
# Успешная обработка
payment.status = 'completed'
payment.save()
return True
except Payment.DoesNotExist:
# Логируем и не повторяем
logger.error(f"Payment {payment_id} not found")
return False |
|
В этом примере используются:
autoretry_for — автоматический повтор для указанных исключений.
retry_backoff — экспоненциальное увеличение задержки между попытками.
retry_backoff_max — максимальная задержка (10 минут).
retry_jitter — добавление случайного компонента к задержке.
Для более сложных сценариев можно использовать задачи-обработчики ошибок:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| @shared_task(bind=True)
def process_data(self, data_id):
try:
# Основная логика
return result
except Exception as exc:
# Запускаем задачу-обработчик ошибок
handle_task_error.delay(
task_id=self.request.id,
task_name=self.name,
args=self.request.args,
kwargs=self.request.kwargs,
exception=str(exc),
traceback=traceback.format_exc()
)
# Перебрасываем исключение дальше
raise |
|
Масштабирование воркеров
С ростом нагрузки на приложение необходимо эффективно масштабировать систему воркеров Celery. Существует несколько стратегий:
Вертикальное масштабирование — увеличение числа воркеров на одном сервере:
Bash
Скопировано | 1
2
| # Запуск 8 воркеров на одной машине
celery -A myproject worker --concurrency=8 --loglevel=INFO |
|
Горизонтальное масштабирование — добавление новых серверов с воркерами:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
| # Конфигурация для разных типов воркеров
CELERY_WORKER_POOL_RESTARTS = True # Автоматический перезапуск пула
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Контроль prefetch для баланса нагрузки
# Настройки для специализированных очередей
CELERY_TASK_ROUTES = {
'myapp.tasks.process_video': {'queue': 'media'},
'myapp.tasks.send_email': {'queue': 'emails'},
'myapp.tasks.generate_report': {'queue': 'reports'},
} |
|
Запуск специализированных воркеров:
Bash
Скопировано | 1
2
3
4
5
| # Воркер для обработки видео с повышенной конкурентностью
celery -A myproject worker -Q media --concurrency=4 --loglevel=INFO
# Воркер для отправки email с ограниченной конкурентностью
celery -A myproject worker -Q emails --concurrency=2 --loglevel=INFO |
|
Автоматическое масштабирование — с помощью флага --autoscale :
Bash
Скопировано | 1
2
| # Автомасштабирование от 5 до 20 воркеров в зависимости от нагрузки
celery -A myproject worker --autoscale=20,5 --loglevel=INFO |
|
В контейнерной среде (Docker, Kubernetes) можно настроить автоматическое масштабирование числа подов с воркерами на основе метрик очередей.
Профилирование и оптимизация производительности
Для высоконагруженных систем критично понимать, где скрываются узкие места в асинхронных задачах.
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| @shared_task
def optimized_task(large_dataset_id):
from django.db import connection
# Отключаем автокоммит для пакетных операций
old_autocommit = connection.autocommit
connection.autocommit = False
try:
dataset = LargeDataset.objects.get(id=large_dataset_id)
items = dataset.items.all()
# Обрабатываем данные пакетами для экономии памяти
batch_size = 1000
for i in range(0, items.count(), batch_size):
batch = items[i:i+batch_size]
# Производим необходимые операции с пакетом
process_batch(batch)
# Выполняем промежуточный коммит
connection.commit()
return "Обработка завершена успешно"
finally:
# Восстанавливаем исходное значение автокоммита
connection.autocommit = old_autocommit |
|
Для профилирования задач можно использовать декораторы:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| def profile_task(func):
@wraps(func)
def wrapper(*args, **kwargs):
import cProfile, pstats, io
from pstats import SortKey
profile = cProfile.Profile()
profile.enable()
try:
result = func(*args, **kwargs)
return result
finally:
profile.disable()
# Выводим статистику профилирования
s = io.StringIO()
ps = pstats.Stats(profile, stream=s).sort_stats(SortKey.CUMULATIVE)
ps.print_stats(20) # Топ-20 самых затратных операций
# Сохраняем результаты
TaskProfile.objects.create(
task_name=func.__name__,
args=str(args),
kwargs=str(kwargs),
profile_data=s.getvalue()
)
return wrapper |
|
Организация тестирования асинхронных операций
Тестирование асинхронных задач связано с рядом сложностей, ведь они выполняются отдельно от основного потока и часто зависят от внешних систем. В Django можно реализовать эффективное тестирование Celery-задач несколькими способами:
Использование CELERY_TASK_ALWAYS_EAGER
Самый простой подход — переключить Celery в синхронный режим в тестовом окружении:
Python
Скопировано | 1
2
3
| # settings/test.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True # Пробрасывать исключения |
|
Это позволяет тестировать задачи как обычные функции:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
| from django.test import TestCase
from myapp.tasks import process_upload
class TaskTestCase(TestCase):
def test_process_upload(self):
# Задача выполнится синхронно
result = process_upload.delay(file_id=1)
self.assertTrue(result.successful())
self.assertEqual(result.result, "ожидаемый_результат") |
|
Изоляция задач с моками
Для более точного тестирования можно использовать патчинг и моки:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| from unittest.mock import patch
from django.test import TestCase
from myapp.tasks import send_notification
from myapp.models import User
class NotificationTaskTest(TestCase):
def setUp(self):
self.user = User.objects.create(email="test@example.com")
@patch('myapp.tasks.send_mail')
def test_send_notification(self, mock_send_mail):
# Вызываем задачу напрямую
send_notification(self.user.id)
# Проверяем, что send_mail был вызван с правильными аргументами
mock_send_mail.assert_called_once()
args, kwargs = mock_send_mail.call_args
self.assertEqual(kwargs['recipient_list'], [self.user.email]) |
|
Интеграционное тестирование с реальным Celery
Для полноценного тестирования можно запустить реальные воркеры в тестовой среде:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import time
from django.test import TransactionTestCase
from celery.result import AsyncResult
from myapp.tasks import complex_task
class CeleryIntegrationTest(TransactionTestCase):
def test_complex_task_integration(self):
# Запускаем реальную асинхронную задачу
task = complex_task.delay(param1="test", param2=123)
# Дожидаемся результата с таймаутом
timeout = 10
start_time = time.time()
while time.time() - start_time < timeout:
if AsyncResult(task.id).ready():
break
time.sleep(0.1)
# Проверяем результат
result = AsyncResult(task.id)
self.assertTrue(result.ready())
self.assertTrue(result.successful())
self.assertEqual(result.result, "ожидаемый_результат") |
|
Работа с очередями приоритетов
В реальных системах не все задачи одинаково важны. Celery позволяет организовать очереди с приоритетами:
Python
Скопировано | 1
2
3
4
5
6
| # settings.py
CELERY_TASK_ROUTES = {
'myapp.tasks.critical_notification': {'queue': 'high_priority'},
'myapp.tasks.daily_report': {'queue': 'low_priority'},
'myapp.tasks.*': {'queue': 'default'},
} |
|
Запуск воркеров с учётом приоритета:
Bash
Скопировано | 1
2
3
4
5
| # Высокоприоритетный воркер с большим числом процессов
celery -A myproject worker -Q high_priority,default --prefetch-multiplier=1 --concurrency=10
# Низкоприоритетный воркер
celery -A myproject worker -Q low_priority --prefetch-multiplier=1 --concurrency=3 |
|
Для Redis можно настроить более тонкую систему приоритетов:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| class TaskPriority:
HIGH = 0
MEDIUM = 3
LOW = 6
BACKGROUND = 9
@shared_task(bind=True)
def prioritized_task(self, importance, data):
# Логика задачи
pass
# Вызов с указанием приоритета
prioritized_task.apply_async(
args=[TaskPriority.HIGH, data],
priority=TaskPriority.HIGH
) |
|
Хранение и анализ метрик производительности
Для профессионального мониторинга недостаточно базовых инструментов. Комплексное решение включает:
1. Сбор метрик производительности:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| @shared_task(bind=True)
def instrumented_task(self, *args, **kwargs):
import time
from prometheus_client import Histogram
TASK_DURATION = Histogram(
'task_duration_seconds',
'Task duration in seconds',
['task_name']
)
start_time = time.time()
try:
result = actual_task_logic(*args, **kwargs)
status = 'success'
except Exception as e:
status = 'error'
raise
finally:
duration = time.time() - start_time
TASK_DURATION.labels(self.name).observe(duration)
# Сохраняем детальную информацию в БД
TaskExecution.objects.create(
task_id=self.request.id,
task_name=self.name,
args=str(args),
kwargs=str(kwargs),
duration=duration,
status=status,
worker=self.request.hostname,
memory_usage=get_current_memory_usage()
)
return result |
|
2. Визуализация и анализ статистики:
Создайте маршрут в Django для просмотра статистики:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| def task_performance_dashboard(request):
# Агрегированная статистика по задачам
task_stats = TaskExecution.objects.values('task_name').annotate(
avg_duration=Avg('duration'),
max_duration=Max('duration'),
min_duration=Min('duration'),
success_rate=100 * Count('id', filter=Q(status='success')) / Count('id'),
count=Count('id')
).order_by('-avg_duration')
# График выполнения по времени
timeline_data = TaskExecution.objects.filter(
created_at__gte=timezone.now() - timezone.timedelta(days=7)
).values('created_at__date').annotate(
count=Count('id'),
success=Count('id', filter=Q(status='success')),
failed=Count('id', filter=Q(status='error'))
).order_by('created_at__date')
return render(request, 'tasks/dashboard.html', {
'task_stats': task_stats,
'timeline_data': timeline_data
}) |
|
Стратегии обновления Celery в работающих проектах
Обновление Celery в production может быть критичной операцией. Основные стратегии:
Поэтапное обновление:
1. Обновите только клиентскую часть Celery (в Django).
2. Постепенно обновляйте воркеры, начиная с одного.
3. Мониторьте поведение обновлённых воркеров.
4. Постепенно замените все воркеры.
Параллельное развёртывание:
1. Запустите новые воркеры с обновлённой версией на отдельной очереди.
2. Переключите часть задач на новую очередь.
3. Если всё работает стабильно, увеличивайте долю задач.
4. Полностью переключитесь на новую версию, когда убедитесь в её стабильности.
Использование Celery версии-кандидата:
Python
Скопировано | 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| # Тестирование новой версии параллельно с текущей
from celery import Celery
# Существующее приложение
current_app = Celery('current_app', broker='redis://localhost:6379/0')
# Тестовое приложение с новой версией
new_app = Celery('new_app', broker='redis://localhost:6379/1')
@current_app.task
def current_version_task():
pass
@new_app.task
def new_version_task():
pass
# Запуск задачи в обеих версиях для сравнения
def compare_versions():
current_result = current_version_task.delay()
new_result = new_version_task.delay()
# Сравнение результатов и поведения |
|
Независимо от выбранной стратегии, крайне важно тщательно тестировать обновления в среде, максимально приближенной к production, и иметь план отката в случае проблем.
Путь к эффективной асинхронной архитектуре
Один из ключевых принципов — правильное разделение ответственности. Не стоит помещать всю логику в воркеры Celery только потому, что это возможно. Определите, какие операции действительно выигрывают от асинхронного выполнения: тяжёлые вычисления, внешние API-вызовы, операции с файловой системой. Простые операции с базой данных обычно эффективнее выполнять синхронно.
Мониторинг — ещё один критический аспект. Система без мониторинга подобна самолёту без приборов. Минимальный набор метрик должен включать:- Количество успешных и неудачных задач.
- Время выполнения задач (p50, p95, p99 перцентили).
- Длина очередей.
- Загрузка воркеров.
- Потребление памяти.
Такой мониторинг позволит не только реагировать на проблемы, но и предвидеть их до того, как они повлияют на пользователей.
В производственной среде стоит тщательно настроить тайм-ауты для всех внешних операций. Без этого задача может "зависнуть" навсегда, блокируя воркер и постепенно снижая производительность всей системы. Добавляйте тайм-ауты для HTTP-запросов, операций с базами данных и другими внешними системами.
Грамотное управление ресурсами — ещё один секрет эффективной системы. Часто разработчики увлекаются созданием множества мелких задач, что приводит к перегрузке брокера сообщений. Объединяйте мелкие операции в пакеты, где это возможно, и используйте очереди с приоритетами для критически важных задач.
При проектировании задач помните о идемпотентности — возможности безопасного повторного выполнения задачи без побочных эффектов. Идемпотентные задачи упрощают обработку сбоев и развёртывание обновлений.
Наконец, инвестируйте время в автоматизацию развёртывания и управления инфраструктурой Celery. Используйте контейнеризацию и оркестрацию (Docker, Kubernetes) для гибкого масштабирования и управления воркерами.
Django и асинхронные данные (websockets?) Есть задача - веб приложение на django, которое отображает данные с датчиков. Данные пишутся в... Как сделать автозапуск кода каждые 24 часа на celery? Извините за столь дерзкую просьбу, но мне нужен авто запуск кода каждые 24 часа на celery. Я бы и... Объясните разницу между twisted и celery Добрый день!
Друзья, если возможно, объясните, пожалуйста, разницу между этими двумя фреймворками.... Подскажите литературу по Celery Добрый день, друзья!
Посоветуйте, пожалуйста, почитать хорошие материалы (книги, туториалы и т.п)... Djang + Celery + djcelery не показываются воркеры В админке джанги не показываются таски и воркеры. Хотя, таски обрабатываются и ставятся, как... Как узнать выполняет ли celery асинхронно функцию? Прилагаю настройки celery
# settings.py
# CELERY STUFF
BROKER_URL = 'redis://localhost:6379'... Celery error Добрый день!
Реализовал связку tornado + celery.
Все работает, запускается. Пока в worker у... Множественная обработка GET-запросов через celery Добрый день!
У меня есть задача - я обращаюсь к стороннему API, к которому делаю запрос и... Celery and RabbitMQ Всем привет. У меня есть задача, которая должна обойти весь индекс эластика и запустить над каждым... Как создать задачу по событию в Celery? Здравствуйте. Для проекта интернет-магазина возникла задача очистки товаров в корзине, если заказ... Разобраться с логикой приложения (Celery) Всем привет! Использую фреймворк Django + Celery. У меня есть список товаров, которые обновляются... Как запустить celery таску в pytest классе Есть условная celery таска
class CreateNotifications(Task):
name = 'create-notifications'
...
|