Форум программистов, компьютерный форум, киберфорум
py-thonny
Войти
Регистрация
Восстановить пароль
Блоги Сообщество Поиск Заказать работу  

Асинхронные операции в Django с Celery

Запись от py-thonny размещена 05.04.2025 в 17:07
Показов 1805 Комментарии 0

Нажмите на изображение для увеличения
Название: 1e9e7d70-34ab-4e15-ae94-14d73547829e.jpg
Просмотров: 35
Размер:	105.0 Кб
ID:	10536
Разработчики Django часто сталкиваются с проблемой, когда пользователь нажимает кнопку отправки формы и... ждёт. Секунды растягиваются в минуты, терпение иссякает, а интерфейс приложения замирает. Причина этой неприятной ситуации кроется в природе синхронных операций, которые блокируют основной поток исполнения, пока задача не будет полностью завершена. Обычный Django-проект выполняет все операции последовательно — это означает, что если, например, функция отправки электронной почты занимает 5 секунд, то пользователь вынужден ждать весь этот промежуток времени, прежде чем увидит результат своих действий. В небольших проектах с малым потоком пользователей эта задержка может быть незаметной, но когда приложение масштабируется, проблема становится критичной.

Здесь и может помочь Celery — распределённая очередь задач, которая позволяет выполнять операции асинхронно, вне основного потока Django. Это своего рода рабочий который берёт сложные и долгие задачи и выполняет их параллельно, освобождая основное приложение для обработки новых запросов.Celery решает целый спектр задач:
  • Отправка писем и сообщений.
  • Обработка и трансформация изображений.
  • Анализ текста и проверка данных.
  • Работа с внешними API и веб-сервисами.
  • Обработка и анализ больших массивов данных.
  • Генерация отчётов и документов.

Что особенно ценно, Celery позволяет не только выполнять операции в фоновом режиме, но и планировать их на определённое время, создавая периодические задачи. Конечно, Celery не является единственным решением для асинхронных операций в Django. Среди альтернатив можно выделить:

Huey — простая и лёгкая библиотека для организации очередей задач, которая отлично работает на Windows (в отличие от Celery, которая с версии 4 потеряла официальную поддержку Windows).
Django-RQ — очередь задач на основе Redis, предлагающая минималистичный интерфейс и простту настройки.
Dramatiq — относительно новое решение, ориентированное на надёжность и производительность.
Django Channels — хотя это решение создано в первую очередь для WebSockets, оно может использоваться и для асинхронной обработки задач.

Выбор конкретного инструмента зависит от требований проекта, но Celery зарекомендовал себя как зрелое и проверенное временем решение с богатым функционалом и хорошей интеграцией с Django. Особенно полезно это сочетание для проектов, где требуется обрабатывать ресурсоёмкие задачи без ущерба для пользовательского опыта.

Основы работы Celery



Celery представляет собой распределенную систему для обработки больших объемов сообщений, способную обрабатывать миллионы задач в минуту с минимальными задержками.

Архитектура Celery



Архитектура Celery состоит из 4 основных компонентов:

1. Производители задач (Producers) — это части кода, которые создают задачи и отправляют их в очередь. В контексте Django такими производителями обычно выступают представления (views) или другие компоненты приложения.
2. Брокер сообщений (Message Broker) — промежуточное хранилище, которое принимает задачи от производителей и передает их исполнителям. Брокер действует как посредник между вашим Django-приложением и процессами Celery.
3. Исполнители задач (Workers) — процессы, которые постоянно проверяют очереди задач и выполняют поступающие задания. Они работают независимо от основного приложения, могут быть запущены на том же сервере или распределены по нескольким машинам.
4. База результатов (Results Backend) — опциональный компонент, который хранит информацию о выполненных задачах и их результатах. Это особенно полезно, если ваше приложение должно отслеживать статус задач или использовать их результаты.

Такая архитектура обеспечивает высокую степень масштабируемости и отказоустойчивости. Вы можете добавлять новых исполнителей по мере роста нагрузки, а при выходе из строя одного из них задачи перераспределяются между остальными.

Брокеры сообщений и их выбор



Брокер сообщений играет ключевую роль в работе Celery, поэтому его выбор критически важен. Наиболее популярные варианты:

Redis — быстрый хранилище данных в памяти, который часто используется как брокер для Celery благодаря своей скорости и простоте настройки. Redis может выступать одновременно как брокер сообщений и как хранилище результатов, что упрощает инфраструктуру проекта.

Python Скопировано
1
2
3
# Пример настройки Redis как брокера и хранилища результатов
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
RabbitMQ — специализированный брокер сообщений, реализующий протокол AMQP. Считается наиболее надежным выбором для производственной среды, особенно при высоких нагрузках. RabbitMQ обеспечивает гарантированную доставку сообщений даже в случае сбоев.

Python Скопировано
1
2
# Пример настройки RabbitMQ
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
Amazon SQS, Azure Service Bus или Apache Kafka — облачные сервисы и решения корпоративного уровня, которые могут использоваться в качестве брокеров для Celery в крупных распределенных системах. При выборе брокера стоит учитывать:
  • Объем и характер задач.
  • Требования к надежности доставки сообщений.
  • Инфраструктурные ограничения.
  • Опыт команды с конкретными технологиями.

Для большинства проектов на Django начальный выбор часто падает на Redis из-за его простоты и универсальности.

Установка и базовая настройка



Установка Celery достаточно проста и выполняется через pip:

Bash Скопировано
1
pip install celery
Для работы с Redis дополнительно потребуется установить клиентскую библиотеку:

Bash Скопировано
1
pip install redis
После установки пакетов необходимо настроить экземпляр Celery в вашем Django-проекте. Обычно это делается путем создания файла celery.py в директории проекта, рядом с settings.py:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# myproject/celery.py
import os
from celery import Celery
 
# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
 
# Создаем экземпляр Celery
app = Celery('myproject')
 
# Загружаем настройки из settings.py (с префиксом CELERY_)
app.config_from_object('django.conf:settings', namespace='CELERY')
 
# Автоматически обнаруживаем задачи в приложениях Django
app.autodiscover_tasks()
Затем необходимо обновить файл __init__.py вашего проекта, чтобы гарантировать, что приложение Celery загружается при запуске Django:

Python Скопировано
1
2
3
4
# myproject/__init__.py
from .celery import app as celery_app
 
__all__ = ('celery_app',)

Сериализация данных в Celery: возможности и ограничения



Когда вы передаете задачу в Celery, данные должны быть сериализованы для передачи через брокер сообщений. По умолчанию Celery использует JSON для сериализации, но также поддерживает другие форматы:
JSON — наиболее универсальный формат, но имеет ограничения по типам данных,
Pickle — поддерживает большинство Python-объектов, но менее безопасен,
YAML — альтернатива JSON с более широкими возможностями,
MessagePack — бинарный формат, более компактный, чем JSON.

