Автор: shiyuhang
TiSpark — это тонкий слой, созданный для запуска Apache Spark поверх TiKV/TiFlash для ответа на сложные OLAP-запросы. Он поддерживает чтение, запись и удаление из TiKV/TiFlash с гарантией транзакций.
Чтобы ускорить чтение, TiSpark будет выталкивать некоторые операторы в TiKV или TiFlash. В этой статье вы узнаете:
- Что такое pushdown в Spark
- Как Spark реализует pushdown
- Стратегия вытеснения в TiSpark
Что такое pushdown в Spark?
Pushdown — это классическая оптимизация SQL, которая может ускорить выполнение SQL-запросов. Она продвигает некоторые операторы как можно ближе к источнику данных, чтобы уменьшить объем данных, которые необходимо обработать верхнему уровню. Например, предикат pushdown подталкивает условие where, а агрегация pushdown подталкивает агрегатную функцию.
Итак, что такое pushdown в Spark? Имеет ли он то же значение, что и выше?
Прежде всего, давайте познакомимся с внутренним устройством Spark SQL. Ядром Spark SQL является катализатор, он будет:
-
Разбирает Spark SQL до нерешенного логического плана
-
Применять правила анализа и каталог для получения решенного логического плана
-
Применять правила оптимизации для получения оптимизированного логического плана
-
использовать стратегии планирования для получения физического плана
-
Выбор одного из физических планов на основе модели затрат
-
Генерировать исполняемые RDD и назначать их Spark Core
В процессе катализа Spark SQL будет разобран в виде дерева. Узел дерева называется TreeNode. В Spark есть несколько классов, которые наследуют TreeNode для представления различных типов узлов в логическом плане и физическом плане. Например, условие where будет разобрано как узел filter
.
Теперь поговорим о вытеснении в Spark. В Spark существует два этапа вытеснения:
-
Оптимизация проталкивания: сначала Spark продвигает операторы ближе к источнику данных, когда оптимизирует логический план.
-
Подталкивание к источнику данных: затем Spark подталкивает операторы к источнику данных, когда строит физический план.
В качестве примера рассмотрим следующий SQL:
select * from A join B on A.id = B.id where A.a>10 and B.b<100;
Этот SQL будет разобран в Spark как дерево. Фильтр — это условие where, join — оператор join, а scan — источник данных (здесь представлены таблица A и таблица B).
После первого шага pushdown фильтр будет приближен к источнику данных, чтобы уменьшить количество данных, которые будут обработаны оператором join.
Затем фильтр может быть сдвинут к источнику данных при построении физического плана. Иными словами, Spark больше не нужно фильтровать данные.
Как Spark реализует pushdown
В предыдущем разделе вы узнали, что в Spark существует два этапа pushdown. На первом происходит оптимизация Spark SQL, а на втором — pushdown к источнику данных.
В этом разделе вы узнаете, как Spark реализует эти два pushdown. Код в этом разделе основан на Spark 3.2.
оптимизация выталкивания
Оптимизация pushdown будет применена к логическому плану на этапе оптимизатора катализатора. В качестве примера возьмем pushdown предиката.
Правило вытеснения предикатов в Spark называется PushDownPredicates
.
object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
CombineFilters.applyLocally
.orElse(PushPredicateThroughNonJoin.applyLocally)
.orElse(PushPredicateThroughJoin.applyLocally)
}
}
Правило будет рекурсивно применено к дереву с помощью transform
.
Для каждого TreeNode в плане:
-
CombineFilters будет использоваться для объединения условия where. Например, a >1 и a >2 будут объединены в a >2.
-
PushPredicateThroughNonJoin будет обрабатывать предикат без присоединения
-
PushPredicateThroughJoin будет обрабатывать предикат с объединением.
Вы можете обратиться к исходному коду Spark, чтобы увидеть подробности PushPredicateThroughNonJoin и PushPredicateThroughJoin.
Обращение к источнику данных
Spark может отталкиваться от источника данных при построении физического плана на основе результатов оптимизации отталкивания.
Возможность отталкивания зависит от возможностей и реализации источника данных. Если ваш источник данных не поддерживает pushdowns, то вам нужно сказать Spark, что этого делать не следует.
Таким образом, Spark предоставляет некоторый интерфейс для взаимодействия источников данных со Spark. В качестве примера мы также возьмем pushdown предикатов. Spark предоставляет для него следующий интерфейс:
@Evolving
public interface SupportsPushDownFilters extends ScanBuilder {
Filter[] pushFilters(Filter[] filters);
Filter[] pushedFilters();
}
-
Filter[] pushFilters(Filter[] filters):на входе — результат оптимизации pushdown, а на выходе — фильтры, которые не могут быть вытолкнуты в источник данных, которые в Spark называются
postScanFilters
. -
Filter[] pushedFilters():вход пустой, а выход — фильтры, которые могут быть вытолкнуты в источник данных и называются
pushedFilters
. -
Фильтр может быть как
postScanFilters
, так иpushedFilters
. В этом случае источник данных и Spark будут выполнять фильтр вместе.
Как сопровождающий коннектора источника данных Spark, мы можем легко управлять pushdown с помощью интерфейса. Но как Spark применяет правила pushdown? Есть два шага:
-
Храните
postScanFilters
у реализаторов интерфейса. Spark будет обрабатывать их позже. -
Обработайте
pushedFilters
с помощью оператора scan. Как он будет обрабатываться, зависит от реализации источника данных.
Храните postScanFilters
.
Первый шаг происходит на этапе оптимизации катализатора V2ScanRelationPushDown
. Основной код выглядит следующим образом (упрощенно):
object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
import DataSourceV2Implicits._
def apply(plan: LogicalPlan): LogicalPlan = {
applyColumnPruning(pushDownAggregates(pushDownFilters(createScanBuilder(plan))))
}
private def pushDownFilters(plan: LogicalPlan) = plan.transform {
case Filter(condition, sHolder: ScanBuilderHolder) =>
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
sHolder.builder, normalizedFiltersWithoutSubquery)
val filterCondition = postScanFilters.reduceLeftOption(And)
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}
Все правила отталкивания применяются в методе apply, включая pushDownFilters
, который отвечает за отталкивание предикатов.
pushDownFilters получит postScanFilters
и pushedFilters
от PushDownUtils и вернет только postScanFilters для окончательного логического плана. Spark сделает фильтры позже.
Код PushDownUtils выглядит следующим образом:
object PushDownUtils extends PredicateHelper {
def pushFilters(
scanBuilder: ScanBuilder,
filters: Seq[Expression]): (Seq[sources.Filter], Seq[Expression]) = {
scanBuilder match {
case r: SupportsPushDownFilters =>
val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
}
(r.pushedFilters(), (untranslatableExprs ++ postScanFilters).toSeq)
case _ => (Nil, filters)
}
}
}
PushDownUtils будет соответствовать SupportsPushDownFilters и получит pushedFilters и postScanFilters по реализации источника данных. pushedFilters будет пустым, если реализация пуста, что означает, что ни один предикат не будет pushdown.
Работа с pushedFilters
.
В качестве примера возьмем источник данных JDBC:
Метод build в JDBCScanBuilder
вернет JDBCScan с входным pushedFilter
.
override def build(): Scan = {
JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, pushedFilter,
pushedAggregateList, pushedGroupByCols)
}
JDBCScan
вызовет relation.buildScan
в методе toV1TableScan
и вернет JDBCRDD
.
case class JDBCScan(
relation: JDBCRelation,
prunedSchema: StructType,
pushedFilters: Array[Filter],
pushedAggregateColumn: Array[String] = Array(),
groupByColumns: Option[Array[String]]) extends V1Scan {
override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
new BaseRelation with TableScan {
override def buildScan(): RDD[Row] = {
val columnList = if (groupByColumns.isEmpty) {
prunedSchema.map(_.name).toArray
} else {
pushedAggregateColumn
}
relation.buildScan(columnList, prunedSchema, pushedFilters, groupByColumns)
}
}.asInstanceOf[T]
}
}
Фильтр будет разобран как условие where и сохранен в filterWhereClause в JDBCRDD
. Затем полный SQL с условием where будет запрашивать источники данных, совместимые с протоколом MySQL.
private[jdbc] class JDBCRDD(
sc: SparkContext,
getConnection: () => Connection,
schema: StructType,
columns: Array[String],
filters: Array[Filter],
partitions: Array[Partition],
url: String,
options: JDBCOptions,
groupByColumns: Option[Array[String]])
extends RDD[InternalRow](sc, Nil) {
private val filterWhereClause: String =
filters
.flatMap(JDBCRDD.compileFilter(_, JdbcDialects.get(url)))
.map(p => s"($p)").mkString(" AND ")
}
Затем реализация источника данных JDBC будет применена к Spark на этапе планирования с помощью DataSourceV2Strategy. Упрощенный код ядра выглядит следующим образом:
class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters,
DataSourceV2ScanRelation(_, V1ScanWrapper(scan, pushed, aggregate), output)) =>
val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
val rdd = v1Relation.buildScan()
val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
val dsScan = RowDataSourceScanExec(
output,
output.toStructType,
Set.empty,
pushed.toSet,
aggregate,
unsafeRowRDD,
v1Relation,
tableIdentifier = None)
withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion = false)
case PhysicalOperation(project, filters, DataSourceV2ScanRelation(_, scan: LocalScan, output))
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation)
case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
Spark будет соответствовать PhysicalOperation в стратегии. Что касается источника данных JDBC, то он попадет в первый случай, поскольку это V1Scan.
В первом случае вызывается scan.toV1TableScan
, чтобы получить JDBCRDD, который был представлен выше. Затем RowDataSourceScanExec
выполнит выборку данных из JDBCRDD. Наконец, withProjectAndFilter
поместит RowDataSourceScanExec
во весь физический план. Spark сначала выполнит сканирование с фильтром, а затем выполнит остальные операторы.
Стратегия вытеснения в TiSpark
Стратегия в TiSpark
TiSpark — это коннектор Spark, который предоставляет источник данных TiKV. Таким образом, мы можем выталкивать некоторые операторы из Spark в TiKV в соответствии с обсуждением выше.
Возникает вопрос: какова стратегия выталкивания в TiSpark?
Мы должны поддерживать pushdown в Spark >= 2.4 и удовлетворять следующим стратегиям.
Во-первых, оператор не может быть вытолкнут вниз, если источник данных не может обработать оператор. Для предикатов TiKV не может поддерживать все выражения и типы данных. Поэтому TiSpark должен исключить их автоматически.
Во-вторых, мы можем не захотеть отталкивать, чтобы уменьшить нагрузку на TiKV. Это поможет, когда ресурсов Spark много, а ресурсов TiKV мало. TiSpark предоставляет некоторые конфигурации для этого:
-
spark.tispark.plan.allow_agg_pushdown: с помощью этой конфигурации вы можете отказаться от агрегации pushdown.
-
spark.tispark.plan.unsupported_pushdown_exprs: вы можете указать неподдерживаемые выражения, чтобы запретить их вытеснение. Это также поможет вам работать со старой версией TiKV, которая может не поддерживать некоторые выражения.
Как TiSpark реализует стратегию
Далее мы узнаем, как TiSpark реализует стратегию.
Мы представили интерфейс pushdown, предоставляемый Spark. Однако он не может соответствовать стратегии TiSpark. Вот несколько вопросов об интерфейсе pushdown:
-
Плохая способность к расширению: Дизайн интерфейса pushdown недостаточно хорош для расширения в Data Source API V1 (DSV1). Это означает, что трудно поддерживать различные отталкивания в DSV1.
-
Ограниченные возможности отталкивания: DSV2 улучшает интерфейс pushdown и решает проблемы расширения. Но возможности pushdown ограничены. Spark 3.0 поддерживает только вытеснение предикатов и вытеснение столбцов. В версии Spark 3.1 добавлена функция агрегации, а в версии Spark 3.2 — функция ограничения.
-
Негибкая стратегия вытеснения: Например, агрегация pushdown не поддерживает push avg, который может быть преобразован как сумма/сумма в DSV2.
TiSpark должен поддерживать общие pushdowns в каждой поддерживаемой версии spark. Таким образом, интерфейс pushdown не подходит. Что делать дальше? Ответ кроется в расширении катализатора.
Расширение катализатора поддерживается после версии Spark 2.2. Мы можем вводить пользовательские правила и стратегии в большинство фаз катализатора. Другими словами, мы можем внедрить стратегии TiSpark pushdown для точного управления pushdown.
Spark будет проталкиваться к источнику данных на этапе планирования. Соответствующий интерфейс расширения — injectPlannerStrategy(на основе Spark 3.2.1 и TiSpark 3.0.1).
def injectPlannerStrategy(builder: StrategyBuilder): Unit = {
plannerStrategyBuilders += builder
}
TiSpark должен реализовать этот интерфейс:
ReflectionUtil будет возвращать TiStrategy
в соответствии с различными версиями Spark путем отражения в scala. Это позволит избежать проблем совместимости, вызванных разными версиями spark.
e.injectPlannerStrategy(new TiStrategyFactory(getOrCreateTiContext))
class TiStrategyFactory(getOrCreateTiContext: SparkSession => TiContext)
extends (SparkSession => Strategy) {
override def apply(sparkSession: SparkSession): Strategy = {
TiExtensions.validateCatalog(sparkSession)
ReflectionUtil.newTiStrategy(getOrCreateTiContext, sparkSession)
}
}
TiStrategy — это ядро отталкивания. Она сопоставляет TiDBtable, которая представляет источник данных TiDB, а затем выполняетdoPlan
. Если совпадение не удалось, TiSpark ничего не будет делать, чтобы не повлиять на другие источники данных.
ase class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSession: SparkSession)
extends Strategy with Logging {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan
.collectFirst {
case DataSourceV2ScanRelation(DataSourceV2Relation(table: TiDBTable, _, _, _, _), _, _) =>
doPlan(table, plan)
}
.toSeq
.flatten
}
Стратегии в doPlan имитируют стратегии в DataSourceV2Strategy. TiSpark определит операторы, которые могут быть перенесены в источник данных с совпадением шаблонов. Затем запрашивает TiKV на основе этих операторов. В качестве примера возьмем выталкивание предикатов. TiSpark сопоставит PhysicalOperation и выполнит метод pruneFilterProject.
private def doPlan(source: TiDBTable, plan: LogicalPlan): Seq[SparkPlan] =
plan match {
case PhysicalOperation(
projectList,
filters,
DataSourceV2ScanRelation(
DataSourceV2Relation(source: TiDBTable, _, _, _, _),
_,
_)) =>
pruneFilterProject(projectList, filters, source, newTiDAGRequest()) :: Nil
case TiAggregation(
groupingExpressions,
aggregateExpressions,
resultExpressions,
TiAggregationProjectionV2(filters, _, `source`, projects))
if isValidAggregates(groupingExpressions, aggregateExpressions, filters, source) =>
case _ => Nil
}
Метод pruneFilterProject выполнит:
-
Преобразует выражение фильтра Spark в выражение фильтра TiKV с помощью TiExprUtils.isSupportedFilter. TiSpark также определит, может ли выражение быть вытеснено в методе. Фильтры, которые можно проталкивать, будут помещены в
pushdownFilters
, а те, которые нельзя проталкивать, будут помещены вresidualFilter
. -
DAGRequest, который является параметром в запросе к TiKV, будет построен с помощью filterToDAGRequest. Фильтры
pushdownFilters
будут помещены в DAGRequest. Затем, сканирование, которое может получить данные TiKV, будет сгенерированоtoCoprocessorRDD
. -
Сканирование будет обернуто и выполнено FilterExec. Тем временем нам нужно применить
residualFilter
обратно к Spark.
private def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
source: TiDBTable,
dagRequest: TiDAGRequest): SparkPlan = {
val (pushdownFilters: Seq[Expression], residualFilters: Seq[Expression]) =
filterPredicates.partition((expression: Expression) =>
TiExprUtils.isSupportedFilter(expression, source, blocklist))
val residualFilter: Option[Expression] =
residualFilters.reduceLeftOption(catalyst.expressions.And)
filterToDAGRequest(tiColumns, pushdownFilters, source, dagRequest)
val scan = toCoprocessorRDD(source, projectSeq, dagRequest)
residualFilter.fold(scan)(FilterExec(_, scan))
}
Таким образом, TiSpark может поддерживать большую часть pushdown в каждой версии Spark (>=2.4).
На данный момент TiSpark поддерживает предикатный pushdown, агрегатный pushdown, предельный pushdown, order by pushdown и проекционный pushdown. Кроме того, TiSpark может контролировать, может ли конкретное выражение или тип данных быть оттесненным.
Заключение
TiSpark поддерживает отталкивание с помощью расширения катализатора, что влечет за собой несколько проблем:
-
Увеличение сложности кода
-
Нестабильность, поскольку мы можем затронуть развивающийся интерфейс или метод в Spark.
-
Мы должны быть очень осторожны, чтобы не повлиять на исходную логику Spark
Spark фокусируется на развитии DataSource API, а также интерфейса pushdown. Надеемся, что в ближайшем будущем DataSource API будет достаточно сильным, чтобы удовлетворить потребности TiSpark. В то время мы будем рады перенести pushdown в DataSource API.
Приложение
Ниже перечислены поддерживаемые в TiSpark функции pushdown
Тип данных | сумма | подсчет | avg | min | max | предикат & порядок по & группа по |
БИТ | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
BOOLEAN | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
TINYINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
SMALLINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
МЕДИУМИНТ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
INTEGER | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
BIGINT | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
FLOAT | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
ДВОЙНОЙ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
ДЕЦИМАЛ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
ДАТА | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
ДАТЕТЕЙНОЕ ВРЕМЯ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TIMESTAMP | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
ВРЕМЯ | ❌ | ✅ | ❌ | ? | ? | ✅ |
ГОД | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
CHAR | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
VARCHAR | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TINYTEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
ТЕКСТ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
MEDIUMTEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
LONGTEXT | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
БИНАРНЫЙ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
ВАРБИНАРНЫЙ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
TINYBLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
BLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
СРЕДНИЙ БЛОК | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
LONGBLOB | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
ENUM | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
УСТАНОВИТЬ | ❌ | ❌ | ❌ | ? | ? | ❌ |
- Отжимание min/max(время) приводит к неверному результату
- Нажатие min/max(set) может вызвать панику TiKV
Вы можете определить, является ли оператор pushdown, по explain
в TiSpark. Вот пример:
1.Создайте таблицу в TiDB
CREATE TABLE `test`.`t` (
`id` int(11) NOT NULL,
PRIMARY KEY (`id`)
);
2.Выполнить Spark SQL с помощью explan
spark.sql("select avg(id) from test.t where id > 10").explain
3.Вы получите план выполнения
*(2) HashAggregate(keys=[], functions=[specialsum(specialsum(id#252L, DecimalType(38,0), null)#258, DecimalType(38,0), null), specialsum(count(id#252L)#259L, LongType, 0)])
+- Exchange SinglePartition, true, [id=#38]
+- *(1) HashAggregate(keys=[], functions=[partial_specialsum(specialsum(id#252L, DecimalType(38,0), null)#258, DecimalType(38,0), null), partial_specialsum(count(id#252L)#259L, LongType, 0)])
+- *(1) ColumnarToRow
+- TiKV CoprocessorRDD{[table: t] IndexReader, Columns: id@LONG: { IndexRangeScan(Index:primary(id)): { RangeFilter: [[id@LONG GREATER_THAN 10]], Range: [([t200 00 00 00 00 00 00o_i200 00 00 00 00 00 00 01 03200 00 00 00 00 00 00v], [t200 00 00 00 00 00 00o_i200 00 00 00 00 00 00 01372])] }, Aggregates: Sum(id@LONG), Count(id@LONG) }, startTs: 434873744501506049}
Фокус на сопроцессоре TiKVRDD
-
RangeFilter: [[id@LONG GREATER_THAN 10]]
: указывает, что id>10 отталкивается от 10. -
Aggregates: Sum(id@LONG), Count(id@LONG)
: указывает, что Sum и Count сдвинуты вниз, они будут переписаны в avg в Spark.