Анализ данных потока кликов с помощью Databricks и Redpanda

Глобальным организациям необходим способ обработки огромных объемов данных, которые они производят для принятия решений в режиме реального времени. Для этой цели они часто используют инструменты потоковой обработки событий, такие как Redpanda, и инструменты потоковой обработки, такие как Databricks.

Примером использования является рекомендация контента пользователям на основе их кликов на мобильном или веб-приложении. Потоки кликов будут передаваться через Redpanda в Databricks, где рекомендательный механизм будет анализировать данные и рекомендовать контент:

Redpanda — это быстрая и масштабируемая платформа потоковой передачи событий в реальном времени, которая служит в качестве Apache Kafka. альтернативой. Она API-совместима с Kafka, поэтому все ваши существующие инструменты для Kakfa работают и с Redpanda. Он также поставляется в виде единого бинарного файла и может работать на виртуальной машине, в Docker и Kubernetes.

В этом учебнике рассматривается потоковая передача событий и аналитика данных с использованием Redpanda и Databricks. Вы узнаете, как отправлять данные в тему Redpanda из оболочки, хранить полученные данные в CSV файлах в Databricks и анализировать данные в реальном времени для получения информации.

Использование Redpanda для отправки данных в Databricks

Давайте начнем!

Во-первых, предварительные условия. Для выполнения этого руководства вам понадобится следующее:

  • Физическая или виртуальная машина с общедоступным IP-адресом.
  • Docker и docker-compose, установленные на этой виртуальной машине
  • Kafkacat (или любой клиент, совместимый с Kafka API) для подключения к Redpanda в качестве производителя.

Настройка Redpanda

Чтобы настроить Redpanda, создайте файл docker-compose.yml на сервере, к которому можно получить доступ через интернет. Это гарантирует, что брокер Redpanda сможет взаимодействовать с вашим развернутым экземпляром Databricks:

version: "3.7"
services:
  redpanda:
    command:
      - redpanda
      - start
      - --smp
      - "1"
      - --reserve-memory
      - 0M
      - --overprovisioned
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
    image: docker.vectorized.io/vectorized/redpanda:latest
    container_name: redpanda-1
    ports:
      - 9092:9092
      - 29092:29092
      - 8081:8081
Войдите в полноэкранный режим Выйдите из полноэкранного режима

Запустите контейнер Redpanda, перейдя в каталог, содержащий файл docker-compose.yml и выполните следующую команду:

docker compose up -d
Войти в полноэкранный режим Выйти из полноэкранного режима

Эта операция извлечет образ Redpanda Docker и запустит Redpanda на порту 9092. Убедитесь, что ваш экземпляр виртуальной машины имеет статический публичный IP-адрес и что порт 9092 является публичным.

Обратитесь к следующим руководствам для добавления публичного IP адреса и порта на вашем экземпляре виртуальной машины:

  • Добавление публичного IP-адреса на Amazon EC2
  • Добавление публичного IP-адреса на Azure Virtual Machine
  • Добавление публичного IP-адреса на Google Compute Engine

Настройка Databricks

Чтобы начать работу, создайте учетную запись Databricks (учетная запись предоставляется бесплатно на 14-дневный пробный период). После заполнения данных учетной записи вы будете перенаправлены на выбор поставщика облачных услуг. Перейдите к предпочтительному облачному провайдеру, выбрав соответствующую инструкцию по настройке из списка ниже:

  • Настройка Databricks на AWS
  • Настройка Databricks на Azure
  • Настройка Databricks на GCP

После успешной установки вы должны попасть на приборную панель со ссылками на различные аспекты Databricks.

Ваша первая задача — создать кластер Databricks. Кластер — это набор вычислительных ресурсов и конфигураций, позволяющих выполнять рабочие нагрузки в области науки о данных и инженерии. В данном случае вы будете запускать рабочую нагрузку инженерии данных для потоковой передачи данных из темы Redpanda в CSV-файл в Databricks.

На приборной панели нажмите на кнопку Create в левом верхнем углу, чтобы создать новый кластер. Нажав на Cluster, вы попадете на страницу для настройки свойств вашего кластера. Выберите имя для кластера и оставьте остальные поля без изменений; затем нажмите на кнопку Create Cluster.

Для выполнения всех задач этого руководства вы будете использовать блокнот Databricks. Блокнот Databricks — это интерфейс, который может содержать исполняемый код, документацию и визуализацию, подобно блокноту Jupyter. Этот блокнот будет служить в качестве блокнота для выполнения ваших команд.
Снова нажмите на кнопку Создать в верхней левой части приборной панели и на этот раз выберите опцию создания нового блокнота. Выберите описательное имя, например redpanda-kconnect-scratchpad и установите Scala в качестве языка по умолчанию.

Настройка потоковой передачи данных в Databricks

После настройки первого блокнота, вставьте содержимое ниже в первую ячейку блокнота:

import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "SERVER_IP:9092")
    .option("subscribe", "csv_input")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()
    .select($"value".cast("string"))
    .as[(String)]
Войти в полноэкранный режим Выйти из полноэкранного режима

Приведенный выше фрагмент кода создает кадр потоковых данных, назначенный переменной streamingInputDF. Этот фрейм данных подписывается на интересующую тему, csv_input, в кластере Redpanda. Кластер идентифицируется по IP-адресу сервера и порту. Порт в данном случае 9092, тот же порт, который использует Kafka.