Выбор сериализатора настраивается в конфигурации Celery:

Python Скопировано
1
2
3
4
# Пример настройки сериализации в settings.py
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
При использовании Celery важно помнить, что не все типы данных могут быть корректно сериализованы. Особую осторожность следует проявлять с:
  • Объектами Django ORM (модели, QuerySets).
  • Сложными вложенными структурами.
  • Функциями и методами.
  • Файловыми объектами и дескрипторами.

Чтобы избежать проблем с сериализацией, рекомендуется передавать в задачи только простые типы данных: числа, строки, списки, словари и т.п. Если необходимо работать с моделями Django, лучше передавать их идентификаторы, а не сами объекты.

Python Скопировано
1
2
3
4
5
6
7
8
9
# Неправильно: передача объекта модели
def process_user_task(user):
    # ...
 
# Правильно: передача идентификатора
def process_user_task(user_id):
    from myapp.models import User
    user = User.objects.get(id=user_id)
    # ...

Режимы выполнения задач: eager и асинхронный



Celery предлагает два основных режима выполнения задач, которые определяют поведение системы при обработке заданий:

Асинхронный режим — стандартный способ работы Celery в производственной среде. Задачи отправляются в очередь через брокер сообщений и выполняются отдельными процессами-воркерами. Этот режим обеспечивает подлинную асинхронность и масштабируемость:

Python Скопировано
1
2
3
4
5
# Отправка задачи в асинхронном режиме
from myapp.tasks import process_data
result = process_data.delay(arg1, arg2)
# или с дополнительными параметрами
result = process_data.apply_async(args=[arg1, arg2], countdown=10)
Eager режим (нетерпеливый) — в этом режиме задачи выполняются синхронно в текущем процессе, без использования брокера и воркеров. Особенно полезен для тестирования и отладки:

Python Скопировано
1
2
3
# Настройка eager режима в settings.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True  # Распространение исключений
Переключение между режимами может осуществляться глобально через настройки или локально для отдельных задач:

Python Скопировано
1
2
3
4
# Временное включение eager режима для конкретного вызова
with app.conf.eager_propagates_exceptions.override(True):
    with app.conf.task_always_eager.override(True):
        task.delay()
При разработке приложения типичной практикой является использование eager режима в среде разработки и тестирования, переключаясь на асинхронный режим в продакшн-окружении.

Стратегии ретрая и обработки ошибок в брокерах сообщений



Одно из ключевых преимуществ Celery — возможность настраивать механизмы повторных попыток (retry) при возникновении ошибок. В распределенных системах отказы неизбежны, и грамотная стратегия их обработки критически важна.
Базовая стратегия повторных попыток настраивается при определении задачи:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
import logging
 
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, payment_id):
    try:
        # Код обработки платежа
        payment_gateway.process(payment_id)
    except PaymentGatewayError as exc:
        try:
            # Логирование ошибки
            logging.error(f"Payment error: {exc}")
            # Повторная попытка через 60 секунд
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # Действия после исчерпания попыток
            notify_admin.delay(f"Payment {payment_id} failed after retries")
            raise
В этом примере:
bind=True — делает доступным объект задачи через self,
max_retries=3 — ограничивает количество повторных попыток,
default_retry_delay=60 — устанавливает задержку между попытками в секундах.

Для более сложных сценариев Celery поддерживает экспоненциальную задержку между попытками:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
@shared_task(bind=True, max_retries=5)
def fetch_api_data(self, endpoint):
    try:
        response = requests.get(endpoint, timeout=5)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # Экспоненциальная задержка: 2^retry * базовая_задержка
        retry_backoff = 2 ** self.request.retries * 30
        # Случайный "джиттер" для предотвращения "эффекта громады"
        retry_jitter = random.uniform(0, 10)
        retry_delay = retry_backoff + retry_jitter
        raise self.retry(exc=exc, countdown=int(retry_delay))
Такой подход позволяет избежать ситуации, когда после сбоя все воркеры одновременно пытаются выполнить повторы, создавая новую волну нагрузки на систему.

Настройка запуска Celery в production-окружениях



В производственной среде правильная настройка Celery критически важна для обеспечения стабильности и эффективности работы.

Запуск воркеров с Supervisor или Systemd

Для надежного управления процессами Celery рекомендуется использовать менеджеры процессов, такие как Supervisor или Systemd:

Пример конфигурации Supervisor (/etc/supervisor/conf.d/celery.conf):

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
[program:celery]
command=/path/to/venv/bin/celery -A myproject worker --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
Настройка параллелизма и конкурентности

Celery предлагает различные модели конкурентности:
  1. prefork (процессы, по умолчанию).
  2. eventlet (сопрограммы).
  3. gevent (сопрограммы).
  4. solo (один процесс).

Выбор зависит от характера ваших задач:

Bash Скопировано
1
2
3
4
5
# Запуск с 8 процессами (prefork)
celery -A myproject worker --concurrency=8 --loglevel=INFO
 
# Запуск с eventlet (для задач с I/O-bound операциями)
celery -A myproject worker --concurrency=100 --pool=eventlet --loglevel=INFO
Управление ресурсами

Для предотвращения перегрузки системы можно настроить ограничения по памяти и времени выполнения:

Python Скопировано
1
2
3
# Ограничения в настройках Celery
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100  # Рестарт воркера после 100 задач
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000  # 200MB
Безопасность

В производственной среде важно обеспечить безопасность:

Python Скопировано
1
2
3
4
5
# Защита от подделки сообщений
CELERY_BROKER_URL = 'redis://:strong_password@redis.example.com:6379/0'
 
# Ограничение типов сериализации
CELERY_ACCEPT_CONTENT = ['json']  # Отключаем потенциально опасный pickle
Мониторинг

Для отслеживания состояния системы в production настройте интеграцию с системами мониторинга:

Python Скопировано
1
2
3
# Настройка интеграции с Prometheus
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
Также рекомендуется использовать инструменты визуализации, такие как Flower или Celery Exporter, для наблюдения за работой очередей и исполнителей в реальном времени.

От Celery к RabbitMQ или от RabbitMQ к Celery ?
Всем день добрый. Отработать данные чисто в Rabbit получилось без труда. Но, разбирая работу...

Django и celery shared_task
Как запустить shared_task в асинхронном режиме? Есть у меня celery таска from celery import...

