Pokemons Flow: разработка конвейера данных с apache airflow для извлечения покемонов через API

Apache Airflow — это разработанная сообществом платформа для создания, планирования и мониторинга рабочих процессов, причем все это делается программно. С его помощью трубопроводы Airflow определяются в Python, что позволяет динамически создавать трубопроводы, не покидая уже знакомого нам синтаксиса 🐍.

Знание того, как разрабатывать конвейеры данных с помощью Apache Airflow, является более чем необходимым требованием, если вы нацелены на карьеру в области инженерии данных. Итак, если вы хотите узнать больше об этом мощном инструменте, продолжайте читать 😉.

В этой статье я расскажу вам, как разработать конвейер данных для извлечения знаменитых маленьких карманных монстров, покемонов, из API PokeAPI. После извлечения будут применены некоторые очень простые преобразования данных, чтобы, наконец, их можно было сохранить локально, имитируя загрузку данных в хранилище данных.

Полный код, разработанный в этой статье, можно посмотреть на моем GitHub 🤖.

Разработка на основе контейнеризации с помощью Docker Compose

Существует множество способов установки Airflow на вашу машину. Все они могут быть проверены в официальной документации.

Однако для этой статьи мы разработаем наш конвейер на основе контейнеризации с помощью Docker Compose.
Если у вас не установлен Docker, ознакомьтесь с руководством по установке в документации.

YML-файл, который будет загружать наш кластер, можно загрузить здесь, прямо с официальной страницы Airflow. Никаких изменений в файле для этого проекта не требуется 💚.

Загрузка этого файла необходима для выполнения следующих шагов.

Инициализация кластера

Сначала создадим каталог pokemonsflow и добавим в него файл docker-compose.yml.

После этого откройте терминал в директории и введите следующие команды для инициализации кластера Airflow:

$ docker-compose up airflow-init
$ docker-compose up -d
$ docker-compose ps
Войдите в полноэкранный режим Выход из полноэкранного режима

После этого ваш кластер Airflow становится активным, и среда готова к разработке 🚀.

Обратите внимание, что в каталоге pokemonsflow теперь есть три новых подкаталога:

dags/
logs/
plugins/
docker-compose.yml
Войдите в полноэкранный режим Выход из полноэкранного режима

В завершение создайте подкаталог data в каталоге dags. В этом каталоге будут сохраняться данные, извлеченные конвейером:

$ mkdir dags/data

Разработка конвейера данных

Теперь, когда наша среда разработки через Docker уже инициализирована, мы можем начать делать первые шаги в структурировании нашей DAG 🎆.

Прежде всего, создайте файл pokemonsflow_dag.py в подкаталоге dags. Обратите внимание, что суффикс _dag в файле необходим для того, чтобы Airflow автоматически распознал наш DAG 😉:

$ cd dags
$ touch pokemonsflow_dag.py
Войдите в полноэкранный режим Выход из полноэкранного режима

После этого добавьте в файл следующий код:

import pandas as pd
import pendulum
import requests
from airflow.decorators import dag, task


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
    catchup=False
)
def pokemonsflow_dag():

    @task
    def extract() -> list:
        pass

    @task
    def transform(pokemons: list) -> list:
        pass

    @task
    def load(pokemons: list):
        pass


dag = pokemonsflow_dag()
Войдите в полноэкранный режим Выход из полноэкранного режима

Таким образом, у нас уже есть начальная структура нашей DAG. Библиотека requests и pandas необходимы для задач извлечения и преобразования данных.

В DAG у нас есть три основные задачи, где:

  • Extract: выполнит извлечение двадцати покемонов из PokeAPI;
  • Transform: выберет только пять полей из извлеченных покемонов и отсортирует их в порядке убывания по полю base_experience;
  • Загрузка: наконец, эта задача сохранит преобразованные данные в подкаталоге /dags/data/, в формате CSV;

Как вы видите, задачи преобразования и загрузки зависят от данных, извлеченных или преобразованных предыдущей задачей. Для передачи данных между задачами Airflow использует встроенный механизм, называемый XComs, что является аббревиатурой от Cross Communication.

До Airflow 2.0 обмен данными между задачами с помощью XComs был несколько… многословным. Однако с появлением Airflow 2.0 мы можем обмениваться данными между задачами, просто передавая их как параметры функции. Просто, да? ☺

Теперь давайте индивидуально разработаем каждую задачу в нашей группе DAG.

Извлечение данных

Извлечение покемонов из API осуществляется с помощью следующих шагов:

  • Выполните вызов GET на конечной точке /api/v2/pokemon с параметром limit=20, чтобы мы могли ограничить результаты. Результатом будет json с полем result, аналогично этому:

[
  {
    'name': 'bulbasaur',
    'url': 'https://pokeapi.co/api/v2/pokemon/1/'
  },
  {
    'name': 'ivysaur',
    'url': 'https://pokeapi.co/api/v2/pokemon/2/'
  },
  {
    'name': 'venusaur',
    'url': 'https://pokeapi.co/api/v2/pokemon/3/'
  },
  {
    'name': 'charmander',
    'url': 'https://pokeapi.co/api/v2/pokemon/4/'
  },
  ...
]

Войдите в полноэкранный режим Выход из полноэкранного режима
  • Далее мы обращаемся к полю результатов и выполняем GET-вызов к каждому из URL-адресов. Результатом будет список с двадцатью покемонами, извлеченными из API;