Замените SERVER_IP развернутым IP-адресом вашего сервера. После установки SERVER_IP запустите ячейку для инициализации конфигурации. Вы должны получить результат, подобный приведенному ниже:

streamingInputDF:org.apache.spark.sql.Dataset[String] = [value: string]
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
streamingInputDF: org.apache.spark.sql.Dataset[String] = [value: string]
Вход в полноэкранный режим Выход из полноэкранного режима

Чтобы сохранить данные в Databricks, необходимо определить поток записи в файл. Этот поток записи должен иметь тот же тип файла, что и входной поток. Приведенный ниже фрагмент считывает данные из фрейма данных streamingInput и записывает их в CSV-файл. Операция записи выполняется каждые тридцать секунд, и все новые записи в теме Redpanda будут считываться и записываться в новый CSV-файл.

Создайте вторую ячейку и добавьте содержимое, как показано в приведенном ниже фрагменте:

import org.apache.spark.sql.streaming.Trigger

val query =
  streamingInputDF
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "/FileStore/tables/user-details-data")
    .option("checkpointLocation", "/FileStore/tables/user-details-check")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()
Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь запустите команду для начала потоковой передачи. Ваш вывод должен выглядеть примерно так, как показано на изображении ниже.

Загрузка данных из Redpanda в Databricks

Для того чтобы увидеть фактические данные в Databricks, вы будете передавать данные в Redpanda с помощью Kafkacat. Выполните приведенную ниже команду в вашей оболочке для создания производителя консоли Redpanda. Замените SERVER_IP на IP-адрес сервера, на котором работает Redpanda:

Теперь вставьте содержимое CSV в продюсер строка за строкой:

id,first_name,last_name
1,Israel,Edeh
2,John,Doe
3,Jane,Austin
4,Omo,Lawal
5,John,Manson
6,John,Rinzler
Войдите в полноэкранный режим Выйдите из полноэкранного режима

В зависимости от вашего интервала вставки содержимого, вы должны увидеть пять завершенных заданий в области вывода ячейки потока записи.

Чтобы просмотреть файлы CSV, создайте новую ячейку и выполните следующую команду:

В области вывода вы должны получить таблицу, показывающую все созданные CSV-файлы.

Для просмотра фактического содержимого файлов выполните следующую команду:

Вы должны получить вывод со списком записей, которые вы передали на данный момент. Он будет выглядеть примерно так:

+-------------+
|          _c0|
+-------------+
|id, first_name,last_name|
|1,Israel,Edeh|
|3,Jane,Austin|
|5,John,Manson|
|6,John,Rinzler|
|  4,Omo,Lawal|
|   2,John,Doe|
+-------------+
Войти в полноэкранный режим Выход из полноэкранного режима

Анализ потоковых данных пользователей

Databricks предлагает инструменты анализа данных и машинного обучения, чтобы помочь организациям разобраться в своих данных. Вы можете выполнить простой анализ данных, которые вы передавали в этом учебнике, используя Apache Spark запросы. Допустим, вы хотите сгруппировать пользователей по их именам, чтобы узнать количество пользователей с одинаковыми именами. Для этого можно использовать следующий код:

%python
users_df = spark.read.csv("/FileStore/tables/user-details-data/", header="true", inferSchema="true")

users_df.groupBy("first_name").count().show()
Войти в полноэкранный режим Выйти из полноэкранного режима

Выполнение приведенной выше команды приведет к следующему результату:

+----------+-----+
|first_name|count|
+----------+-----+
|      John|    3|
|    Israel|    1|
|       Omo|    1|
|      Jane|    1|
+----------+-----+
Войти в полноэкранный режим Выйти из полноэкранного режима

Из анализа видно, что три пользователя имеют имя «Джон» в качестве первого имени. Вы можете провести дальнейший анализ с набором данных с большим количеством строк.

Построение графиков потоковых данных

Databricks также позволяет визуализировать и строить графики данных. Вы можете подготовить данные CSV для построения графиков, выбрав заголовки и настроив параметры построения. Ваша первая задача — отобразить данные в виде таблицы. Для этого выполните приведенную ниже команду в новой ячейке:

%python
diamonds_df = spark.read.csv("/FileStore/tables/user-details-data/", header="true", inferSchema="true")

display(diamonds_df.select("id", "first_name", "last_name"))
Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь измените тип отображения на bar, а затем нажмите кнопку Plot Options…, чтобы настроить гистограмму. Перетащите first_name в поля keys и values и удалите другие поля. Затем установите тип агрегации COUNT. Наконец, нажмите на Apply, чтобы применить настройки.

Что вы будете создавать с помощью Databricks и Redpanda?

Распределенные системы требуют скорости на всех уровнях и в каждом компоненте. Redpanda особенно хорошо масштабируется для критически важных систем, причем без зависимости от JVM или ZooKeeper.

Теперь, когда вы знаете, как передавать данные из Redpanda в Databricks, анализировать и строить графики с помощью встроенной функции отображения Databricks, вы можете использовать эту настройку для анализа данных в реальном времени практически для любого проекта.

Общайтесь с разработчиками Redpanda напрямую в сообществе Redpanda Slack или вносите свой вклад в репозиторий GitHub с исходным кодом Redpanda.Чтобы узнать больше обо всем, что вы можете делать с Redpanda, ознакомьтесь с документацией здесь.

Оцените статью
devanswers.ru
Добавить комментарий