#41. Celery

Celery

План

  1. Що треба знати до Celery
  2. Celery - розподілена черга завдань
    1. Як працює Celery
    2. Брокери
    3. Встановлення Celery і брокера Redis
  3. Celery і Python
  4. Celery і Django
  5. Література

Що треба знати до Celery

Процес (Process) - екземпляр програми під час виконання, незалежний об’єкт, якому виділено системні ресурси (наприклад, процесорний час і пам’ять). Кожен процес виконується в окремому адресному просторі: один процес не може отримати доступ до змінних і структур даних іншого. Якщо процес хоче отримати доступ до чужих ресурсів, необхідно використовувати міжпроцесну взаємодію. Це можуть бути конвеєри, файли, канали зв’язку між комп’ютерами та багато іншого.

Потік (Thread) використовує той самий простір стека, що і процес, а безліч потоків спільно використовують дані своїх станів. Як правило, кожен потік може працювати (читати і писати) з однією і тією ж областю пам’яті, на відміну від процесів, які не можуть просто так отримати доступ до пам’яті іншого процесу. У кожного потоку є власні регістри і власний стек, але інші потоки можуть їх використовувати.
Потік - певний спосіб виконання процесу. Коли один потік змінює ресурс процесу, цю зміну одразу ж стає видно іншим потокам цього процесу.

Синхронною (synchronous) називається така взаємодія між компонентами, за якої клієнт, надіславши запит, блокується і може продовжувати роботу тільки після отримання відповіді від сервера. З цієї причини такий вид взаємодії називають іноді блокуючим (blocking).

У рамках асинхронної (asynchronous) або неблокуючої (non blocking) взаємодії клієнт після надсилання запиту серверу може продовжувати роботу, навіть якщо відповідь на запит ще не надійшла. Асинхронна взаємодія дає змогу отримати вищу продуктивність системи завдяки використанню часу між надсиланням запиту й отриманням відповіді на нього для виконання інших завдань. Інша важлива перевага асинхронної взаємодії - менша залежність клієнта від сервера, можливість продовжувати роботу, навіть якщо машина, на якій розташований сервер, стала недоступною. Ця властивість використовується для організації надійного зв’язку між компонентами, навіть якщо і клієнт, і сервер не весь час перебувають у робочому стані.

NoSQL (від англ. not only SQL - не тільки SQL) - термін, що позначає низку підходів, спрямованих на реалізацію систем управління базами даних, які мають суттєві відмінності від моделей, що використовуються в традиційних реляційних СКБД із доступом до даних засобами мови SQL. Застосовується до баз даних, у яких робиться спроба розв’язати проблеми масштабованості та доступності за рахунок атомарності (англ. atomicity) і узгодженості даних (англ. consistency). Детальніше тут


Celery - розподілена черга завдань


Celery це ніщо інше як розподілена черга завдань, реалізована мовою Python.

Celery - це проста, гнучка та надійна розподілена система для опрацювання величезної кількості повідомлень, що включає в себе інструменти, необхідні для підтримки такої системи.

Це черга завдань з упором на обробку в реальному часі, а також з підтримкою планування завдань.

Celery має відкритий вихідний код і перебуває під ліцензією BSD.

Отже, що ж вміє Celery:

  • Виконувати асинхронно завдання
  • Виконувати періодичні завдання (розумна заміна cron)
  • Виконувати відкладені завдання
  • Розподілене виконання (може бути запущений на N серверах)
  • У межах одного worker’а можливе конкурентне виконання декількох завдань (одночасно)
  • Виконувати завдання повторно, якщо виліз exception
  • Обмежувати кількість завдань за одиницю часу (rate limit, для завдання або глобально)
  • Нескладно моніторити виконання завдань
  • Виконувати підзавдання
  • Надсилати звіти про exception’и
  • Перевіряти чи виконалося завдання

Як працює Celery

Brocker (Брокер)

Брокер повідомлень (він же диспетчер черги) - це посередник(транспорт), який приймає і віддає повідомлення (завдання) між окремими модулями/додатками всередині деякої складної системи, де модулі/додатки повинні спілкуватися між собою - тобто пересилати дані один одному.

Worker (Воркер)

Воркер це окремо запущений процес для виконання певних завдань, Celery запускається на одному або декількох воркерах, щоб виконувати завдання паралельно на кожному воркері.

Back-end (Бекенд)

Бекенд у випадку з Celery виступає як сховище результатів виконання завдань.

Брокери

AMQP

