Celery — Best Practices
Celery — один из самых распространённых способов выносить тяжёлую работу в фоновые задачи в Python-проектах. Но вместе с очередями приходят и типичные подводные камни: потерянные и продублированные задачи, неожиданные блокировки и сложность отладки.
Даурен Талгатулы — Senior Software Engineer в Ergeon — на опыте продукта с ~50 тыс. задач в день делится лучшими практиками работы с Celery.
В докладе: — что такое Celery и какие брокеры выбирать (Redis, RabbitMQ, Amazon SQS) — минимум логики и простые аргументы в задачах, идемпотентность и транзакции — обратная совместимость при изменении сигнатуры задач: «резиновая» сигнатура (**kwargs), Blue/Green и Rolling-деплой — таймауты и retry policy: экспоненциальная задержка, max_retries, autoretry_for — приоритеты и роутинг очередей, Celery Beat, борьба с утечками памяти — мониторинг (Sentry, Flower, Prometheus + Grafana) и автомасштабирование (--autoscale, HPA в Kubernetes)
Видео
Презентация
1 / 39Текст презентации
Слайд 1: PYTHON, CELERY АЛМАТЫ Даурен Талгатулы
PYTHON, CELERY АЛМАТЫ Даурен Талгатулы CELERY - BEST PRACTICES
Слайд 2: Стартап в сфере благоустройства дома
• Стартап в сфере благоустройства дома • Аккаунт, Платежи, Уведомления и т.д. • Около 50 тыс. задач в обычный день, в хорошие дни может быть больше • Различные типы задач, длительные, короткие, вычисления, сетевые О НАС
Слайд 3: Что такое Celery
• Что такое Celery • Какие задачи решали мы • Лучшие практики • Настройка • Мониторинг О ЧЕМ ПОГОВОРИМ
Слайд 4: это асинхронная распределенная очередь задач, написанная на Python.
- это асинхронная распределенная очередь задач, написанная на Python. CELERY
Слайд 6: Redis
- Redis - RabbitMQ - AmazonSQS Брокеры
Слайд 7: from celery import current_app
from celery import current_app @current_app.task def send_email(email: str): print(f’Sending email to {email}’) Unit of work
Слайд 8: Вызов функции - задача выполнится в runtime
Вызов функции - задача выполнится в runtime: send_email([email protected]) Задача выполнится в воркере: send_email.delay([email protected]) Запуск задачи
Слайд 9: @app.task(queue=’high_priority’)
@app.task(queue=’high_priority’) def update_order_status(order_id, *args, **kwargs): order = Order.objects.get(id=order_id) order.update_status() Минимум логики в задаче
Слайд 10: Сообщение занимает меньше памяти
- Сообщение занимает меньше памяти - Проще сериализовать - Данные явно сохраняются и могут быть использованы при retry Простые аргументы в параметрах
Слайд 11: Безопасно
- Безопасно - Сложно реализовать в некоторых задачах - Используйте транзакции Идемпотентные задачи
Слайд 12: Blue/Green Deployment
- Blue/Green Deployment - Rolling Update Обратная совместимость
Слайд 15: Есть набор изменений, который изменяет сигнатуру задачи celery (например,
- Есть набор изменений, который изменяет сигнатуру задачи celery (например, добавляет аргумент, изменяет имена аргументов, перемещает задачу celery из одного модуля в другой и т. д.) - Пока старый код все еще выполняется, есть ряд указанных задач, запланированных с ETA/Countdown - Набор изменений развертывается: старые рабочие процессы завершаются и запускаются новые рабочие процессы - Новый рабочий процесс извлекает или начинает выполнять старую задачу ETA/countdown из очереди - Бум 💥 Изменения сигнатуры метода
Слайд 16: [ERROR/MainProcess] Received unregistered task of type old_module.my_task
[ERROR/MainProcess] Received unregistered task of type old_module.my_task или [ERROR/ForkPoolWorker-1] Task my_task[<task_id>] raised unexpected: TypeError("my_task() missing 1 required positional argument: 'some_new_argument'") Изменения сигнатуры метода
Слайд 17: Сохраняйте обратную совместимость между двумя версиями задачи
Сохраняйте обратную совместимость между двумя версиями задачи: Используйте “резиновую” сигнатуру (**kwargs) Обратная совместимость
Слайд 18: Глобальные таймауты (hard, soft)
- Глобальные таймауты (hard, soft) - soft_limit_timeout - expires - eta (countdown) + visibility timeout Таймауты
Слайд 19: Используйте экспоненциальную задержку (Exponential Backoff)
Используйте экспоненциальную задержку (Exponential Backoff) @app.task( bind=True, retry_backoff=True, retry_backoff_max=600, def send_email(self): # Логика задачи Retry policy
Слайд 20: Установите максимальное количество повторных попыток
Установите максимальное количество повторных попыток @app.task(bind=True, max_retries=3) def limited_retry_task(self): # Логика задачи Retry policy
Слайд 21: Повторяйте попытки только для определенных, устранимых исключений
Повторяйте попытки только для определенных, устранимых исключений: from requests.exceptions import RequestException @app.task( bind=True, max_retries=3, autoretry_for=(RequestException,), def api_call_task(self): # Логика вызова API Retry policy
Слайд 22: CELERY_BEAT_SCHEDULE = {
CELERY_BEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'myapp.tasks.add', 'schedule': 30.0, 'args': (16, 16), 'options': { 'expires': 15.0, Celery Beat
Слайд 23: CELERY_WORKER_MAX_TASKS_PER_CHILD=250
CELERY_WORKER_MAX_TASKS_PER_CHILD=250 CELERY_WORKER_MAX_MEMORY_PER_CHILD = 300000 # 300 Mb Утечки памяти
Слайд 24: CELERY_QUEUES = [
CELERY_QUEUES = [ 'high_priority', 'low_priority', 'default', # Default queue for regular tasks Приоритеты задач
Слайд 25: celery -A app worker -n default -Q default
celery -A app worker -n default -Q default celery -A app worker -n high -Q high_priority Приоритеты задач
Слайд 26: CELERY_TASK_ROUTES = {
CELERY_TASK_ROUTES = { # -- HIGH PRIORITY QUEUE -- # 'notifications.tasks_high.*': {'queue': 'high_priority'}, 'payments.tasks.*': {'queue': 'high_priority'}, # -- LOW PRIORITY QUEUE -- # ‘calendar.tasks_low.*': {'queue': 'low_priority'}, 'notifications.tasks_low.*': {'queue': 'low_priority'}, Routing
Слайд 27: Sentry Integration
- Sentry Integration - Flower для маленьких проектов - Prometheus, Grafana Мониторинг
Слайд 28: sentry_sdk.init(
sentry_sdk.init( integrations=[ CeleryIntegration( monitor_beat_tasks=True, Sentry Integration
Слайд 29: Web-based UI, HTTP API
- Web-based UI, HTTP API - Простые графики - Можно менять конфиг через UI Flower
Слайд 31: celery exporter
- celery exporter - Использует встроенный компонент мониторинга в реальном времени в Celery для предоставления метрик Prometheus - Отслеживает статус задачи (задача запущена, задача выполнена успешно, задача не выполнена и т. д.) - Отслеживает, какие рабочие процессы запущены, и количество активных задач - Панели инструментов Grafana Prometheus
Слайд 32: Grafana
Grafana
Слайд 33: нагрузками.
Автомасштабирование в Celery позволяет динамически изменять количество рабочих процессов (workers) в зависимости от текущей нагрузки. Это помогает эффективно использовать ресурсы и справляться с пиковыми нагрузками. Как включить автомасштабирование Для включения автомасштабирования используйте параметры --autoscale при запуске worker'а: celery -A your_app worker --autoscale=10,3 Autoscaling
Слайд 34: Ограничения
Ограничения ● Автомасштабирование работает только на уровне отдельного worker'а, не между серверами. ● Слишком частое масштабирование может привести к накладным расходам. Autoscaling
Слайд 35: Horizontal Pod Autoscaler (HPA) в Kubernetes позволяет автоматически
Horizontal Pod Autoscaler (HPA) в Kubernetes позволяет автоматически масштабировать количество подов в зависимости от наблюдаемой загрузки CPU или других метрик. Это можно эффективно использовать для масштабирования Celery workers. Autoscaling in K8s
Слайд 36: apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta1 kind: HorizontalPodAutoscaler metadata: name: celery-worker-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: celery-worker minReplicas: 3 maxReplicas: 10 metrics: - type: Resource resource: name: cpu targetAverageUtilization: 70 HPA
Слайд 37: apiVersion: autoscaling/v2beta1
apiVersion: autoscaling/v2beta1 kind: HorizontalPodAutoscaler metadata: name: celery-worker-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: celery-worker minReplicas: 3 maxReplicas: 10 metrics: - type: Pods pods: metricName: celery_queue_length targetAverageValue: 100 HPA
Слайд 38: Celery once (https://github.com/cameronmaske/celery-once)
- Celery once (https://github.com/cameronmaske/celery-once) - Lock in Redis @celery.task(base=QueueOnce) def slow_task(): sleep(30) return "Done!" Run one task at a time
Слайд 39: Рахмет
Рахмет
Другие доклады митапа
- РС
- Д
- АГПрименение и оптимизация работы LLM. RAG, борьба с галлюцинациями Азамат Галимжанов














![Слайд 16: [ERROR/MainProcess] Received unregistered task of type old_module.my_task](/img/slides/talgatuly-celery-best-practices/p16.webp)





















