#41. Celery
Celery
План
- Що треба знати до Celery
- Celery - розподілена черга завдань
- Як працює Celery
- Брокери
- Встановлення Celery і брокера Redis
- Celery і Python
- Celery і Django
- Література
Що треба знати до Celery
Процес (Process) - екземпляр програми під час виконання, незалежний об’єкт, якому виділено системні ресурси (наприклад, процесорний час і пам’ять). Кожен процес виконується в окремому адресному просторі: один процес не може отримати доступ до змінних і структур даних іншого. Якщо процес хоче отримати доступ до чужих ресурсів, необхідно використовувати міжпроцесну взаємодію. Це можуть бути конвеєри, файли, канали зв’язку між комп’ютерами та багато іншого.
Потік (Thread) використовує той самий простір стека, що і процес, а безліч потоків спільно використовують дані своїх станів. Як правило, кожен потік може працювати (читати і писати) з однією і тією ж областю пам’яті, на відміну від процесів, які не можуть просто так отримати доступ до пам’яті іншого процесу. У кожного потоку є власні регістри і власний стек, але інші потоки можуть їх використовувати.
Потік - певний спосіб виконання процесу. Коли один потік змінює ресурс процесу, цю зміну одразу ж стає видно іншим потокам цього процесу.
Синхронною (synchronous) називається така взаємодія між компонентами, за якої клієнт, надіславши запит, блокується і може продовжувати роботу тільки після отримання відповіді від сервера. З цієї причини такий вид взаємодії називають іноді блокуючим (blocking).
У рамках асинхронної (asynchronous) або неблокуючої (non blocking) взаємодії клієнт після надсилання запиту серверу може продовжувати роботу, навіть якщо відповідь на запит ще не надійшла. Асинхронна взаємодія дає змогу отримати вищу продуктивність системи завдяки використанню часу між надсиланням запиту й отриманням відповіді на нього для виконання інших завдань. Інша важлива перевага асинхронної взаємодії - менша залежність клієнта від сервера, можливість продовжувати роботу, навіть якщо машина, на якій розташований сервер, стала недоступною. Ця властивість використовується для організації надійного зв’язку між компонентами, навіть якщо і клієнт, і сервер не весь час перебувають у робочому стані.
NoSQL (від англ. not only SQL - не тільки SQL) - термін, що позначає низку підходів, спрямованих на реалізацію систем управління базами даних, які мають суттєві відмінності від моделей, що використовуються в традиційних реляційних СКБД із доступом до даних засобами мови SQL. Застосовується до баз даних, у яких робиться спроба розв’язати проблеми масштабованості та доступності за рахунок атомарності (англ. atomicity) і узгодженості даних (англ. consistency). Детальніше тут
Celery - розподілена черга завдань
Celery це ніщо інше як розподілена черга завдань, реалізована мовою Python.
Celery - це проста, гнучка та надійна розподілена система для опрацювання величезної кількості повідомлень, що включає в себе інструменти, необхідні для підтримки такої системи.
Це черга завдань з упором на обробку в реальному часі, а також з підтримкою планування завдань.
Celery має відкритий вихідний код і перебуває під ліцензією BSD.
Отже, що ж вміє Celery:
- Виконувати асинхронно завдання
- Виконувати періодичні завдання (розумна заміна cron)
- Виконувати відкладені завдання
- Розподілене виконання (може бути запущений на N серверах)
- У межах одного worker’а можливе конкурентне виконання декількох завдань (одночасно)
- Виконувати завдання повторно, якщо виліз exception
- Обмежувати кількість завдань за одиницю часу (rate limit, для завдання або глобально)
- Нескладно моніторити виконання завдань
- Виконувати підзавдання
- Надсилати звіти про exception’и
- Перевіряти чи виконалося завдання
Як працює Celery
Brocker (Брокер)
Брокер повідомлень (він же диспетчер черги) - це посередник(транспорт), який приймає і віддає повідомлення (завдання) між окремими модулями/додатками всередині деякої складної системи, де модулі/додатки повинні спілкуватися між собою - тобто пересилати дані один одному.
Worker (Воркер)
Воркер це окремо запущений процес для виконання певних завдань, Celery запускається на одному або декількох воркерах, щоб виконувати завдання паралельно на кожному воркері.
Back-end (Бекенд)
Бекенд у випадку з Celery виступає як сховище результатів виконання завдань.
Брокери
(Advanced Message Queuing Protocol) - відкритий протокол для передачі повідомлень між компонентами системи. Основна ідея полягає в тому, що окремі підсистеми (або незалежні додатки) можуть обмінюватися довільним чином повідомленнями через AMQP-брокер, який здійснює маршрутизацію, можливо гарантує доставку, розподіл потоків даних, підписку на потрібні типи повідомлень.
-
Producer (постачальник) - програма, що надсилає повідомлення.
-
Queue (черга) - черга повідомлень (завдань). Вона існує всередині брокера. Будь-яка кількість постачальників може надсилати повідомлення в одну чергу, також будь-яка кількість передплатників може отримувати повідомлення з однієї черги. У схемах чергу буде позначено стеком і підписано ім’ям:
-
Consumer (споживач) - програма, що приймає повідомлення. Зазвичай передплатник перебуває в стані очікування повідомлень.
Постачальник, передплатник і брокер не зобов’язані перебувати на одній фізичній машині.
RabbitMQ
RabbitMQ - це брокер повідомлень із відкритим вихідним кодом. Він маршрутизує повідомлення за всіма базовими принципами протоколу AMQP, описаними в специфікації. Відправник передає повідомлення брокеру, а той доставляє його одержувачу. RabbitMQ реалізує і доповнює протокол AM
Redis
Redis (розшифровується як Remote Dictionary Server) - це швидке сховище даних типу “ключ-значення” в пам’яті з відкритим вихідним кодом для використання в якості бази даних, кешу, брокера повідомлень або черги.
Redis це NoSQL база даних! Для Celery вкрай рекомендую використовувати саме його.
Встановлення Celery і брокера Redis
pip install celery
Celery 4.0+ офіційно вже не підтримується для Windows
Варіанти запуску:
- Використовувати Linux | MacOs
- Docker
- WSL 2 (для Windows 10)
- Змінна оточення або прямо в коді
Redis
Встановлення самого сервісу
```sudo apt install redis-server``
artem@HP:~$ redis-server
17624:C 01 Mar 2021 02:50:02.381 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
17624:C 01 Mar 2021 02:50:02.381 # Redis version=5.0.7, bits=64, commit=00000000, modified=0, pid=17624, just started
17624:C 01 Mar 2021 02:50:02.381 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
17624:M 01 Mar 2021 02:50:02.382 * Increased maximum number of open files to 10032 (it was originally set to 1024).
17624:M 01 Mar 2021 02:50:02.383 # Could not create server TCP listening socket *:6379: bind: Address already in use
artem@HP:~$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> set foo bar
OK
127.0.0.1:6379> get foo
"bar"
127.0.0.1:6379>
Для роботи необхідно так само необхідна і бібліотека
pip install redis
Celery і Python
Створюємо файл tasks.py і “додаток”, у якому необхідно вказати назву (знадобиться для вказівки брокеру) і брокера.
from celery import Celery
broker_url = 'redis://localhost'
app = Celery('tasks', broker=broker_url)
@app.task
def add(x, y):
return x + y
Ми описали завдання, і позначили його через декоратор застосунку celery.
Для того, щоб ми могли викликати завдання, необхідно запустити celery як окремий додаток:
celery -A tasks worker --loglevel=INFO
консольна команда буде доступна після встановлення celery
-A app_name - вказати ім’я програми,
worker - запустити один воркер,
loglevel - рівень подробиць
Запуск і обробка результату
Для запуску завдань є багато різних способів, розглянемо базовий.
from tasks import add
add.delay(4, 4)
Для запуску завдання негайно використовується метод delay (скорочений метод apply_async()).
Запуск завдань повертає не результат, а AsyncResult, для того, щоб отримувати значення, потрібно під час створення застосунку вказати параметр backend, який відповідає за місце зберігання результатів, таким параметром може бути Redis:
broker_url = 'redis://localhost'
app = Celery('tasks', broker=broker_url, backend=broker_url)
Результат матиме досить велику кількість методів і атрибутів.
Основні два методи це ready і get.
ready - відповідає за те, завершилося завдання чи ще в процесі.
get - чекає виконання завдання і повертає результат. Рекомендується використовувати після ready, щоб не чекати виконання даремно.
>>> result = add.delay(4, 4)
>>> result.ready()
False
>>> result.get()
8
Іноді опис парметрів завдання і його виклик можуть бути в абсолютно різних місцях, для цього існує механізм підпису:
s1 = add.s(2, 2)
res = s1.delay()
res.get()
у цьому прикладі s1 це підпис завдання, тобто завдання заготовлене для виконання, його можна серіалізувати і відправити мережею, наприклад, а виконати в уже зовсім інших місцях.
Завдання можна групувати:
from celery import group
from proj.tasks import add
group(add.s(i, i) for i in range(10))().get()
Види запуску
Є три варіанти запуску тасків:
apply_async(args[, kwargs[, ...]])
Надсилання повідомлення із зазначенням додаткових параметрів
delay(*args, **kwargs)
Надсилання повідомлення без будь-яких параметрів самого повідомлення
calling (__call__)
Просто виклик, декоратор не заважає нам просто викликати функцію без celery :)
Основні параметри apply_async()
сountdown- відправити через
add.apply_async((2,2), countdown=10)
# відправити через 10 секунд
eta- відправити в конкретний час
add.apply_async((2,2), eta=now() + timedelta(seconds=10))
# відправити через 10 секунд
expires- час після якого перестати виконувати завдання, можна вказати як цифру так і час
add.apply_async((4,5), countdown=60, expires=120)
add.apply_async((4,5), expires=now() + timedelta(days=2))
link- виконати інше завдання по завершенню поточного, ґрунтуючись на результатах поточного
add.apply_async((2, 2), link=add.s(16))
# ( 2 + 2 ) + 16
Сelery beat - Переодичні завдання
Celery може виконувати будь-які завдання просто за графіком
Для цього потрібно налаштувати додаток:
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
add-every-30-seconds Ключ словника, це тільки назва, можна вказати що завгодно.
task це виконуваний таск
args його аргументи
schedule: частота виконання в секундах
Виконання по крону
from celery.schedules import crontab
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
Cron - система завдання розкладу, можна зробити практично який завгодно.
https://crontab.guru/
Для розкладу потрібно запускати окремий воркер (beat) для розкладу.
Celery і Django
Для використання celery в django рекомендується створити ще один файл celery.py на одному рівні з settings.py
- proj/
- manage.py
- proj/
- celery.py
- __init__.py
- settings.py
- urls.py
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Параметр namespace під час вказівки конфіга відповідатиме за те, з якого слова починатимуться налаштування в settings.py.
Наприклад:
# Celery Configuration Options
CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'
CELERY_TIMEZONE = 'America/New_York'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
app.autodiscover_tasks() - цей рядок відповідатиме за автоматичний пошук таких у всіх додатках.
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
На тому ж рівні де і settings.py створити\використати файл __init__.py залежно від версії python
# __init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
Усі завдання необхідно покривати не стандартним декоратором task, а декоратором shared_task, тоді django зможе автоматично знайти всі таски в додатку.
tasks.py
from celery import shared_task
from demoapp.models import Widget
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
Так само для Django існує багато різних розширень, наприклад:
django-celery-results - що б зберігати резльутати в бд або кеші джанго.
django-celery-beat - налаштування для періодичних завдань, відразу вшите в адмінку джаго.
Класичним прикладом використання Celery є надсилання електронної пошти. Я використовую цей приклад, щоб показати вам основи використання Celery. Для початку створимо view і завдання:
from django.conf import settings
from django.core.mail import send_mail
from django.template import Engine, Context
from myproject.celery import app
def render_template(template, context):
engine = Engine.get_default()
tmpl = engine.get_template(template)
return tmpl.render(Context(context))
@celery_app.task
def send_mail_task(recipients, subject, template, context):
send_mail(
subject=subject,
message=render_template(f'{template}.txt', context),
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=recipients,
fail_silently=False,
html_message=render_template(f'{template}.html', context)
)
Використовуючи Celery, ми скорочуємо час відповіді клієнту, оскільки відокремлюємо процес надсилання від основного коду, що відповідає за повернення відповіді.
Найпростіший спосіб виконати це завдання - викликати метод delay, що надається декоратором app.task.
send_mail_task.delay(('noreply@example.com', ), 'Celery cookbook test', 'test', {})
Celery так само дає змогу налаштувати повторні спроби після збою.
@celery_app.task(bind=True, default_retry_delay=10 * 60)
def send_mail_task(self, recipients, subject, template, context):
message = render_template(f'{template}.txt', context)
html_message = render_template(f'{template}.html', context)
try:
send_mail(
subject=subject,
message=message,
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=recipients,
fail_silently=False,
html_message=html_message
)
except smtplib.SMTPException as ex:
self.retry(exc=ex)
Тепер завдання буде перезапущено через десять хвилин, у разі якщо відправлення не буде вдалим. Крім того, ви зможете встановити кількість повторних спроб.
Нендеринг шаблону винесено за межі виклику send_mail. Це тому, що ми укладаємо виклик send_mail в try / except, і краще мати якомога менше коду в try / except.
Flower - інструмент для моніторингу Celery
pip install flower
запуск
flower -A proj --port=5555
і з Celery
celery flower -A proj --address=127.0.0.1 --port=5555
Практика в якості ДЗ:
Повторити те, що зроблено в відео
Домашнє завдання:
- Зробити кнопку для адміну, яка підтверджуватиме всі повернення через Celery task.
- Створити таск, який відхилятиме всі повернення о 6 годині вечора по Києву.
Література
- Вступ до Celery Python
- Celery: починаємо правильно
- Про RabbitMQ habr
- Redis Quick Start
- First Steps with Celery та Next Steps
- Відео, але краще одразу з другої частини і перемотувати