Как запустить Amazon EMR Serverless с флагом —packages

В предыдущем посте мы показали, как запустить Delta Lake на Amazon EMR Serverless. С тех пор вышел новый релиз (6.7.0) с реализованным флагом --packages. Это помогает нам намного проще работать с spark. Тем не менее, флаг --packages требует дополнительной настройки сети, с которой большинство Data Scientists и инженеров не знакомы. Наша цель — показать шаг за шагом, как это сделать.

Сначала несколько пояснений

При использовании Spark с зависимостями Java у нас есть два варианта: (1) собрать и вставить файлы .jar вручную в кластер или (2) передать зависимости флагом --packages, чтобы spark могла автоматически загрузить их из maven. Начиная с версии 6.7.0 EMR Serverless, этот флаг доступен для использования.

Проблема заключается в том, что кластеру spark необходимо подключиться к интернету, чтобы загрузить пакеты из maven. Amazon EMR Serverless поначалу живет вне какого-либо VPC и поэтому не может выйти в интернет. Для этого необходимо создать EMR-приложение внутри VPC. Однако EMR-приложения могут быть созданы только в частных подсетях, которые (кстати…) не выходят в интернет и не могут связаться с S3 😭… Как это исправить?

Шаг первый: создание сети

На схеме ниже показана вся необходимая сетевая структура:

Она легко создается на AWS в интерфейсе VPC. Нажмите кнопку Create VPC и выберите VPC и другие. AWS делает всю работу и предоставляет дизайн VPC с 2 публичными подсетями, 2 частными подсетями, интернет-шлюзом, необходимыми таблицами маршрутизации и S3-endpoint (чтобы ресурсы внутри VPC могли обращаться к S3).

Вы можете установить число зон доступности на 1, если хотите, но для обеспечения высокой доступности вам следует работать как минимум с 2 AZ.

Далее вам нужно убедиться, что вы отметили как минимум один NAT-шлюз, который отвечает за доступ частных подсетей в интернет. Ниже показан экран с окончательной настройкой:

Нажмите кнопку create VPC, и мы закончили работу с сетью.

Осталось создать группу безопасности, которая разрешает исходящий трафик в интернет. Вернитесь к VPC в AWS и нажмите Security Group в левой панели. Затем нажмите Создать группу безопасности. Назовите вашу группу безопасности, снимите флажок с выбранного VPC и отметьте только что созданную. По умолчанию группы безопасности не разрешают никакого входящего трафика и разрешают весь исходящий трафик. Мы можем оставить все как есть. Создайте группу безопасности и вуаля!

Шаг второй: Роли и политики IAM

Вам нужны две роли: роль, связанная с сервисом, и еще одна роль, которая дает разрешение на доступ к S3 и Glue. Мы уже обсуждали это в предыдущем посте в разделе Настройка — Аутентификация. Ознакомьтесь с ним. Нам также понадобится набор данных для работы. Для этого подойдет знаменитый набор данных Titanic. Вы можете скачать его здесь.

Шаг 3: Создайте EMR Studio и бессерверное приложение EMR

Сначала мы должны создать EMR Studio. Если у вас еще нет созданных студий, это очень просто. После нажатия кнопки Get started на главной странице EMR Serverless вы можете нажать кнопку для автоматического создания студии.

Во-вторых, вам нужно создать приложение EMR Serverless. Задайте имя и (помните!) выберите релиз 6.7.0. Для настройки сетевого взаимодействия необходимо отметить Choose custom settings и прокрутить вниз до Network connections.

В сетевых подключениях выберите созданный вами VPC, две частные подсети и группу безопасности.

Шаг четвертый: Код Spark

Теперь мы подготовим простой код pyspark, чтобы смоделировать некоторые изменения в наборе данных (мы включим двух новых пассажиров — Ней и Сару — и обновим информацию о двух пассажирах, которые считались погибшими, но были найдены живыми, мистер Оуэн Браунд и мистер Уильям Аллен). Ниже приведен код для этого.

from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

from delta.tables import *

print("Reading CSV file from S3...")

schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv(
    "s3://<YOUR-BUCKET>/titanic", 
    header=True, schema=schema, sep=";"
)

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None)
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")


print("UPSERT...")
# UPSERT
(
    old.alias("old")
    .merge(new.alias("new"), 
    "old.PassengerId = new.PassengerId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")

(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)

print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
Вход в полноэкранный режим Выход из полноэкранного режима

Этот файл .py должен быть загружен на S3.

Шаг 5: GO!

Теперь мы отправляем задание на выполнение. Мы можем сделать это с помощью AWS CLI:

aws emr-serverless start-job-run 
--name Delta-Upsert 
--application-id <YOUR-APPLICATION-ID> 
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole 
--job-driver '{
  "sparkSubmit": {
    "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py", 
    "sparkSubmitParameters": "--packages io.delta:delta-core_2.12:2.0.0"
  }
}' 
--configuration-overrides '{
"monitoringConfiguration": {
  "s3MonitoringConfiguration": {
    "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"} 
  } 
}'
Войти в полноэкранный режим Выйти из полноэкранного режима

Вот и все! Когда задание будет выполнено, зайдите в папку журналов и проверьте журналы (ищите ID приложения, ID задания и журналы SPARK_DRIVER). Вы должны увидеть что-то вроде этого:

Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       1|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       1|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
|        892|       1|     1|      Sarah Crepalde|female|23.0|    1|    0|            null|   null| null|    null|
|        893|       0|     1|        Ney Crepalde|  male|35.0|    1|    0|            null|   null| null|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Вход в полноэкранный режим Выход из полноэкранного режима

Удачного кодирования и дальнейшего развития!

Оцените статью
devanswers.ru
Добавить комментарий