(Advanced Message Queuing Protocol) - відкритий протокол для передачі повідомлень між компонентами системи. Основна ідея полягає в тому, що окремі підсистеми (або незалежні додатки) можуть обмінюватися довільним чином повідомленнями через AMQP-брокер, який здійснює маршрутизацію, можливо гарантує доставку, розподіл потоків даних, підписку на потрібні типи повідомлень.

  • Producer (постачальник) - програма, що надсилає повідомлення.

  • Queue (черга) - черга повідомлень (завдань). Вона існує всередині брокера. Будь-яка кількість постачальників може надсилати повідомлення в одну чергу, також будь-яка кількість передплатників може отримувати повідомлення з однієї черги. У схемах чергу буде позначено стеком і підписано ім’ям:

  • Consumer (споживач) - програма, що приймає повідомлення. Зазвичай передплатник перебуває в стані очікування повідомлень.

Постачальник, передплатник і брокер не зобов’язані перебувати на одній фізичній машині.

RabbitMQ
RabbitMQ - це брокер повідомлень із відкритим вихідним кодом. Він маршрутизує повідомлення за всіма базовими принципами протоколу AMQP, описаними в специфікації. Відправник передає повідомлення брокеру, а той доставляє його одержувачу. RabbitMQ реалізує і доповнює протокол AM

Redis
Redis (розшифровується як Remote Dictionary Server) - це швидке сховище даних типу “ключ-значення” в пам’яті з відкритим вихідним кодом для використання в якості бази даних, кешу, брокера повідомлень або черги.

Redis це NoSQL база даних! Для Celery вкрай рекомендую використовувати саме його.

Встановлення Celery і брокера Redis

pip install celery

Celery 4.0+ офіційно вже не підтримується для Windows
Варіанти запуску:

  1. Використовувати Linux | MacOs
  2. Docker
  3. WSL 2 (для Windows 10)
  4. Змінна оточення або прямо в коді

Redis
Встановлення самого сервісу