Celery + Django запуск задач по расписанию
Юзаю чистый celery (без django-celery-beat). Конфиг: app.conf.beat_schedule = { ...

Django+celery получение результатов фоновых задач
Добрый день! Из views.py запускаю ресурсозатратные отчеты, результаты отображаю в html или...


Интеграция с Django



После знакомства с основами Celery пришло время интегрировать его в Django-проект. Интеграция Celery с Django не представляет особой сложности, но требует внимания к деталям и понимания принципов взаимодействия этих технологий.

Настройка Django-проекта для работы с Celery



Первый шаг — добавление необходимых настроек в файл settings.py вашего проекта. После установки Celery и выбранного брокера сообщений (в нашем примере будет использоваться Redis) нужно задать ключевые параметры:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
[H2]settings.py[/H2]
 
# Настройки Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # URL брокера сообщений
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Хранилище результатов
 
# Дополнительные настройки (опционально)
CELERY_ACCEPT_CONTENT = ['json']  # Разрешенные форматы сериализации
CELERY_TASK_SERIALIZER = 'json'  # Формат сериализации задач
CELERY_RESULT_SERIALIZER = 'json'  # Формат сериализации результатов
CELERY_TIMEZONE = 'Europe/Moscow'  # Временная зона для планировщика задач
Следующий этап — создание файла celery.py в корневой директории проекта (рядом с файлом settings.py). Этот файл содержит конфигурацию Celery-приложения:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# myproject/celery.py
import os
from celery import Celery
 
# Устанавливаем переменную окружения с настройками Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
 
# Создаем экземпляр приложения Celery
app = Celery('myproject')
 
# Загружаем настройки из Django settings (с префиксом CELERY_)
app.config_from_object('django.conf:settings', namespace='CELERY')
 
# Автоматически находим задачи в приложениях Django
app.autodiscover_tasks()
 
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
Чтобы убедиться, что Celery загружается при запуске Django, нужно модифицировать файл __init__.py проекта:

Python Скопировано
1
2
3
4
5
6
7
# myproject/__init__.py
from __future__ import absolute_import, unicode_literals
 
# Импортируем Celery-приложение
from .celery import app as celery_app
 
__all__ = ('celery_app',)
Теперь Django будет автоматически искать файлы tasks.py в каждом зарегистрированном приложении. Именно в этих файлах будут определяться асинхронные задачи.

Создание первой асинхронной задачи



Допустим, у нас есть приложение users внутри проекта Django, и мы хотим добавить асинхронную отправку email при регистрации пользователя. Для этого создадим файл tasks.py в директории приложения:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# users/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from time import sleep
 
@shared_task
def send_welcome_email(user_id):
    from users.models import User  # Импортируем здесь для избежания циклических импортов
    
    try:
        user = User.objects.get(id=user_id)
        # Имитация длительной операции
        sleep(5)
        
        send_mail(
            'Добро пожаловать на наш сайт!',
            f'Здравствуйте, {user.username}! Спасибо за регистрацию.',
            'noreply@example.com',
            [user.email],
            fail_silently=False,
        )
        return f"Email успешно отправлен пользователю {user.email}"
    except User.DoesNotExist:
        return f"Пользователь с ID {user_id} не найден"
Обратите внимание на декоратор @shared_task. Он используется вместо @app.task, когда задача определяется вне файла, где создан экземпляр Celery. Это позволяет приложениям Django быть переиспользуемыми, так как они не привязаны к конкретному экземпляру Celery. Теперь чтобы вызвать эту задачу в представлении Django, используем метод .delay():

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# users/views.py
from django.shortcuts import render, redirect
from django.contrib import messages
from .forms import RegistrationForm
from .tasks import send_welcome_email
 
def register(request):
    if request.method == 'POST':
        form = RegistrationForm(request.POST)
        if form.is_valid():
            user = form.save()
            # Запускаем асинхронную задачу
            send_welcome_email.delay(user.id)
            messages.success(request, 'Регистрация успешна! Проверьте вашу почту.')
            return redirect('home')
    else:
        form = RegistrationForm()
    return render(request, 'users/register.html', {'form': form})
Метод .delay() — это сокращение для .apply_async(), который позволяет указать дополнительные параметры вызова:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
# Вариант с apply_async для более гибкой настройки
send_welcome_email.apply_async(
    args=[user.id],
    countdown=10,  # Задержка перед выполнением (сек)
    expires=300,   # Время, после которого задача аннулируется
    retry=True,    # Автоматический повтор при ошибке
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.5,
    }
)

Паттерны организации кода для асинхронных задач



При работе с Celery в Django проектах сформировалось несколько подходов к организации кода:

1. Модули задач в каждом приложении:
Самый распространённый подход — создание файла tasks.py в каждом приложении Django, где определяются задачи, связанные с функциональностью этого приложения. Это хорошо согласуется с принципом модульности Django.

2. Отдельное приложение для задач:
В крупных проектах со множеством задач иногда создают специальное приложение (например, tasks или async_jobs), которое содержит все асинхронные задачи проекта, сгруппированные по категориям:

Python Скопировано
1
2
3
4
5
6
   tasks/
     __init__.py
     emails.py
     reports.py
     cleanup.py
     ...
3. Декомпозиция по доменам:
В сложных проектах с сервис-ориентированной архитектурой задачи часто группируются по доменам бизнес-логики, независимо от структуры Django-приложений:

Python Скопировано
1
2
3
4
5
6
   # payments/services.py
   from myproject.tasks.registry import register_task
   
   @register_task
   def process_payment(payment_id):
       # Логика обработки платежа
4. Композиция задач:
Для сложных многошаговых процессов эффективно использовать композицию задач через цепочки:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
   from celery import chain
   
   # Создание последовательной цепочки задач
   workflow = chain(
       validate_order.s(order_id),
       process_payment.s(),
       send_confirmation.s()
   )
   
   # Запуск цепочки
   result = workflow.delay()
Независимо от выбранного подхода, рекомендуется следовать этим правилам:
  • Делайте задачи идемпотентными (повторный запуск не должен вызывать проблем).
  • Избегайте передачи в задачи объектов моделей Django напрямую.
  • Документируйте входные параметры и возвращаемые значения задач.
  • Структурируйте логику так, чтобы она была тестируемой отдельно от Celery.

Интеграция с Django ORM: особенности и подводные камни



Работа с моделями Django в задачах Celery требует особого внимания. Хотя это выглядит простым, здесь скрывается ряд неочевидных проблем, которые могут привести к непредсказуемому поведению приложения.
Первая и самая распространённая ошибка — передача объектов моделей напрямую в задачи:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
# НЕПРАВИЛЬНО:
@shared_task
def process_article(article):
    article.publish()  # Вызовет ошибку сериализации
 
# ПРАВИЛЬНО:
@shared_task
def process_article(article_id):
    from blog.models import Article
    article = Article.objects.get(id=article_id)
    article.publish()
Причина этой проблемы — в механизме сериализации. Celery сериализует аргументы задачи перед отправкой в очередь, но объекты Django не могут быть корректно сериализованы стандартными методами.
Второй важный момент — состояние базы данных может измениться между моментом постановки задачи в очередь и её выполнением:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@shared_task
def notify_subscribers(article_id):
    from blog.models import Article, Subscription
    
    try:
        article = Article.objects.get(id=article_id)
    except Article.DoesNotExist:
        return "Статья больше не существует"
    
    # Используем актуальные данные из БД, а не устаревшие
    subscriptions = Subscription.objects.filter(
        category=article.category,
        is_active=True  # Проверяем статус на момент выполнения
    )
    
    # Остальной код...
