В предыдущем посте мы показали, как запустить Delta Lake на Amazon EMR Serverless. С тех пор вышел новый релиз (6.7.0) с реализованным флагом --packages
. Это помогает нам намного проще работать с spark. Тем не менее, флаг --packages
требует дополнительной настройки сети, с которой большинство Data Scientists и инженеров не знакомы. Наша цель — показать шаг за шагом, как это сделать.
Сначала несколько пояснений
При использовании Spark с зависимостями Java у нас есть два варианта: (1) собрать и вставить файлы .jar
вручную в кластер или (2) передать зависимости флагом --packages
, чтобы spark могла автоматически загрузить их из maven. Начиная с версии 6.7.0 EMR Serverless, этот флаг доступен для использования.
Проблема заключается в том, что кластеру spark необходимо подключиться к интернету, чтобы загрузить пакеты из maven. Amazon EMR Serverless поначалу живет вне какого-либо VPC и поэтому не может выйти в интернет. Для этого необходимо создать EMR-приложение внутри VPC. Однако EMR-приложения могут быть созданы только в частных подсетях, которые (кстати…) не выходят в интернет и не могут связаться с S3 😭… Как это исправить?
Шаг первый: создание сети
На схеме ниже показана вся необходимая сетевая структура:
Она легко создается на AWS в интерфейсе VPC. Нажмите кнопку Create VPC и выберите VPC и другие. AWS делает всю работу и предоставляет дизайн VPC с 2 публичными подсетями, 2 частными подсетями, интернет-шлюзом, необходимыми таблицами маршрутизации и S3-endpoint (чтобы ресурсы внутри VPC могли обращаться к S3).
Вы можете установить число зон доступности на 1, если хотите, но для обеспечения высокой доступности вам следует работать как минимум с 2 AZ.
Далее вам нужно убедиться, что вы отметили как минимум один NAT-шлюз, который отвечает за доступ частных подсетей в интернет. Ниже показан экран с окончательной настройкой:
Нажмите кнопку create VPC, и мы закончили работу с сетью.
Осталось создать группу безопасности, которая разрешает исходящий трафик в интернет. Вернитесь к VPC в AWS и нажмите Security Group в левой панели. Затем нажмите Создать группу безопасности. Назовите вашу группу безопасности, снимите флажок с выбранного VPC и отметьте только что созданную. По умолчанию группы безопасности не разрешают никакого входящего трафика и разрешают весь исходящий трафик. Мы можем оставить все как есть. Создайте группу безопасности и вуаля!
Шаг второй: Роли и политики IAM
Вам нужны две роли: роль, связанная с сервисом, и еще одна роль, которая дает разрешение на доступ к S3 и Glue. Мы уже обсуждали это в предыдущем посте в разделе Настройка — Аутентификация. Ознакомьтесь с ним. Нам также понадобится набор данных для работы. Для этого подойдет знаменитый набор данных Titanic. Вы можете скачать его здесь.
Шаг 3: Создайте EMR Studio и бессерверное приложение EMR
Сначала мы должны создать EMR Studio. Если у вас еще нет созданных студий, это очень просто. После нажатия кнопки Get started на главной странице EMR Serverless вы можете нажать кнопку для автоматического создания студии.
Во-вторых, вам нужно создать приложение EMR Serverless. Задайте имя и (помните!) выберите релиз 6.7.0. Для настройки сетевого взаимодействия необходимо отметить Choose custom settings и прокрутить вниз до Network connections.
В сетевых подключениях выберите созданный вами VPC, две частные подсети и группу безопасности.
Шаг четвертый: Код Spark
Теперь мы подготовим простой код pyspark, чтобы смоделировать некоторые изменения в наборе данных (мы включим двух новых пассажиров — Ней и Сару — и обновим информацию о двух пассажирах, которые считались погибшими, но были найдены живыми, мистер Оуэн Браунд и мистер Уильям Аллен). Ниже приведен код для этого.
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)
from delta.tables import *
print("Reading CSV file from S3...")
schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv(
"s3://<YOUR-BUCKET>/titanic",
header=True, schema=schema, sep=";"
)
print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")
print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
(892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
(893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None)
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)
print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")
print("UPSERT...")
# UPSERT
(
old.alias("old")
.merge(new.alias("new"),
"old.PassengerId = new.PassengerId"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
print("Checking if everything is ok")
print("New data...")
(
spark.read.format("delta")
.load("s3://<YOUR-BUCKET>/silver/titanic_delta")
.where("PassengerId < 6 OR PassengerId > 888")
.show()
)
print("Old data - with time travel")
(
spark.read.format("delta")
.option("versionAsOf", "0")
.load("s3://<YOUR-BUCKET>/silver/titanic_delta")
.where("PassengerId < 6 OR PassengerId > 888")
.show()
)
Этот файл .py
должен быть загружен на S3.
Шаг 5: GO!
Теперь мы отправляем задание на выполнение. Мы можем сделать это с помощью AWS CLI:
aws emr-serverless start-job-run
--name Delta-Upsert
--application-id <YOUR-APPLICATION-ID>
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py",
"sparkSubmitParameters": "--packages io.delta:delta-core_2.12:2.0.0"
}
}'
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"}
}
}'
Вот и все! Когда задание будет выполнено, зайдите в папку журналов и проверьте журналы (ищите ID приложения, ID задания и журналы SPARK_DRIVER). Вы должны увидеть что-то вроде этого:
Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 1| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 1| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 889| 0| 3|"Johnston, Miss. ...|female|null| 1| 2| W./C. 6607| 23.45| null| S|
| 890| 1| 1|Behr, Mr. Karl Ho...| male|26.0| 0| 0| 111369| 30.0| C148| C|
| 891| 0| 3| Dooley, Mr. Patrick| male|32.0| 0| 0| 370376| 7.75| null| Q|
| 892| 1| 1| Sarah Crepalde|female|23.0| 1| 0| null| null| null| null|
| 893| 0| 1| Ney Crepalde| male|35.0| 1| 0| null| null| null| null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 889| 0| 3|"Johnston, Miss. ...|female|null| 1| 2| W./C. 6607| 23.45| null| S|
| 890| 1| 1|Behr, Mr. Karl Ho...| male|26.0| 0| 0| 111369| 30.0| C148| C|
| 891| 0| 3| Dooley, Mr. Patrick| male|32.0| 0| 0| 370376| 7.75| null| Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Удачного кодирования и дальнейшего развития!