- Airflow
- What if we say it’s not like the others?
- . with a very particular set of skills.
- Pixels, pixels everywhere!
- Audio pipeline that goes to eleven
- And subtitle support to match it
- . with real time text recognition
- But wait, there’s more!
- Введение в Apache Airflow
- DAG (Directed Acyclic Graph)
- Operator
- Sensor
- Установка
- Airflow Executors
- SequentialExecutor
- LocalExecutor
- CeleryExecutor
- DaskExecutor
- KubernetesExecutor
- Строим data pipeline на Apache Airflow
- Заключение
- 💌 Присоединяйтесь к рассылке
Airflow
- Pay once, own forever!
- Instant license delivery.
- Use on all your computers.
[$18.99]
What if we say it’s not like the others?
Airflow is different We’re not cutting any corners. This is not yet another FFmpeg wrapper like you might have seen elsewhere. Don’t get us wrong, we love FFmpeg and use many of its parts under the hood, but our custom built video processing pipeline goes way beyond wrapping FFmpeg and calling it a day. We’ve been working on it for years it and it lets us do things that other similar software simply can’t .
It’s a bold claim for sure, so here are just a few examples:
- AirPlay HEVC videos to Apple TV without transcoding
- Streaming to AirPlay 2 enabled TVs
- Adaptive audio volume, spatial headphone downmix
- Lossless audio transcoding when streaming to Apple TV (FLACВ codec, requires tvOSВ 14)
- High quality audio transcoding when streaming to Chromecast (Opus codec)
- OCR (text recognition) for DVD/Bluray/Vobsub subtitles
. with a very particular set of skills.
Airflow is a razor sharp focused software. It supports specific set of devices and it will pull every trick in the book to get the best possible results on these devices. It may not stream video to your smart fridge, but it will gladly push your Chromecast, Apple TV and AirPlay 2 TVs to their limits.
And yes, Airflow can handle pretty much any video format and codec you throw at it.
Pixels, pixels everywhere!
Airflow can stream full 4K HDR HEVC files to Chromecast Ultra, Built-in, Apple TV 4K and AirPlay 2 enabled TVs. It will go out of its way not to touch the original video stream unless absolutely needed for compatibility reasons, ensuring best possible video quality with lowest CPU load (your computer fans will thank you). As far as we can tell, Airflow is still the only desktop software that can natively stream HEVC videos to Apple TV and AirPlay 2 TVs.
And for those pesky videos that are incompatible with your device — Airflow will handle that tranparently, with hardware accelerated transcoding if your computer supports it.
Audio pipeline that goes to eleven
Full multichannel support including DD+ passthrough with Dolby Atmos? Of course.
Advanced adaptive volume booster + limiter for late night watching when you don’t want to disturb your neighbours with loud scenes but still want to hear the dialogue clearly? Check.
Spatial headphone downmix for surround sound videos? Also check.
Detailed A/V sync adjustment where you can compensate for the delay of individual devices like bluetooth headphones? Airflow has it.
And subtitle support to match it
For both embedded and external subtitles. It’s a bit of a secret that pretty much every other streaming software needs to extract embedded subtitle tracks before playing the video. That involves reading the entire file upfront! Crazy, right? Airflow needs no such crude tricks. Embedded or external, for our playback pipeline it’s all the same. All widely used subtitle formats are supported, now including vobsub. Integrated opensubtitles.org search is a cherry on top.
. with real time text recognition
Some subtitles (DVD, Vobsub, Bluray) are stored as pictures. This means that the only way to render them when streaming is to burn them in the video. That’s inconvenient to say the least. It massively increases CPU load (think fan noise and heat) and it’s completely infeasible to do for 4K videos.
Enter our new realtime subtitle text recognition (OCR). During playback Airflow will transparently extract the text from picture subtitles and render it on target device just like it would with regular text subtitles.
But wait, there’s more!
The «small» things, like the scrubbing preview, beautiful polished user interface, multiple playlists support, meticulous last position tracking, or the integrated Speed Test for Chromecast, which is invaluable when dealing with network connection issues. The list goes on.
Did we mention the remote control companion app for Android and iPhone? No? Well, it’s pretty cool. It lets you control all Airflow features from the comfort of your couch. And it’s completely free!
Источник
Введение в Apache Airflow
Также по теме Airflow:
Apache Airflow — это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера. Если смотреть открытые вакансии на позицию data engineer, то нередко встретишь опыт работы с Airflow как одно из требований к позиции.
Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Сейчас есть возможность приобрести его с 15% скидкой по промокоду EARLYBIRD, действует до конца апреля 2021 года. Ввести промокод можно на этапе оформления заказа. Приятного обучения — Apache Airflow 2.0: практический курс.
Airflow был разработан в 2014 году в компании Airbnb, автор Maxime Beauchemin. Позже инструмент был передан под опеку в организацию Apache, а в январе 2019 получил статус Top-Level проекта. В этой статье я расскажу про установку, настройку и запуск первого дата пайплайна средствами Apache Airflow. К слову, в 2017 году я уже писал про не менее классный и простой инструмент Luigi от компании Spotify. По своей сути эти два инструмента похожи — оба предназначены для запуска цепочек задач (дата пайплайнов), но есть у них и ряд различий о которых я говорил во время своего выступления на PyCON Russia 2019:
В этой статье я постараюсь рассказать о необходимом минимуме для работы с Airflow. Для начала давайте рассмотрим основные сущности инструмента.
DAG (Directed Acyclic Graph)
DAG — это ориентированный ациклический граф, т.е. граф у которого отсутствуют циклы, но могут быть параллельные пути, выходящие из одного и того же узла. Простыми словами DAG это сущность, объединяющая ваши задачи в единый data pipeline (или цепочку задач), где явно видны зависимости между узлами.
На картинке можно видеть классический DAG, где Task E является конечным в цепочке и зависит от всех задача слева от него.
Operator
Если вы знакомы с инструментом Luigi, то Operator в Airflow это аналог Task в Luigi. Оператор это звено в цепочке задач. Используя оператор разработчик описывает какую задачу необходимо выполнить. В Airflow есть ряд готовых операторов, например:
- PythonOperator — оператор для исполнения python кода
- BashOperator — оператор для запуска bash скриптов/команд
- PostgresOperator — оператор для вызова SQL запросов в PostgreSQL БД
- RedshiftToS3Transfer — оператор для запуска UNLOAD команды из Redshift в S3
- EmailOperator — оператор для отправки электронных писем
Полный список стандартных операторов можно найти в документации Apache Airflow.
DAG является объединяющей сущностью для набора операторов, т.е. если вернуться к картинке выше, то Task A, Task B и т.д. это отдельные операторы.
Важно! Операторы не могут принимать возвращаемые значения от выполнения предыдущих операторов в цепочке (как, например, цепочка из вызовов функций), т.к. могут исполняться в разном адресном пространстве и даже на разных физических машинах.
Sensor
Сенсор это разновидность Operator, его удобно использовать при реализации событийно ориентированных пайплайнов. Из стандартного набора есть, например:
- PythonSensor — ждём, когда функция вернёт True
- S3Sensor — проверяет наличие объекта по ключу в S3-бакете
- RedisPubSubSensor — проверяет наличие сообщения в pub-sub очереди
- RedisKeySensor — проверяет существует ли переданный ключ в Redis хранилище
Это лишь малая часть доступных для использования сенсоров. Чтобы создать свой сенсор, достаточно унаследоваться от BaseSensorOperator и переопределить метод poke .
Хуки это внешние интерфейсы для работы с различными сервисами: базы данных, внешние API ресурсы, распределенные хранилища типа S3, redis, memcached и т.д. Хуки являются строительными блоками операторов и берут на себя всю логику по взаимодействию с хранилищем конфигов и доступов (о нём ниже). Используя хуки можно забыть про головную боль с хранением секретной информации в коде (пароли к доступам, например).
Установка
Apache Airflow состоит из нескольких частей:
- Веб-приложение с панелью управления, написано на Flask
- Планировщик (Scheduler), в production среде чаще всего используется Celery
- Воркер, выполняющий работу. В production среде также чаще всего встречается конфигурация с Celery.
В качестве базы данных рекомендуется использовать PostgreSQL или MySQL. В этом посте речь пойдёт про установку и настройку Apache Airflow руками, я не буду использовать готовые образы Docker, чтобы наглядно показать как всё запускается изнутри.
Погнали! Создаём новое виртуальное окружение Python, и ставим в него Apache Airflow:
У Airflow много зависимостей в отличие от Luigi, поэтому на экране будет много текста. Вот, например, результат вывода pip freeze :
После установки пакета apache-airflow, в виртуальном окружении будет доступна команда airflow . Запустите её без параметров, чтобы увидеть список доступных команд.
Apache Airflow свои настройки хранит в файле airflow.cfg , который по умолчанию будет создан в домашней директории юзера по пути
/airflow/airflow.cfg . Путь можно изменить, присвоив переменной окружения новое значение:
Далее выполняем инициализацию для базы данных.
Эта команда накатит все миграции, по умолчанию в качестве базы данных Airflow использует SQLite. Для демонстрационных возможностей это нормально, но в реальном бою лучше всё же переключиться на MySQL или PostgreSQL. Давайте делать всё по-взрослому. Я буду использовать Postgres, поэтому если он у вас до сих пор не стоит, то самое время установить PostgreSQL.
Создаю базу данных и пользователя к ней для Airflow:
А теперь открываем airflow.cfg и правим значение параметра sql_alchemy_conn на postgresql+psycopg2://airflow:airflow@localhost/airflow_metadata и load_examples = False . Последний параметр отвечает за загрузку примеров с бесполезными DAGами, они нам не нужны.
В качестве python-драйвера для PostgreSQL я использую psycopg2, поэтому её необходимо поставить в окружение:
Инициализируем новую базу данных:
Airflow Executors
Хочу немножко отвлечься от запуска Airflow и рассказать про очень важную концепцию — Executors. Как понятно из названия, Executors отвечают за исполнение задач. В Airflow есть несколько видов исполнителей:
- SequentialExecutor
- LocalExecutor
- CeleryExecutor
- DaskExecutor
- KubernetesExecutor
В боевой среде чаще всего встречается CeleryExecutor, который, как можно догадаться, использует Celery. Но обо всём по порядку.
SequentialExecutor
Этот исполнитель установлен в качестве значения по умолчанию в airflow.cfg у параметра executor и представляет из себя простой вид воркера, который не умеет запускать параллельные задачи. Как можно догадаться, в конкретный момент времени выполняться может только одна единственная задача. Этот вид исполнителя используют в ознакомительных целях, для продуктивной среды он категорически не подходит.
LocalExecutor
Этот вид исполнителя даёт максимальные ощущения продуктивной среды в тестовом окружении (или окружении разработки). Он умеет выполнять задачи параллельно (например, исполнять несколько DAGов одновременно) путём порождения дочерних процессов, но всё же не совсем предназначен для продакшена ввиду ряда проблем:
- Ограничение при масштабировании (возможно эта проблема не будет актуальна для вас), исполнитель этого типа ограничен ресурсами машины на котором он запущен
- Отсутствие отказоустойчивости. Если машина с этим типом воркера падает, то задачи перестают исполнять до момента её возвращения в строй.
При небольшом количестве задач всё же можно использовать LocalExecutor, т.к. это проще, быстрее и не требует настройки дополнительных сервисов.
CeleryExecutor
Наиболее популярный вид исполнения задач. Под капотом использует всю магию таск-менеджера Celery, а соответственно тянет за собой все зависимости этого инструмента. Чтобы использовать CeleryExecutor необходимо дополнительно настроить брокер сообщений. Чаще всего используют либо Redis либо RabbitMQ. Преимущества этого вида в том, что его легко масштабировать — поднял новую машину с воркером, и он готов выполнять требуемую работу, а также в отказоустойчивости. В случае падения одного из воркеров его работа будет передана любому из живых.
DaskExecutor
Очень похож на CeleryExecutor, но только вместо Celery использует инструмент Dask, в частности dask-distributed.
KubernetesExecutor
Относительно новый вид исполнения задач на кластере Kubernetes. Задачи исполняются как новые pod инстансы. В связи с развитием контейнеров и их повсеместным использованием, данный вид исполнения может быть интересен широкому кругу людей. Но у него есть минус — если у вас нет Kubernetes кластера, то настроить его будет непростым упражнением.
Так к чему я начал разговор про Executors. В стандартной конфигурации Airflow предлагает нам использовать SequentialExecutor, но мы ведь стараемся подражать продуктивной среде, поэтому будем использовать LocalExecutor. В airflow.cfg поменяйте значение параметра executor на LocalExecutor.
Запускаем веб-приложение на 8080 порту:
Если всё настроено правильно, то переход по адресу localhost:8080 должен показать страницу как на скриншоте:
Поздравляю! Мы настроили и запустили Apache Airflow. На странице можно заметить сообщение:
The scheduler does not appear to be running. The DAGs list may not update, and new tasks will not be scheduled.
Сообщение указывает на то, что не запущен планировщик Airflow (scheduler). Он отвечает за DAG discovery (обнаружение новых DAG), а также за планирование их запуска. Запустить планировщик можно командой:
Для того, чтобы не переключаться между разными окнами терминалов, я люблю использовать менеджер терминалов tmux.
Итак, база настроена, веб-приложение и планировщик запущены. Нам остаётся только написать наш первый data pipeline и почувствовать себя в шкуре крутого дата инженера.
Строим data pipeline на Apache Airflow
В файле настроек airflow.cfg есть параметр dags_folder , он указывает на путь, где лежат файлы с DAGами. Это путь $AIRFLOW_HOME/dags . Именно туда мы положим наш код с задачами.
Какие задачи будет выполнять пайплайн? Я решил для демонстрации взять пример с датасетом Titanic о котором писал в статье про pandas. Суть в том, что сначала необходимо будет скачать датасет, следующим шагом будет этап создания сводной таблицы: сгруппируем пассажиров по полу и пассажирскому классу, чтобы узнать количество людей в каждом классе. Результатом будет новый csv-файл со сводной таблицей.
Вот так выглядит DAG:
А вот код всего DAGа, включая 2 оператора:
В DAGе у нас используются 2 PythonOperator. Обратите внимание, что они принимают функцию, которую необходимы выполнить. В первом случае это download_titanic_dataset , которая скачивает датасет из сети, во втором случае это pivot_dataset , которая сохраняет сводную таблицу из исходного файла (сохраненного предыдущей функцией).
Стоит обратить внимание на объект DAG и то как описаны зависимости между двумя операторами. В Airflow допустимы конструкции >> и , а также методы .set_upstream и .set_downstream . Т.е. код:
Можно заменить на:
Это означает, что выполнение оператора pivot_titanic_dataset зависит от выполнения оператора create_titanic_dataset.
На уровне объекта DAG задаются настройки, например:
- Время начала выполнения пайплайна ( start_date )
- Периодичность запуска ( schedule_interval )
- Информация о владельце DAG ( owner )
- Количество повторений в случае неудач ( retries )
- Пауза между повторами ( retry_delay )
Параметров в разы больше. Более подробно как всегда можно прочитать в доках.
Итак, сохраняем в файл код и помещаем его по пути $AIRFLOW_HOME/dags . Для того, чтобы DAGи отображались в интерфейсе Airflow необходимо запустить планировщик:
Если всё сделано верно, то в списке появится наш DAG:
Его можно активировать, переключив с Off на On и попробовать запустить (Trigger Dag).
Заключение
Эта статья лишь небольшое введение в Apache Airflow. Я не раскрыл и 20% того, что умеет инструмент, но и такой задачи себе не ставил. Лучшим способом изучить Apache Airflow является работа с ним. Пробуйте, экспериментируйте, чтобы понять подходит он под ваши задачи или нет.
💌 Присоединяйтесь к рассылке
Понравился контент? Пожалуйста, подпишись на рассылку.
Источник