Третья особенность связана с транзакциями. Если вы создаёте задачу внутри транзакции Django, и задача должна работать с объектами, созданными в этой же транзакции, может возникнуть ситуация гонки — задача начнёт выполняться до завершения транзакции:

Python Скопировано
1
2
3
4
5
6
# В представлении или сервисе
@transaction.atomic
def create_order(request):
    order = Order.objects.create(user=request.user)
    # Задача может запуститься ДО фиксации транзакции
    process_order.delay(order.id)  # Риск гонки
Решение — использовать on_commit для отложенного вызова задач:

Python Скопировано
1
2
3
4
5
6
7
from django.db import transaction
 
@transaction.atomic
def create_order(request):
    order = Order.objects.create(user=request.user)
    # Задача будет поставлена в очередь ТОЛЬКО после успешной фиксации
    transaction.on_commit(lambda: process_order.delay(order.id))

Мониторинг выполнения задач



Отслеживание статуса и производительности асинхронных задач критически важно для выявления проблем и узких мест в работе приложения.
Базовый мониторинг статуса задач можно реализовать через API результатов Celery:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
from celery.result import AsyncResult
 
def check_task_status(request, task_id):
    task = AsyncResult(task_id)
    
    context = {
        'task_id': task_id,
        'status': task.status,
        'result': task.result if task.successful() else None,
    }
    
    return render(request, 'task_status.html', context)
Для более продвинутого мониторинга в реальном времени можно использовать Flower — веб-инструмент для мониторинга и администрирования Celery. Устанавливается просто:

Bash Скопировано
1
pip install flower
И запускается командой:

Bash Скопировано
1
celery -A myproject flower --port=5555
Flower предоставляет веб-интерфейс с богатой информацией о воркерах, очередях, выполняющихся и завершённых задачах, с графиками и статистикой.

Конфигурация безопасного доступа к Celery в Django



Безопасность системы асинхронных задач требует внимания к нескольким аспектам:

1. Защита брокера сообщений — использование аутентификации и авторизации:

Python Скопировано
1
2
3
4
5
# Для Redis
CELERY_BROKER_URL = 'redis://:сложный_пароль@redis-сервер:6379/0'
 
# Для RabbitMQ
CELERY_BROKER_URL = 'amqp://пользователь:пароль@rabbitmq-сервер:5672/vhost'
2. Ограничение доступа к веб-интерфейсу Flower через аутентификацию:

Bash Скопировано
1
celery -A myproject flower --port=5555 --basic-auth=user:pass
3. Защита от инъекций кода через ограничение форматов сериализации:

Python Скопировано
1
CELERY_ACCEPT_CONTENT = ['json']  # Исключаем pickle из соображений безопасности

Управление зависимостями между асинхронными задачами



В сложных системах задачи редко существуют изолированно — часто одна задача зависит от результатов другой или должна выполняться после неё.

Celery предоставляет несколько паттернов для организации взаимодействия задач:

1. Цепочки (Chains) — последовательное выполнение задач:

Python Скопировано
1
2
3
4
5
6
7
from celery import chain
 
result = chain(
    extract_data.s(url),  # Первая задача
    transform_data.s(),   # Вторая получит результат первой
    load_data.s(target)   # Третья получит результат второй
)()
2. Группы (Groups) — параллельное выполнение набора задач:

Python Скопировано
1
2
3
4
from celery import group
 
urls = ['http://example1.com', 'http://example2.com', ...]
result = group(fetch_url.s(url) for url in urls)()
3. Хорды (Chords) — выполнение задачи после завершения группы задач:

Python Скопировано
1
2
3
4
5
6
from celery import chord
 
result = chord(
    (process_part.s(data_part) for data_part in partitioned_data),
    aggregate_results.s()  # Эта задача запустится после выполнения всех частей
)()
4. Сигнатуры с обратными вызовами для более гибкого управления:

Python Скопировано
1
2
3
4
5
task.apply_async(
    args=[arg1, arg2],
    link=success_callback.s(),
    link_error=error_callback.s()
)
Такие инструменты позволяют создавать сложные рабочие процессы с параллельными и последовательными шагами, условной логикой и обработкой ошибок.

Практические примеры применения



Теория без практики мертва, поэтому давайте рассмотрим конкретные сценарии, где Celery действительно спасает положение в Django-приложениях. Каждый из этих примеров — не просто надуманная ситуация, а реальная проблема, с которой сталкиваются разработчики.

Обработка тяжелых вычислений



Представьте, что ваше приложение выполняет сложные математические вычисления, например, анализ статистических данных или обработку финансовых моделей. Выполнение таких операций напрямую в представлении привело бы к "зависанию" запроса.

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# tasks.py
@shared_task(bind=True, max_retries=3)
def analyze_dataset(self, dataset_id):
    from analytics.models import Dataset, AnalysisResult
    
    try:
        dataset = Dataset.objects.get(id=dataset_id)
        
        # Имитация ресурсоемких вычислений
        results = {}
        start_time = time.time()
        
        # Допустим, здесь используется pandas, numpy или другая
        # библиотека для анализа данных
        import pandas as pd
        df = pd.DataFrame(dataset.get_data())
        
        # Несколько тяжелых операций
        results['mean'] = df.mean().to_dict()
        results['correlation'] = df.corr().to_dict()
        results['regression'] = perform_regression_analysis(df)
        
        # Сохраняем результаты
        AnalysisResult.objects.create(
            dataset=dataset,
            results=results,
            processing_time=time.time() - start_time
        )
        
        return f"Анализ датасета {dataset_id} завершен успешно"
    except Dataset.DoesNotExist:
        return f"Датасет {dataset_id} не найден"
    except Exception as exc:
        self.retry(exc=exc, countdown=60*5)  # Повторная попытка через 5 минут
В представлении вызов этой задачи будет мгновенным, позволяя пользователю продолжить работу:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@login_required
def analyze_view(request, dataset_id):
    if request.method == 'POST':
        task = analyze_dataset.delay(dataset_id)
        request.session['analysis_task_id'] = task.id
        messages.success(
            request, 
            "Анализ запущен! Вы получите уведомление, когда он завершится."
        )
        return redirect('dataset_detail', dataset_id=dataset_id)
    
    return render(request, 'analytics/analyze_form.html', {
        'dataset_id': dataset_id
    })

Отправка электронных писем



