Некоторое время назад я столкнулся с ситуацией, когда в задании структурированной потоковой обработки spark требовалось объединить статические данные, расположенные в очень большой таблице.
Первый подход был не очень хорош. Даже при использовании небольших микропакетов он увеличивал время пакетной обработки на порядки.
(Очень) упрощенным примером такого случая может быть поток событий продаж, который необходимо объединить с дополнительной информацией о продукте, расположенной в большой таблице продуктов.
Эта статья посвящена использованию mapPartitions для объединения фреймов данных Spark Structured Streaming со статическими данными.
Подход №1 — объединение потоковых и статических данных
Первый подход предполагает объединение кадра данных о событиях продаж со статической таблицей продуктов.
К сожалению, это соединение заставляло каждый микропакет выполнять полное сканирование таблицы продуктов, что приводило к высокой длительности пакета, даже если поток обрабатывал одну запись.
Код выглядел следующим образом:
// streamingDS = … Sales stream initialization …
// Read static product table
val staticDS = spark.read
.format("parquet")
.load("/tmp/prods.parquet").as[Product]
// Join of sales stream with products table
streamingDS
.joinWith(staticDS,
streamingDS("productId")===staticDS("productId") &&
streamingDS("category")===staticDS("category"))
.map{
case (sale,product) => new SaleInfo(sale, Some(product))
}
Используя небольшое демонстрационное приложение, DAG показывает виновника:
Разбиение статической таблицы на разделы было проигнорировано, поэтому были прочитаны все строки всех разделов (в данном случае 5).
Полное сканирование таблицы продуктов добавило >1 минуту к длительности микропакета, даже если в нем было только одно событие.
Подход №2 — mapPartitions
Второй подход был основан на обращении к хранилищу ключевых значений для каждого события продажи с помощью операции Spark mapPartitions, которая позволяет выполнять преобразования фрейма данных/набора данных на уровне строк.
Ни таблицы Parquet, ни таблицы Delta не подходят для индивидуального поиска ключей, поэтому необходимым условием для этого сценария является загрузка информации о товаре в хранилище ключевых значений (в данном примере Mongo DB).
Код примера немного сложнее, но в некоторых случаях он стоит усилий, чтобы сохранить низкую продолжительность партии. Особенно это касается небольших микропартий.
// streamingDS = … Sales stream initialization …
streamingDS.mapPartitions(partition => {
// setup DB connection
val dbService = new ProductService()
dbService.connect()
partition.map(sale => {
// Product lookup and merge
val product = dbService.findProduct(sale.productId)
new SaleInfo(sale, Some(product))
}).iterator
})
Новый график длительности партии показывает, что проблема давно исчезла, и мы вернулись к короткой длительности партии.
Надеюсь, вам понравилось читать! Пожалуйста, дайте мне знать, если у вас есть лучшие подходы к решению этой проблемы.
Детали теста: Spark версии 3.2.1 работает на Ubuntu 20.04 LTS / WSL2.
Код теста: https://github.com/trincaog/spark-mappartitions-test
Фото Marc Sendra Martorell на Unsplash