Celery — Best Practices

Almaty Python Meetup #4

ДТ
Даурен Талгатулы
Ergeon

Celery — один из самых распространённых способов выносить тяжёлую работу в фоновые задачи в Python-проектах. Но вместе с очередями приходят и типичные подводные камни: потерянные и продублированные задачи, неожиданные блокировки и сложность отладки.

Даурен Талгатулы — Senior Software Engineer в Ergeon — на опыте продукта с ~50 тыс. задач в день делится лучшими практиками работы с Celery.

В докладе: — что такое Celery и какие брокеры выбирать (Redis, RabbitMQ, Amazon SQS) — минимум логики и простые аргументы в задачах, идемпотентность и транзакции — обратная совместимость при изменении сигнатуры задач: «резиновая» сигнатура (**kwargs), Blue/Green и Rolling-деплой — таймауты и retry policy: экспоненциальная задержка, max_retries, autoretry_for — приоритеты и роутинг очередей, Celery Beat, борьба с утечками памяти — мониторинг (Sentry, Flower, Prometheus + Grafana) и автомасштабирование (--autoscale, HPA в Kubernetes)

Видео

Презентация

Слайд 1: PYTHON, CELERY АЛМАТЫ Даурен Талгатулы 1 / 39
Текст презентации

Слайд 1: PYTHON, CELERY АЛМАТЫ Даурен Талгатулы

PYTHON, CELERY АЛМАТЫ Даурен Талгатулы CELERY - BEST PRACTICES

Слайд 2: Стартап в сфере благоустройства дома

• Стартап в сфере благоустройства дома • Аккаунт, Платежи, Уведомления и т.д. • Около 50 тыс. задач в обычный день, в хорошие дни может быть больше • Различные типы задач, длительные, короткие, вычисления, сетевые О НАС

Слайд 3: Что такое Celery

• Что такое Celery • Какие задачи решали мы • Лучшие практики • Настройка • Мониторинг О ЧЕМ ПОГОВОРИМ

Слайд 4: это асинхронная распределенная очередь задач, написанная на Python.

- это асинхронная распределенная очередь задач, написанная на Python. CELERY

Слайд 6: Redis

- Redis - RabbitMQ - AmazonSQS Брокеры

Слайд 7: from celery import current_app

from celery import current_app @current_app.task def send_email(email: str): print(f’Sending email to {email}’) Unit of work

Слайд 8: Вызов функции - задача выполнится в runtime

Вызов функции - задача выполнится в runtime: send_email([email protected]) Задача выполнится в воркере: send_email.delay([email protected]) Запуск задачи

Слайд 9: @app.task(queue=’high_priority’)

@app.task(queue=’high_priority’) def update_order_status(order_id, *args, **kwargs): order = Order.objects.get(id=order_id) order.update_status() Минимум логики в задаче

Слайд 10: Сообщение занимает меньше памяти

- Сообщение занимает меньше памяти - Проще сериализовать - Данные явно сохраняются и могут быть использованы при retry Простые аргументы в параметрах

Слайд 11: Безопасно

- Безопасно - Сложно реализовать в некоторых задачах - Используйте транзакции Идемпотентные задачи

Слайд 12: Blue/Green Deployment

- Blue/Green Deployment - Rolling Update Обратная совместимость