Мы уже видели пример с отправкой приветственного email, но давайте рассмотрим более сложный случай — массовую рассылку с персонализацией:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@shared_task(rate_limit='100/m')  # Ограничение: 100 писем в минуту
def send_newsletter(campaign_id, recipient_ids=None):
    from marketing.models import Campaign, Subscriber
    
    campaign = Campaign.objects.get(id=campaign_id)
    
    # Если список получателей не указан, берем всех активных подписчиков
    if not recipient_ids:
        recipients = Subscriber.objects.filter(
            is_active=True, 
            subscribed_to_campaigns=True
        )
    else:
        recipients = Subscriber.objects.filter(id__in=recipient_ids)
    
    # Запускаем индивидуальные задачи для каждого получателя
    sent_count = 0
    for recipient in recipients:
        # Используем транзакционную задачу для каждого получателя
        send_personalized_email.delay(
            campaign_id, 
            recipient.id
        )
        sent_count += 1
    
    return f"Запланировано отправок: {sent_count}"
 
@shared_task(bind=True, max_retries=3)
def send_personalized_email(self, campaign_id, subscriber_id):
    from marketing.models import Campaign, Subscriber, EmailLog
    
    try:
        campaign = Campaign.objects.get(id=campaign_id)
        subscriber = Subscriber.objects.get(id=subscriber_id)
        
        # Персонализация шаблона
        context = {
            'first_name': subscriber.first_name,
            'last_name': subscriber.last_name,
            'unsubscribe_url': f"https://example.com/unsubscribe/{subscriber.unsubscribe_token}/"
        }
        
        # Рендеринг HTML и текстовой версии
        html_content = render_to_string(
            'emails/newsletter.html', 
            {**campaign.template_context, **context}
        )
        text_content = strip_tags(html_content)
        
        # Отправка сообщения
        msg = EmailMultiAlternatives(
            campaign.subject,
            text_content,
            settings.DEFAULT_FROM_EMAIL,
            [subscriber.email]
        )
        msg.attach_alternative(html_content, "text/html")
        msg.send()
        
        # Логируем успешную отправку
        EmailLog.objects.create(
            campaign=campaign,
            subscriber=subscriber,
            status='sent'
        )
        
        return f"Email для {subscriber.email} отправлен"
    
    except (Campaign.DoesNotExist, Subscriber.DoesNotExist) as e:
        # Логируем ошибку
        return f"Ошибка: {str(e)}"
    
    except Exception as exc:
        # Если проблема с отправкой, попробуем повторить
        try:
            self.retry(exc=exc, countdown=60*30)  # Повтор через 30 минут
        except self.MaxRetriesExceededError:
            # Логируем окончательную неудачу
            EmailLog.objects.create(
                campaign_id=campaign_id,
                subscriber_id=subscriber_id,
                status='failed',
                error_message=str(exc)
            )
            return f"Не удалось отправить email после нескольких попыток: {str(exc)}"
Такая структура позволяет:
1. Контролировать скорость отправки, чтобы не перегрузить почтовый сервер.
2. Персонализировать каждое письмо.
3. Повторять отправку при временных сбоях.
4. Вести подробную статистику доставки.

Работа с внешними API



Обращения к внешним API — ещё один отличный кандидат для Celery-задач, особенно если API имеет ограничения по частоте запросов:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@shared_task(bind=True, max_retries=5, rate_limit='5/m')
def fetch_currency_rates(self):
    """Получает текущие курсы валют из внешнего API и сохраняет в базу"""
    import requests
    from finance.models import CurrencyRate
    
    API_URL = "https://api.exchangerate-api.com/v4/latest/USD"
    
    try:
        response = requests.get(API_URL, timeout=10)
        response.raise_for_status()  # Вызовет исключение при HTTP-ошибке
        
        data = response.json()
        rates = data.get('rates', {})
        
        # Сохраняем курсы в базу
        batch = []
        timestamp = timezone.now()
        
        for currency, rate in rates.items():
            batch.append(CurrencyRate(
                currency_code=currency,
                rate=Decimal(str(rate)),  # Безопасное преобразование float в Decimal
                source='exchangerate-api',
                timestamp=timestamp
            ))
        
        # Массовое создание записей
        CurrencyRate.objects.bulk_create(batch)
        
        return f"Обновлено {len(batch)} валютных курсов"
    
    except requests.RequestException as exc:
        # Экспоненциальная задержка между повторами
        retry_backoff = 2 ** self.request.retries
        self.retry(exc=exc, countdown=retry_backoff * 60)
В этом примере мы реализовали:
  • Ограничение частоты запросов (rate_limit='5/m').
  • Экспоненциальную задержку между повторами.
  • Обработку ошибок сети.
  • Пакетное сохранение результатов.

Асинхронная обработка загружаемых пользователем файлов



Обработка загружаемых файлов часто требует значительных ресурсов, особенно если речь идёт о работе с изображениями или большими документами:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@shared_task
def process_uploaded_image(image_id):
    from gallery.models import Image
    
    try:
        img = Image.objects.get(id=image_id)
        original_path = img.file.path
        
        # Создаем миниатюры разных размеров
        from PIL import Image as PILImage
        
        image = PILImage.open(original_path)
        
        # Создаем миниатюру
        thumb_path = original_path.replace('.', '_thumb.')
        image.copy()
        image.thumbnail((200, 200))
        image.save(thumb_path, quality=90)
        
        # Создаем версию среднего размера
        medium_path = original_path.replace('.', '_medium.')
        image.copy()
        image.thumbnail((800, 800))
        image.save(medium_path, quality=90)
        
        # Оптимизируем оригинал (например, сжимаем JPEG)
        if original_path.lower().endswith('.jpg') or original_path.lower().endswith('.jpeg'):
            optimized = PILImage.open(original_path)
            optimized.save(original_path, quality=85, optimize=True)
        
        # Обновляем метаданные в базе
        img.width, img.height = image.size
        img.processed = True
        img.thumbnail_url = thumb_path.replace(settings.MEDIA_ROOT, settings.MEDIA_URL)
        img.medium_url = medium_path.replace(settings.MEDIA_ROOT, settings.MEDIA_URL)
        img.save()
        
        return f"Изображение {image_id} обработано успешно"
    
    except Image.DoesNotExist:
        return f"Изображение {image_id} не найдено"
    except Exception as e:
        # Логируем ошибку и обновляем статус
        img.processed = False
        img.processing_error = str(e)
        img.save()
        raise
Таким образом, пользователь может загрузить файл и увидеть мгновенный отклик системы, пока обработка происходит в фоне.

Продвинутые техники



После освоения базовых возможностей Celery пора погрузиться в более сложные и мощные техники, которые помогут вывести работу с асинхронными задачами на новый уровень. Эти продвинутые возможности позволяют создавать масштабируемые, надёжные и легко поддерживаемые системы.

Периодические задачи и Celery Beat



Для многих приложений критически важна возможность выполнять задачи по расписанию – будь то ночное резервное копирование, еженедельные отчёты или ежеминутное обновление кэша. Celery Beat – встроенный планировщик, который решает эту задачу.
Настройка периодических задач выполняется в файле settings.py:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# settings.py
from celery.schedules import crontab
 