Приступаем к работе! Добавьте следующий код в функцию extract внутри нашей группы DAG, не забыв, конечно, убрать ключевое слово pass:

    @task
    def extract() -> list:
        url = 'http://pokeapi.co/api/v2/pokemon'

        params = {
            'limit': 20
        }

        response = requests.get(url=url, params=params)

        json_response = response.json()
        results = json_response['results']

        pokemons = [requests.get(url=result['url']).json()
                    for result in results]

        return pokemons
Войдите в полноэкранный режим Выход из полноэкранного режима

Поздравляю! Задача по извлечению двадцати покемонов из API выполнена 🎉.

Преобразование данных

Если вы запросите конечную точку, например, через Postman, вы заметите, что каждый покемон содержит множество полей, таких как формы, статистика, ETC. Однако мы возьмем только пять полей из этого json.

Этапы задачи преобразования следующие:

  • Он берет данные, извлеченные предыдущей задачей, то есть задачей извлечения;
  • Создает Pandas’ DataFrame с данными и выбирает пять столбцов, отбрасывая остальные;
  • Сортирует данные в DataFrame по столбцу base_experience в порядке убывания;
  • Преобразует DataFrame в список словаря Python, чтобы данные можно было передать в последующую задачу;

А теперь снова за работу! Добавьте следующий код в функцию transform внутри нашего DAG, не забыв, конечно, удалить зарезервированное слово pass:

    @task
    def transform(pokemons: list) -> list:

        columns = [
            'name',
            'order',
            'base_experience',
            'height',
            'weight'
        ]

        df = pd.DataFrame(data=pokemons, columns=columns)
        df = df.sort_values(['base_experience'], ascending=False)

        pokemons = df.to_dict('records')

        return pokemons
Войдите в полноэкранный режим Выход из полноэкранного режима

Готово! Задача по преобразованию данных выполнена 🎉.

Загрузка данных

Наконец, осталось только разработать задание на нагрузку. Этапы выполнения этого задания следующие:

  • Берет данные, преобразованные предыдущей задачей;
  • Создает Pandas’ DataFrame с преобразованными данными;
  • Сохраните данные DataFrame в каталоге /dags/data/ в формате CSV;

Добавьте следующий код в функцию load внутри нашего DAG, не забыв, конечно, удалить зарезервированное слово pass:

    @task
    def load(pokemons: list):

        df = pd.DataFrame(data=pokemons)
        df.to_csv('./dags/data/pokemons_dataset.csv', index=False)
Войдите в полноэкранный режим Выход из полноэкранного режима

Поздравляю! Все задачи в нашем DAG были разработаны 🎉.

Оркестровка задач и передача данных

Успокойтесь, это еще не конец! Перед завершением нам нужно указать Airflow порядок выполнения этих задач и передать данные от одной задачи к другой. Ниже функций добавьте следующий код:

    # ETL pipeline

    # extract
    extracted_pokemons = extract()

    # transform
    transformed_pokemons = transform(pokemons=extracted_pokemons)

    # load
    load(pokemons=transformed_pokemons)
Войдите в полноэкранный режим Выход из полноэкранного режима

Теперь мы закончили 👏🏼. Обратите внимание, что передача данных между задачами осуществляется путем передачи их в качестве параметров функции. Очень, очень просто!

Окончательный код нашей группы DAG выглядел следующим образом:

import pandas as pd
import pendulum
import requests
from airflow.decorators import dag, task


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 1, 1, tz='UTC'),
    catchup=False
)
def pokemonsflow_dag():

    @task
    def extract() -> list:
        url = 'http://pokeapi.co/api/v2/pokemon'

        params = {
            'limit': 20
        }

        response = requests.get(url=url, params=params)

        json_response = response.json()
        results = json_response['results']

        pokemons = [requests.get(url=result['url']).json()
                    for result in results]

        return pokemons

    @task
    def transform(pokemons: list) -> list:

        columns = [
            'name',
            'order',
            'base_experience',
            'height',
            'weight'
        ]

        df = pd.DataFrame(data=pokemons, columns=columns)
        df = df.sort_values(['base_experience'], ascending=False)

        pokemons = df.to_dict('records')

        return pokemons

    @task
    def load(pokemons: list):

        df = pd.DataFrame(data=pokemons)
        df.to_csv('./dags/data/pokemons_dataset.csv', index=False)

    # ETL pipeline

    # extract
    extracted_pokemons = extract()

    # transform
    transformed_pokemons = transform(pokemons=extracted_pokemons)

    # load
    load(pokemons=transformed_pokemons)


dag = pokemonsflow_dag()
Войдите в полноэкранный режим Выход из полноэкранного режима

Тестирование группы DAG

Теперь мы можем проверить, работает ли наша группа DAG так, как ожидалось. Для этого перейдите в терминал, открытый в корне каталога, и введите следующие команды:


$ docker-compose exec airflow-worker bash
$ airflow dags test pokemonsflow_dag 2022-01-01

Войдите в полноэкранный режим Выход из полноэкранного режима

Когда выполнение DAG будет завершено, перейдите в каталог /dags/data/ и проверьте файл pokemons_dataset.csv. Все двадцать покемонов будут отсортированы в соответствии с колонкой base_experience.

Заключительные соображения

Как мы уже видели, Apache Airflow — это мощный инструмент для оркестровки задач в конвейере данных. Не забудьте ознакомиться со многими другими возможностями Airflow 😉.

В этой статье я рассказал, как разработать DAG для извлечения двадцати покемонов из API PokeAPI. Некоторые очень простые преобразования были применены к данным, прежде чем они были сохранены локально в формате CSV.

Если вам понравилась эта статья, не забудьте поставить лайк и поделиться ею в социальных сетях 💚.

Увидимся в следующий раз!

Оцените статью
devanswers.ru
Добавить комментарий