```sudo apt install redis-server``

artem@HP:~$ redis-server
17624:C 01 Mar 2021 02:50:02.381 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
17624:C 01 Mar 2021 02:50:02.381 # Redis version=5.0.7, bits=64, commit=00000000, modified=0, pid=17624, just started
17624:C 01 Mar 2021 02:50:02.381 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
17624:M 01 Mar 2021 02:50:02.382 * Increased maximum number of open files to 10032 (it was originally set to 1024).
17624:M 01 Mar 2021 02:50:02.383 # Could not create server TCP listening socket *:6379: bind: Address already in use

artem@HP:~$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set foo bar
OK
127.0.0.1:6379> get foo
"bar"
127.0.0.1:6379> 

Для роботи необхідно так само необхідна і бібліотека

pip install redis

Celery і Python

Створюємо файл tasks.py і “додаток”, у якому необхідно вказати назву (знадобиться для вказівки брокеру) і брокера.

from celery import Celery  
  
broker_url = 'redis://localhost'  
app = Celery('tasks', broker=broker_url)  
  
  
@app.task  
def add(x, y):  
    return x + y

Ми описали завдання, і позначили його через декоратор застосунку celery.

Для того, щоб ми могли викликати завдання, необхідно запустити celery як окремий додаток:

celery -A tasks worker --loglevel=INFO

консольна команда буде доступна після встановлення celery

-A app_name - вказати ім’я програми,
worker - запустити один воркер,
loglevel - рівень подробиць

Запуск і обробка результату

Для запуску завдань є багато різних способів, розглянемо базовий.

from tasks import add
add.delay(4, 4)

Для запуску завдання негайно використовується метод delay (скорочений метод apply_async()).

Запуск завдань повертає не результат, а AsyncResult, для того, щоб отримувати значення, потрібно під час створення застосунку вказати параметр backend, який відповідає за місце зберігання результатів, таким параметром може бути Redis:

broker_url = 'redis://localhost'
app = Celery('tasks', broker=broker_url, backend=broker_url)

Результат матиме досить велику кількість методів і атрибутів.

Основні два методи це ready і get.

ready - відповідає за те, завершилося завдання чи ще в процесі.

get - чекає виконання завдання і повертає результат. Рекомендується використовувати після ready, щоб не чекати виконання даремно.

>>> result = add.delay(4, 4)
>>> result.ready()
False
>>> result.get()
8

Іноді опис парметрів завдання і його виклик можуть бути в абсолютно різних місцях, для цього існує механізм підпису:

s1 = add.s(2, 2)
res = s1.delay()
res.get()

у цьому прикладі s1 це підпис завдання, тобто завдання заготовлене для виконання, його можна серіалізувати і відправити мережею, наприклад, а виконати в уже зовсім інших місцях.

Завдання можна групувати:

from celery import group
from proj.tasks import add

group(add.s(i, i) for i in range(10))().get()

Види запуску

Є три варіанти запуску тасків:

apply_async(args[, kwargs[, ...]])

Надсилання повідомлення із зазначенням додаткових параметрів

delay(*args, **kwargs)

Надсилання повідомлення без будь-яких параметрів самого повідомлення

calling (__call__)

Просто виклик, декоратор не заважає нам просто викликати функцію без celery :)

Основні параметри apply_async()

  1. сountdown - відправити через
add.apply_async((2,2), countdown=10)
# відправити через 10 секунд
  1. eta - відправити в конкретний час
add.apply_async((2,2), eta=now() + timedelta(seconds=10))
# відправити через 10 секунд
  1. expires - час після якого перестати виконувати завдання, можна вказати як цифру так і час
add.apply_async((4,5), countdown=60, expires=120)
add.apply_async((4,5), expires=now() + timedelta(days=2))
  1. link - виконати інше завдання по завершенню поточного, ґрунтуючись на результатах поточного
add.apply_async((2, 2), link=add.s(16))
# ( 2 + 2 ) + 16

Сelery beat - Переодичні завдання

Celery може виконувати будь-які завдання просто за графіком

Для цього потрібно налаштувати додаток:

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

add-every-30-seconds Ключ словника, це тільки назва, можна вказати що завгодно.

task це виконуваний таск

args його аргументи

schedule: частота виконання в секундах

Виконання по крону

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Cron - система завдання розкладу, можна зробити практично який завгодно.

https://crontab.guru/
Для розкладу потрібно запускати окремий воркер (beat) для розкладу.

Celery і Django

Для використання celery в django рекомендується створити ще один файл celery.py на одному рівні з settings.py

- proj/
  - manage.py
  - proj/
    - celery.py
    - __init__.py
    - settings.py
    - urls.py
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Параметр namespace під час вказівки конфіга відповідатиме за те, з якого слова починатимуться налаштування в settings.py.

Наприклад:

# Celery Configuration Options
CELERY_BROKER_URL = 'redis://localhost'  
CELERY_RESULT_BACKEND = 'redis://localhost'
CELERY_TIMEZONE = 'America/New_York'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

app.autodiscover_tasks() - цей рядок відповідатиме за автоматичний пошук таких у всіх додатках.

- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py

На тому ж рівні де і settings.py створити\використати файл __init__.py залежно від версії python

# __init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Усі завдання необхідно покривати не стандартним декоратором task, а декоратором shared_task, тоді django зможе автоматично знайти всі таски в додатку.

tasks.py

from celery import shared_task
from demoapp.models import Widget


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

Так само для Django існує багато різних розширень, наприклад:

django-celery-results - що б зберігати резльутати в бд або кеші джанго.

django-celery-beat - налаштування для періодичних завдань, відразу вшите в адмінку джаго.

Класичним прикладом використання Celery є надсилання електронної пошти. Я використовую цей приклад, щоб показати вам основи використання Celery. Для початку створимо view і завдання:

from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context

from myproject.celery import app


def render_template(template, context):
    engine = Engine.get_default()
    tmpl = engine.get_template(template)
    return tmpl.render(Context(context))
    
    
@celery_app.task
def send_mail_task(recipients, subject, template, context):
    send_mail(
        subject=subject,
        message=render_template(f'{template}.txt', context),
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=recipients,
        fail_silently=False,
        html_message=render_template(f'{template}.html', context)
)

Використовуючи Celery, ми скорочуємо час відповіді клієнту, оскільки відокремлюємо процес надсилання від основного коду, що відповідає за повернення відповіді.

Найпростіший спосіб виконати це завдання - викликати метод delay, що надається декоратором app.task.

send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})

Celery так само дає змогу налаштувати повторні спроби після збою.

@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
    message = render_template(f'{template}.txt', context)
    html_message = render_template(f'{template}.html', context)
    try:
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=recipients,
            fail_silently=False,
            html_message=html_message
        )
    except smtplib.SMTPException as ex:
        self.retry(exc=ex)

Тепер завдання буде перезапущено через десять хвилин, у разі якщо відправлення не буде вдалим. Крім того, ви зможете встановити кількість повторних спроб.
Нендеринг шаблону винесено за межі виклику send_mail. Це тому, що ми укладаємо виклик send_mail в try / except, і краще мати якомога менше коду в try / except.

Flower - інструмент для моніторингу Celery

pip install flower

запуск

flower -A proj --port=5555

і з Celery

celery flower -A proj --address=127.0.0.1 --port=5555

Практика в якості ДЗ:

Повторити те, що зроблено в відео

Домашнє завдання:

  1. Зробити кнопку для адміну, яка підтверджуватиме всі повернення через Celery task.
  2. Створити таск, який відхилятиме всі повернення о 6 годині вечора по Києву.

Література

  1. Вступ до Celery Python
  2. Celery: починаємо правильно
  3. Про RabbitMQ habr
  4. Redis Quick Start
  5. First Steps with Celery та Next Steps
  6. Відео, але краще одразу з другої частини і перемотувати