CELERY_BEAT_SCHEDULE = {
    'ежедневный-отчет': {
        'task': 'reports.tasks.generate_daily_report',
        'schedule': crontab(hour=7, minute=30),  # Каждый день в 7:30
    },
    'обновление-курсов-валют': {
        'task': 'finance.tasks.update_exchange_rates',
        'schedule': 3600.0,  # Каждый час (в секундах)
        'args': (["USD", "EUR", "GBP"],)
    },
    'очистка-сессий': {
        'task': 'core.tasks.clean_expired_sessions',
        'schedule': crontab(hour=3, minute=0, day_of_week='mon'),  # По понедельникам в 3:00
    }
}
Для запуска планировщика нужно выполнить команду:

Bash Скопировано
1
celery -A myproject beat
В производственной среде Beat рекомендуется запускать вместе с воркерами или как отдельный сервис под управлением Supervisor:

Python Скопировано
1
2
3
4
5
6
7
8
9
[program:celerybeat]
command=/path/to/venv/bin/celery -A myproject beat --loglevel=INFO
directory=/path/to/project
user=celery
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true
Если вам нужна более гибкая настройка расписания, можно хранить его в базе данных с помощью django-celery-beat:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from django_celery_beat.models import PeriodicTask, IntervalSchedule
 
# Создаем интервал - каждые 10 минут
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.MINUTES,
)
 
# Создаем периодическую задачу
PeriodicTask.objects.create(
    interval=schedule,
    name='Синхронизация данных',
    task='data.tasks.sync_external_data',
    args=json.dumps(['arg1', 'arg2']),
    kwargs=json.dumps({
        'param1': 'value1',
    }),
    expires=datetime.utcnow() + timedelta(days=30),
)
Это позволяет динамически менять расписание через API или административную панель Django без перезапуска сервера.

Обработка ошибок и ретраи



Надёжная система асинхронных задач требует хорошо продуманной стратегии обработки ошибок. Celery предлагает несколько механизмов:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,
    autoretry_for=(RequestException, ConnectionError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True
)
def process_payment(self, payment_id):
    try:
        # Основная логика обработки платежа
        payment = Payment.objects.get(id=payment_id)
        
        response = requests.post(
            'https://payment-gateway.example/process',
            json=payment.get_payload(),
            timeout=15
        )
        
        if response.status_code >= 400:
            # Пример специфичной обработки ошибок
            error_data = response.json()
            if error_data.get('error_code') == 'insufficient_funds':
                # Не повторяем, если недостаточно средств
                payment.status = 'failed'
                payment.error_message = 'Недостаточно средств'
                payment.save()
                return False
            elif error_data.get('error_code') == 'gateway_timeout':
                # Специальная задержка для этого типа ошибок
                raise self.retry(countdown=120, exc=Exception("Таймаут шлюза"))
            else:
                # Стандартная обработка других ошибок
                raise self.retry(exc=Exception(f"Ошибка платежного шлюза: {error_data}"))
        
        # Успешная обработка
        payment.status = 'completed'
        payment.save()
        return True
    
    except Payment.DoesNotExist:
        # Логируем и не повторяем
        logger.error(f"Payment {payment_id} not found")
        return False
В этом примере используются:
autoretry_for — автоматический повтор для указанных исключений.
retry_backoff — экспоненциальное увеличение задержки между попытками.
retry_backoff_max — максимальная задержка (10 минут).
retry_jitter — добавление случайного компонента к задержке.

Для более сложных сценариев можно использовать задачи-обработчики ошибок:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@shared_task(bind=True)
def process_data(self, data_id):
    try:
        # Основная логика
        return result
    except Exception as exc:
        # Запускаем задачу-обработчик ошибок
        handle_task_error.delay(
            task_id=self.request.id,
            task_name=self.name,
            args=self.request.args,
            kwargs=self.request.kwargs,
            exception=str(exc),
            traceback=traceback.format_exc()
        )
        # Перебрасываем исключение дальше
        raise

Масштабирование воркеров



С ростом нагрузки на приложение необходимо эффективно масштабировать систему воркеров Celery. Существует несколько стратегий:

Вертикальное масштабирование — увеличение числа воркеров на одном сервере:

Bash Скопировано
1
2
# Запуск 8 воркеров на одной машине
celery -A myproject worker --concurrency=8 --loglevel=INFO
Горизонтальное масштабирование — добавление новых серверов с воркерами:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
# Конфигурация для разных типов воркеров
CELERY_WORKER_POOL_RESTARTS = True  # Автоматический перезапуск пула
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # Контроль prefetch для баланса нагрузки
 
# Настройки для специализированных очередей
CELERY_TASK_ROUTES = {
    'myapp.tasks.process_video': {'queue': 'media'},
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.generate_report': {'queue': 'reports'},
}
Запуск специализированных воркеров:

Bash Скопировано
1
2
3
4
5
# Воркер для обработки видео с повышенной конкурентностью
celery -A myproject worker -Q media --concurrency=4 --loglevel=INFO
 
# Воркер для отправки email с ограниченной конкурентностью
celery -A myproject worker -Q emails --concurrency=2 --loglevel=INFO
Автоматическое масштабирование — с помощью флага --autoscale:

Bash Скопировано
1
2
# Автомасштабирование от 5 до 20 воркеров в зависимости от нагрузки
celery -A myproject worker --autoscale=20,5 --loglevel=INFO
В контейнерной среде (Docker, Kubernetes) можно настроить автоматическое масштабирование числа подов с воркерами на основе метрик очередей.

Профилирование и оптимизация производительности



Для высоконагруженных систем критично понимать, где скрываются узкие места в асинхронных задачах.

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@shared_task
def optimized_task(large_dataset_id):
    from django.db import connection
    
    # Отключаем автокоммит для пакетных операций
    old_autocommit = connection.autocommit
    connection.autocommit = False
    
    try:
        dataset = LargeDataset.objects.get(id=large_dataset_id)
        items = dataset.items.all()
        
        # Обрабатываем данные пакетами для экономии памяти
        batch_size = 1000
        for i in range(0, items.count(), batch_size):
            batch = items[i:i+batch_size]
            
            # Производим необходимые операции с пакетом
            process_batch(batch)
            
            # Выполняем промежуточный коммит
            connection.commit()
        
        return "Обработка завершена успешно"
    finally:
        # Восстанавливаем исходное значение автокоммита
        connection.autocommit = old_autocommit