Слайд 15: Есть набор изменений, который изменяет сигнатуру задачи celery (например,

- Есть набор изменений, который изменяет сигнатуру задачи celery (например, добавляет аргумент, изменяет имена аргументов, перемещает задачу celery из одного модуля в другой и т. д.) - Пока старый код все еще выполняется, есть ряд указанных задач, запланированных с ETA/Countdown - Набор изменений развертывается: старые рабочие процессы завершаются и запускаются новые рабочие процессы - Новый рабочий процесс извлекает или начинает выполнять старую задачу ETA/countdown из очереди - Бум 💥 Изменения сигнатуры метода

Слайд 16: [ERROR/MainProcess] Received unregistered task of type old_module.my_task

[ERROR/MainProcess] Received unregistered task of type old_module.my_task или [ERROR/ForkPoolWorker-1] Task my_task[<task_id>] raised unexpected: TypeError("my_task() missing 1 required positional argument: 'some_new_argument'") Изменения сигнатуры метода

Слайд 17: Сохраняйте обратную совместимость между двумя версиями задачи

Сохраняйте обратную совместимость между двумя версиями задачи: Используйте “резиновую” сигнатуру (**kwargs) Обратная совместимость

Слайд 18: Глобальные таймауты (hard, soft)

- Глобальные таймауты (hard, soft) - soft_limit_timeout - expires - eta (countdown) + visibility timeout Таймауты

Слайд 19: Используйте экспоненциальную задержку (Exponential Backoff)

Используйте экспоненциальную задержку (Exponential Backoff) @app.task( bind=True, retry_backoff=True, retry_backoff_max=600, def send_email(self): # Логика задачи Retry policy

Слайд 20: Установите максимальное количество повторных попыток

Установите максимальное количество повторных попыток @app.task(bind=True, max_retries=3) def limited_retry_task(self): # Логика задачи Retry policy

Слайд 21: Повторяйте попытки только для определенных, устранимых исключений

Повторяйте попытки только для определенных, устранимых исключений: from requests.exceptions import RequestException @app.task( bind=True, max_retries=3, autoretry_for=(RequestException,), def api_call_task(self): # Логика вызова API Retry policy

Слайд 22: CELERY_BEAT_SCHEDULE = {

CELERY_BEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'myapp.tasks.add', 'schedule': 30.0, 'args': (16, 16), 'options': { 'expires': 15.0, Celery Beat

Слайд 23: CELERY_WORKER_MAX_TASKS_PER_CHILD=250

CELERY_WORKER_MAX_TASKS_PER_CHILD=250 CELERY_WORKER_MAX_MEMORY_PER_CHILD = 300000 # 300 Mb Утечки памяти

Слайд 24: CELERY_QUEUES = [

CELERY_QUEUES = [ 'high_priority', 'low_priority', 'default', # Default queue for regular tasks Приоритеты задач

Слайд 25: celery -A app worker -n default -Q default

celery -A app worker -n default -Q default celery -A app worker -n high -Q high_priority Приоритеты задач

Слайд 26: CELERY_TASK_ROUTES = {

CELERY_TASK_ROUTES = { # -- HIGH PRIORITY QUEUE -- # 'notifications.tasks_high.*': {'queue': 'high_priority'}, 'payments.tasks.*': {'queue': 'high_priority'}, # -- LOW PRIORITY QUEUE -- # ‘calendar.tasks_low.*': {'queue': 'low_priority'}, 'notifications.tasks_low.*': {'queue': 'low_priority'}, Routing

Слайд 27: Sentry Integration

- Sentry Integration - Flower для маленьких проектов - Prometheus, Grafana Мониторинг

Слайд 28: sentry_sdk.init(

sentry_sdk.init( integrations=[ CeleryIntegration( monitor_beat_tasks=True, Sentry Integration

Слайд 29: Web-based UI, HTTP API

- Web-based UI, HTTP API - Простые графики - Можно менять конфиг через UI Flower

Слайд 31: celery exporter

- celery exporter - Использует встроенный компонент мониторинга в реальном времени в Celery для предоставления метрик Prometheus - Отслеживает статус задачи (задача запущена, задача выполнена успешно, задача не выполнена и т. д.) - Отслеживает, какие рабочие процессы запущены, и количество активных задач - Панели инструментов Grafana Prometheus

Слайд 32: Grafana

Grafana

Слайд 33: нагрузками.

Автомасштабирование в Celery позволяет динамически изменять количество рабочих процессов (workers) в зависимости от текущей нагрузки. Это помогает эффективно использовать ресурсы и справляться с пиковыми нагрузками. Как включить автомасштабирование Для включения автомасштабирования используйте параметры --autoscale при запуске worker'а: celery -A your_app worker --autoscale=10,3 Autoscaling

Слайд 34: Ограничения

Ограничения ● Автомасштабирование работает только на уровне отдельного worker'а, не между серверами. ● Слишком частое масштабирование может привести к накладным расходам. Autoscaling

Слайд 35: Horizontal Pod Autoscaler (HPA) в Kubernetes позволяет автоматически

Horizontal Pod Autoscaler (HPA) в Kubernetes позволяет автоматически масштабировать количество подов в зависимости от наблюдаемой загрузки CPU или других метрик. Это можно эффективно использовать для масштабирования Celery workers. Autoscaling in K8s

Слайд 36: apiVersion: autoscaling/v2beta1

apiVersion: autoscaling/v2beta1 kind: HorizontalPodAutoscaler metadata: name: celery-worker-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: celery-worker minReplicas: 3 maxReplicas: 10 metrics: - type: Resource resource: name: cpu targetAverageUtilization: 70 HPA

Слайд 37: apiVersion: autoscaling/v2beta1

apiVersion: autoscaling/v2beta1 kind: HorizontalPodAutoscaler metadata: name: celery-worker-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: celery-worker minReplicas: 3 maxReplicas: 10 metrics: - type: Pods pods: metricName: celery_queue_length targetAverageValue: 100 HPA

Слайд 38: Celery once (https://github.com/cameronmaske/celery-once)

- Celery once (https://github.com/cameronmaske/celery-once) - Lock in Redis @celery.task(base=QueueOnce) def slow_task(): sleep(30) return "Done!" Run one task at a time

Слайд 39: Рахмет

Рахмет