Репликация данных DynamoDB из одной таблицы в другую
Эта статья блога поможет вам быстро начать работу с DynamoDB Streams и AWS Lambda с помощью Go. В ней будет рассказано о том, как развернуть все решение с помощью AWS CDK.
Представленный здесь сценарий использования довольно прост. Есть несколько таблиц DynamoDB
, и цель состоит в том, чтобы получить данные в одной из этих таблиц (также называемой таблицей source
) и реплицировать их в другую таблицу (также называемую таблицей target
), чтобы она могла обслуживать различные запросы. Для демонстрации сквозного потока существует также Amazon API Gateway, который выполняет функцию Lambda, сохраняющую данные в исходной таблице DynamoDB
. Изменения в этой таблице вызовут другую функцию Lambda (благодаря DynamoDB Streams), которая окончательно реплицирует данные в целевую таблицу.
Глобальный или локальный вторичный индекс предоставляют аналогичные возможности.
Теперь, когда у вас есть общее представление о том, чего мы пытаемся достичь…
…давайте перейдем непосредственно к делу!
Прежде чем продолжить, убедитесь, что у вас установлен язык программирования Go (v1.16 или выше) и AWS CDK.
Клонируйте проект и перейдите в нужную директорию:
git clone https://github.com/abhirockzz/dynamodb-streams-lambda-golang
cd cdk
Чтобы начать развертывание…
… все, что вам нужно сделать, это выполнить одну команду (cdk deploy
) и немного подождать. Вы увидите список ресурсов, которые будут созданы, и вам нужно будет дать свое подтверждение, чтобы продолжить.
Не волнуйтесь, в следующем разделе я объясню, что происходит.
cdk deploy
# output
Bundling asset DynamoDBStreamsLambdaGolangStack/ddb-streams-function/Code/Stage...
✨ Synthesis time: 5.94s
This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
Please confirm you intend to make the following modifications:
//.... omitted
Do you wish to deploy these changes (y/n)? y
Это начнет создание ресурсов AWS, необходимых для нашего приложения.
Если вы хотите увидеть шаблон AWS CloudFormation, который будет использоваться за сценой, запустите
cdk synth
и проверьте папкуcdk.out
.
Вы можете следить за прогрессом в терминале или перейти в консоль AWS: CloudFormation > Stacks > DynamoDBStreamsLambdaGolangStack
.
Как только все ресурсы будут созданы, вы можете опробовать приложение. У вас должно быть:
- Две функции Lambda
- Две таблицы DynamoDB (исходная и целевая)
- Один API-шлюз (также маршрут, интеграция)
- и некоторые другие (например, роли IAM и т.д.).
Прежде чем продолжить, получите конечную точку API Gateway, которую вам нужно будет использовать. Она доступна в выводе стека (в терминале или на вкладке Outputs в консоли AWS CloudFormation для вашего стека):
Конечное решение…
Начните с создания нескольких пользователей в (исходной) таблице DynamoDB.
Для этого вызовите конечную точку API Gateway (HTTP) с соответствующей полезной нагрузкой JSON
:
# export the API Gateway endpoint
export APIGW_ENDPOINT=<replace with API gateway endpoint above>
# for example:
export APIGW_ENDPOINT=https://gy8gxsx9x7.execute-api.us-east-1.amazonaws.com/
# invoke the endpoint with JSON data
curl -i -X POST -d '{"email":"user1@foo.com", "state":"New York","city":"Brooklyn","zipcode": "11208"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT
curl -i -X POST -d '{"email":"user2@foo.com", "state":"New York","city":"Staten Island","zipcode": "10314"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT
curl -i -X POST -d '{"email":"user3@foo.com", "state":"Ohio","city":"Cincinnati","zipcode": "45201"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT
curl -i -X POST -d '{"email":"user4@foo.com", "state":"Ohio","city":"Cleveland","zipcode": "44101"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT
curl -i -X POST -d '{"email":"user5@foo.com", "state":"California","city":"Los Angeles","zipcode": "90003"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT
Перейдите к таблице DynamoDB
в консоли AWS и убедитесь, что записи были созданы:
Если у вас под рукой AWS CLI, вы также можете попробовать
aws dynamodb scan --table <name of table>
.
Если все идет хорошо, наша функция репликации также должна работать. Чтобы убедиться в этом, необходимо проверить целевую таблицу DynamoDB
.
Обратите внимание, что атрибут
zipcode
отсутствует — это сделано специально для данного демо. Вы можете сами выбрать атрибуты, которые хотите включить в целевую таблицу, и написать логику своей функции соответствующим образом.
Целевая таблица DynamoDB
имеет state
в качестве ключа раздела и city
в качестве ключа сортировки, вы можете запрашивать ее другим способом (по сравнению с исходной таблицей, которую вы могли запрашивать только на основе email
).
Не забудьте очистить!
После завершения работы, чтобы удалить все сервисы, просто используйте:
cdk destroy
#output prompt (choose 'y' to continue)
Are you sure you want to delete: DynamoDBStreamsLambdaGolangStack (y/n)?
Потрясающе! Вы смогли настроить и опробовать готовое решение. Прежде чем мы закончим, давайте быстро пройдемся по некоторым важным частям кода, чтобы лучше понять, что происходит за кулисами.
Краткое описание кода
Поскольку мы сосредоточимся только на важных битах, большая часть кода (операторы печати, обработка ошибок и т.д.) опущена/закомментирована для краткости.
Infra-IS-код с AWS CDK и Go!
Вы можете обратиться к коду CDK здесь
Начнем с создания таблицы DynamoDB и убедимся, что включена функция DynamoDB Streams.
sourceDynamoDBTable := awsdynamodb.NewTable(stack, jsii.String("source-dynamodb-table"),
&awsdynamodb.TableProps{
PartitionKey: &awsdynamodb.Attribute{
Name: jsii.String("email"),
Type: awsdynamodb.AttributeType_STRING},
Stream: awsdynamodb.StreamViewType_NEW_AND_OLD_IMAGES})
sourceDynamoDBTable.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
Затем мы обрабатываем функцию Lambda (она позаботится о создании и развертывании функции) и убеждаемся, что предоставили ей соответствующие разрешения на запись в таблицу DynamoDB
.
createUserFunction := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("create-function"),
&awscdklambdagoalpha.GoFunctionProps{
Runtime: awslambda.Runtime_GO_1_X(),
Environment: &map[string]*string{envVarName: sourceDynamoDBTable.TableName()},
Entry: jsii.String(createFunctionDir)})
sourceDynamoDBTable.GrantWriteData(createUserFunction)
Шлюз API (HTTP API) создан, вместе с интеграцией HTTP-Lambda Function, а также соответствующим маршрутом.
api := awscdkapigatewayv2alpha.NewHttpApi(stack, jsii.String("http-api"), nil)
createFunctionIntg := awscdkapigatewayv2integrationsalpha.NewHttpLambdaIntegration(jsii.String("create-function-integration"), createUserFunction, nil)
api.AddRoutes(&awscdkapigatewayv2alpha.AddRoutesOptions{
Path: jsii.String("/"),
Methods: &[]awscdkapigatewayv2alpha.HttpMethod{awscdkapigatewayv2alpha.HttpMethod_POST},
Integration: createFunctionIntg})
Нам также нужна таблица target
DynamoDB — обратите внимание, что эта таблица имеет составной первичный ключ (state
и city
):
targetDynamoDBTable := awsdynamodb.NewTable(stack, jsii.String("target-dynamodb-table"),
&awsdynamodb.TableProps{
PartitionKey: &awsdynamodb.Attribute{
Name: jsii.String("state"),
Type: awsdynamodb.AttributeType_STRING},
SortKey: &awsdynamodb.Attribute{
Name: jsii.String("city"),
Type: awsdynamodb.AttributeType_STRING},
})
targetDynamoDBTable.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
Наконец, мы создаем вторую функцию Lambda, которая отвечает за репликацию данных, предоставляем ей нужные разрешения и, самое главное, добавляем DynamoDB
в качестве источника событий.
replicateUserFunction := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("replicate-function"),
&awscdklambdagoalpha.GoFunctionProps{
Runtime: awslambda.Runtime_GO_1_X(),
Environment: &map[string]*string{envVarName: targetDynamoDBTable.TableName()},
Entry: jsii.String(replicateFunctionDir)})
replicateUserFunction.AddEventSource(awslambdaeventsources.NewDynamoEventSource(sourceDynamoDBTable, &awslambdaeventsources.DynamoEventSourceProps{StartingPosition: awslambda.StartingPosition_LATEST, Enabled: jsii.Bool(true)}))
targetDynamoDBTable.GrantWriteData(replicateUserFunction)
Лямбда-функция — создание пользователя
Вы можете обратиться к коду лямбда-функции здесь
Логика функции довольно проста — она преобразует входящую полезную нагрузку JSON в Go struct
и затем вызывает DynamoDB PutItem API для сохранения данных.
func handler(ctx context.Context, req events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
payload := req.Body
var user User
err := json.Unmarshal([]byte(payload), &user)
if err != nil {//..handle}
item := make(map[string]types.AttributeValue)
item["email"] = &types.AttributeValueMemberS{Value: user.Email}
item["state"] = &types.AttributeValueMemberS{Value: user.State}
item["city"] = &types.AttributeValueMemberS{Value: user.City}
item["zipcode"] = &types.AttributeValueMemberN{Value: user.Zipcode}
item["active"] = &types.AttributeValueMemberBOOL{Value: true}
_, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
TableName: aws.String(table),
Item: item,
})
if err != nil {//..handle}
return events.APIGatewayV2HTTPResponse{StatusCode: http.StatusCreated}, nil
}
Лямбда-функция — репликация данных
Вы можете обратиться к коду лямбда-функции здесь
Обработчик функции репликации данных принимает в качестве параметра DynamoDBEvent. Он извлекает новую добавленную запись и создает новую запись, которая может быть сохранена в таблице target
DynamoDB. Тип данных для каждого атрибута проверяется и обрабатывается соответствующим образом. Хотя код показывает только типы String
и Boolean
, его можно использовать и для других типов данных DynamoDB, таких как Map
s, Set
s и т.д.
func handler(ctx context.Context, e events.DynamoDBEvent) {
for _, r := range e.Records {
item := make(map[string]types.AttributeValue)
for k, v := range r.Change.NewImage {
if v.DataType() == events.DataTypeString {
item[k] = &types.AttributeValueMemberS{Value: v.String()}
} else if v.DataType() == events.DataTypeBoolean {
item[k] = &types.AttributeValueMemberBOOL{Value: v.Boolean()}
}
}
_, err := client.PutItem(context.Background(), &dynamodb.PutItemInput{
TableName: aws.String(table),
Item: item})
if err != nil {//..handle}
}
}
Вот некоторые вещи, которые вы можете попробовать:
- Вставить больше данных в исходную таблицу — поищите способы массовой вставки в таблицу DynamoDB.
- Выполнять запросы в целевой таблице на основе
state
,city
или обоих.
Подведение итогов
В этом блоге вы увидели простой пример того, как использовать потоки DynamoDB для реагирования на изменения данных в таблице практически в реальном времени, используя комбинацию потоков DynamoDB и функций Lambda. Вы также использовали AWS CDK для развертывания всей инфраструктуры, включая шлюз API, функции Lambda, таблицы DynamoDB
, интеграции, а также сопоставления источников событий Lambda.
Все это было сделано с использованием языка программирования Go, который очень хорошо поддерживается в DynamoDB, AWS Lambda и AWS CDK.
Счастливого строительства!