Для профилирования задач можно использовать декораторы:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def profile_task(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        import cProfile, pstats, io
        from pstats import SortKey
        
        profile = cProfile.Profile()
        profile.enable()
        
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            profile.disable()
            
            # Выводим статистику профилирования
            s = io.StringIO()
            ps = pstats.Stats(profile, stream=s).sort_stats(SortKey.CUMULATIVE)
            ps.print_stats(20)  # Топ-20 самых затратных операций
            
            # Сохраняем результаты
            TaskProfile.objects.create(
                task_name=func.__name__,
                args=str(args),
                kwargs=str(kwargs),
                profile_data=s.getvalue()
            )
    
    return wrapper

Организация тестирования асинхронных операций



Тестирование асинхронных задач связано с рядом сложностей, ведь они выполняются отдельно от основного потока и часто зависят от внешних систем. В Django можно реализовать эффективное тестирование Celery-задач несколькими способами:

Использование CELERY_TASK_ALWAYS_EAGER

Самый простой подход — переключить Celery в синхронный режим в тестовом окружении:

Python Скопировано
1
2
3
# settings/test.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True  # Пробрасывать исключения
Это позволяет тестировать задачи как обычные функции:

Python Скопировано
1
2
3
4
5
6
7
8
9
from django.test import TestCase
from myapp.tasks import process_upload
 
class TaskTestCase(TestCase):
    def test_process_upload(self):
        # Задача выполнится синхронно
        result = process_upload.delay(file_id=1)
        self.assertTrue(result.successful())
        self.assertEqual(result.result, "ожидаемый_результат")
Изоляция задач с моками

Для более точного тестирования можно использовать патчинг и моки:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from unittest.mock import patch
from django.test import TestCase
from myapp.tasks import send_notification
from myapp.models import User
 
class NotificationTaskTest(TestCase):
    def setUp(self):
        self.user = User.objects.create(email="test@example.com")
    
    @patch('myapp.tasks.send_mail')
    def test_send_notification(self, mock_send_mail):
        # Вызываем задачу напрямую
        send_notification(self.user.id)
        
        # Проверяем, что send_mail был вызван с правильными аргументами
        mock_send_mail.assert_called_once()
        args, kwargs = mock_send_mail.call_args
        self.assertEqual(kwargs['recipient_list'], [self.user.email])
Интеграционное тестирование с реальным Celery

Для полноценного тестирования можно запустить реальные воркеры в тестовой среде:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
from django.test import TransactionTestCase
from celery.result import AsyncResult
from myapp.tasks import complex_task
 
class CeleryIntegrationTest(TransactionTestCase):
    def test_complex_task_integration(self):
        # Запускаем реальную асинхронную задачу
        task = complex_task.delay(param1="test", param2=123)
        
        # Дожидаемся результата с таймаутом
        timeout = 10
        start_time = time.time()
        while time.time() - start_time < timeout:
            if AsyncResult(task.id).ready():
                break
            time.sleep(0.1)
        
        # Проверяем результат
        result = AsyncResult(task.id)
        self.assertTrue(result.ready())
        self.assertTrue(result.successful())
        self.assertEqual(result.result, "ожидаемый_результат")

Работа с очередями приоритетов



В реальных системах не все задачи одинаково важны. Celery позволяет организовать очереди с приоритетами:

Python Скопировано
1
2
3
4
5
6
# settings.py
CELERY_TASK_ROUTES = {
    'myapp.tasks.critical_notification': {'queue': 'high_priority'},
    'myapp.tasks.daily_report': {'queue': 'low_priority'},
    'myapp.tasks.*': {'queue': 'default'},
}
Запуск воркеров с учётом приоритета:

Bash Скопировано
1
2
3
4
5
# Высокоприоритетный воркер с большим числом процессов
celery -A myproject worker -Q high_priority,default --prefetch-multiplier=1 --concurrency=10
 
# Низкоприоритетный воркер
celery -A myproject worker -Q low_priority --prefetch-multiplier=1 --concurrency=3
Для Redis можно настроить более тонкую систему приоритетов:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TaskPriority:
    HIGH = 0
    MEDIUM = 3
    LOW = 6
    BACKGROUND = 9
 
@shared_task(bind=True)
def prioritized_task(self, importance, data):
    # Логика задачи
    pass
 
# Вызов с указанием приоритета
prioritized_task.apply_async(
    args=[TaskPriority.HIGH, data],
    priority=TaskPriority.HIGH
)

Хранение и анализ метрик производительности



Для профессионального мониторинга недостаточно базовых инструментов. Комплексное решение включает:

1. Сбор метрик производительности:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@shared_task(bind=True)
def instrumented_task(self, *args, **kwargs):
    import time
    from prometheus_client import Histogram
    
    TASK_DURATION = Histogram(
        'task_duration_seconds',
        'Task duration in seconds',
        ['task_name']
    )
    
    start_time = time.time()
    try:
        result = actual_task_logic(*args, **kwargs)
        status = 'success'
    except Exception as e:
        status = 'error'
        raise
    finally:
        duration = time.time() - start_time
        TASK_DURATION.labels(self.name).observe(duration)
        
        # Сохраняем детальную информацию в БД
        TaskExecution.objects.create(
            task_id=self.request.id,
            task_name=self.name,
            args=str(args),
            kwargs=str(kwargs),
            duration=duration,
            status=status,
            worker=self.request.hostname,
            memory_usage=get_current_memory_usage()
        )
    
    return result
2. Визуализация и анализ статистики:

Создайте маршрут в Django для просмотра статистики:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def task_performance_dashboard(request):
    # Агрегированная статистика по задачам
    task_stats = TaskExecution.objects.values('task_name').annotate(
        avg_duration=Avg('duration'),
        max_duration=Max('duration'),
        min_duration=Min('duration'),
        success_rate=100 * Count('id', filter=Q(status='success')) / Count('id'),
        count=Count('id')
    ).order_by('-avg_duration')
    
    # График выполнения по времени
    timeline_data = TaskExecution.objects.filter(
        created_at__gte=timezone.now() - timezone.timedelta(days=7)
    ).values('created_at__date').annotate(
        count=Count('id'),
        success=Count('id', filter=Q(status='success')),
        failed=Count('id', filter=Q(status='error'))
    ).order_by('created_at__date')
    
    return render(request, 'tasks/dashboard.html', {
        'task_stats': task_stats,
        'timeline_data': timeline_data
    })

Стратегии обновления Celery в работающих проектах



Обновление Celery в production может быть критичной операцией. Основные стратегии:

Поэтапное обновление:

1. Обновите только клиентскую часть Celery (в Django).
2. Постепенно обновляйте воркеры, начиная с одного.
3. Мониторьте поведение обновлённых воркеров.
4. Постепенно замените все воркеры.

Параллельное развёртывание:

1. Запустите новые воркеры с обновлённой версией на отдельной очереди.
2. Переключите часть задач на новую очередь.
3. Если всё работает стабильно, увеличивайте долю задач.
4. Полностью переключитесь на новую версию, когда убедитесь в её стабильности.

Использование Celery версии-кандидата:

Python Скопировано
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Тестирование новой версии параллельно с текущей
from celery import Celery
 
# Существующее приложение
current_app = Celery('current_app', broker='redis://localhost:6379/0')
 
# Тестовое приложение с новой версией
new_app = Celery('new_app', broker='redis://localhost:6379/1')
 
@current_app.task
def current_version_task():
    pass
 
@new_app.task
def new_version_task():
    pass
 
# Запуск задачи в обеих версиях для сравнения
def compare_versions():
    current_result = current_version_task.delay()
    new_result = new_version_task.delay()
    # Сравнение результатов и поведения
Независимо от выбранной стратегии, крайне важно тщательно тестировать обновления в среде, максимально приближенной к production, и иметь план отката в случае проблем.

Путь к эффективной асинхронной архитектуре



Один из ключевых принципов — правильное разделение ответственности. Не стоит помещать всю логику в воркеры Celery только потому, что это возможно. Определите, какие операции действительно выигрывают от асинхронного выполнения: тяжёлые вычисления, внешние API-вызовы, операции с файловой системой. Простые операции с базой данных обычно эффективнее выполнять синхронно.

Мониторинг — ещё один критический аспект. Система без мониторинга подобна самолёту без приборов. Минимальный набор метрик должен включать:
  1. Количество успешных и неудачных задач.
  2. Время выполнения задач (p50, p95, p99 перцентили).
  3. Длина очередей.
  4. Загрузка воркеров.
  5. Потребление памяти.

Такой мониторинг позволит не только реагировать на проблемы, но и предвидеть их до того, как они повлияют на пользователей.

В производственной среде стоит тщательно настроить тайм-ауты для всех внешних операций. Без этого задача может "зависнуть" навсегда, блокируя воркер и постепенно снижая производительность всей системы. Добавляйте тайм-ауты для HTTP-запросов, операций с базами данных и другими внешними системами.
Грамотное управление ресурсами — ещё один секрет эффективной системы. Часто разработчики увлекаются созданием множества мелких задач, что приводит к перегрузке брокера сообщений. Объединяйте мелкие операции в пакеты, где это возможно, и используйте очереди с приоритетами для критически важных задач.
При проектировании задач помните о идемпотентности — возможности безопасного повторного выполнения задачи без побочных эффектов. Идемпотентные задачи упрощают обработку сбоев и развёртывание обновлений.
Наконец, инвестируйте время в автоматизацию развёртывания и управления инфраструктурой Celery. Используйте контейнеризацию и оркестрацию (Docker, Kubernetes) для гибкого масштабирования и управления воркерами.

Django и асинхронные данные (websockets?)
Есть задача - веб приложение на django, которое отображает данные с датчиков. Данные пишутся в...

Как сделать автозапуск кода каждые 24 часа на celery?
Извините за столь дерзкую просьбу, но мне нужен авто запуск кода каждые 24 часа на celery. Я бы и...

Объясните разницу между twisted и celery
Добрый день! Друзья, если возможно, объясните, пожалуйста, разницу между этими двумя фреймворками....

Подскажите литературу по Celery
Добрый день, друзья! Посоветуйте, пожалуйста, почитать хорошие материалы (книги, туториалы и т.п)...

Djang + Celery + djcelery не показываются воркеры
В админке джанги не показываются таски и воркеры. Хотя, таски обрабатываются и ставятся, как...

Как узнать выполняет ли celery асинхронно функцию?
Прилагаю настройки celery # settings.py # CELERY STUFF BROKER_URL = 'redis://localhost:6379'...

Celery error
Добрый день! Реализовал связку tornado + celery. Все работает, запускается. Пока в worker у...

Множественная обработка GET-запросов через celery
Добрый день! У меня есть задача - я обращаюсь к стороннему API, к которому делаю запрос и...

Celery and RabbitMQ
Всем привет. У меня есть задача, которая должна обойти весь индекс эластика и запустить над каждым...

Как создать задачу по событию в Celery?
Здравствуйте. Для проекта интернет-магазина возникла задача очистки товаров в корзине, если заказ...

Разобраться с логикой приложения (Celery)
Всем привет! Использую фреймворк Django + Celery. У меня есть список товаров, которые обновляются...

Как запустить celery таску в pytest классе
Есть условная celery таска class CreateNotifications(Task): name = 'create-notifications' ...

Размещено в Без категории
Надоела реклама? Зарегистрируйтесь и она исчезнет полностью.
Всего комментариев 0
Комментарии
 
Новые блоги и статьи
Линейное решение не линейной задачи (емкость вычислений в сравнении с традиционными решениями пока не определена).
Hrethgir 10.04.2025
В рамках предстоящих вычислений пришлось (да, я тоже знаю про корень числа, и про степеня, и прочие теоремы, но. . . ) найти способ нахождения отношения двух углов. . . .
Запуск контейнеров Docker на ARM64
Mr. Docker 09.04.2025
Появление таких решений, как Apple M1/ M2, AWS Graviton, Ampere Altra и Raspberry Pi, сделало использование ARM-систем обыденностью для многих разработчиков и DevOps-инженеров. При этом Docker,. . .
Vue SFC компонент на PHP с Fusion
Jason-Webb 09.04.2025
PHP на сервере и JavaScript на клиенте — классическое сочетание, которое, несмотря на свою эффективность, создает определенный когнитивный диссонанс при разработке. В этом контексте появляются. . .
TypeScript vs JavaScript: Отличия и когда что использовать
Reangularity 09.04.2025
JavaScript появился в 1995 году как творение Брендана Эйха и быстро стал основой интерактивности в вебе. За свою историю он прошел путь от простого языка для манипуляций с DOM до полноценной. . .
Подключение Kafka к Elasticsearch
Codd 09.04.2025
Apache Kafka и Elasticsearch — две мощные технологии, которые при совместном использовании создают эффективную платформу для обработки и анализа данных в реальном времени. Kafka, выступая в роли. . .
Реализации таймеров в Unity
GameUnited 09.04.2025
Время — важный ресурс любой игры. Разработка качественных игровых механик невозможна без грамотного управления временем, а таймеры выступают ключевым инструментом этого управления. Представьте себе. . .
Функции высшего порядка в JavaScript
run.dev 09.04.2025
Функции высшего порядка представляют собой один из фундаментальных камней функционального программирования в JavaScript. По сути, это функции, которые либо принимают другие функции в качестве. . .
Flutter: Оптимизация производительности сложных UI
mobDevWorks 09.04.2025
Когда речь идет о сложных интерфейсах, Flutter сталкивается с несколькими фундаментальными проблемами производительности. Одна из них — избыточная перерисовка (repainting), когда даже небольшие. . .
Замыкания в Python
py-thonny 09.04.2025
Что такое замыкание? В простейшем определении, замыкание - это функция, которая запоминает и хранит доступ к переменным из охватывающей её области видимости, даже когда эта функция вызывается за. . .
Реализация Event-Driven архитектуры с RabbitMQ и Kafka в Nest.js
ArchitectMsa 09.04.2025
В монолитных системах сервисы обычно общаются напрямую через HTTP-запросы. Простой подход, удобный для начала разработки — но что происходит, когда система растёт? Синхронное взаимодействие быстро. . .
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin
Copyright ©2000 - 2025, CyberForum.ru
Выделить код Копировать код Сохранить код Нормальный размер Увеличенный размер