Узнайте, как использовать потоки DynamoDB с AWS Lambda и Go

Репликация данных DynamoDB из одной таблицы в другую

Эта статья блога поможет вам быстро начать работу с DynamoDB Streams и AWS Lambda с помощью Go. В ней будет рассказано о том, как развернуть все решение с помощью AWS CDK.

Представленный здесь сценарий использования довольно прост. Есть несколько таблиц DynamoDB, и цель состоит в том, чтобы получить данные в одной из этих таблиц (также называемой таблицей source) и реплицировать их в другую таблицу (также называемую таблицей target), чтобы она могла обслуживать различные запросы. Для демонстрации сквозного потока существует также Amazon API Gateway, который выполняет функцию Lambda, сохраняющую данные в исходной таблице DynamoDB. Изменения в этой таблице вызовут другую функцию Lambda (благодаря DynamoDB Streams), которая окончательно реплицирует данные в целевую таблицу.

Глобальный или локальный вторичный индекс предоставляют аналогичные возможности.

Теперь, когда у вас есть общее представление о том, чего мы пытаемся достичь…

…давайте перейдем непосредственно к делу!

Прежде чем продолжить, убедитесь, что у вас установлен язык программирования Go (v1.16 или выше) и AWS CDK.

Клонируйте проект и перейдите в нужную директорию:

git clone https://github.com/abhirockzz/dynamodb-streams-lambda-golang

cd cdk
Войдите в полноэкранный режим Выйдите из полноэкранного режима

Чтобы начать развертывание…

… все, что вам нужно сделать, это выполнить одну команду (cdk deploy) и немного подождать. Вы увидите список ресурсов, которые будут созданы, и вам нужно будет дать свое подтверждение, чтобы продолжить.

Не волнуйтесь, в следующем разделе я объясню, что происходит.

cdk deploy

# output

Bundling asset DynamoDBStreamsLambdaGolangStack/ddb-streams-function/Code/Stage...

✨  Synthesis time: 5.94s

This deployment will make potentially sensitive changes according to your current security approval level (--require-approval broadening).
Please confirm you intend to make the following modifications:

//.... omitted

Do you wish to deploy these changes (y/n)? y
Вход в полноэкранный режим Выйти из полноэкранного режима

Это начнет создание ресурсов AWS, необходимых для нашего приложения.

Если вы хотите увидеть шаблон AWS CloudFormation, который будет использоваться за сценой, запустите cdk synth и проверьте папку cdk.out.

Вы можете следить за прогрессом в терминале или перейти в консоль AWS: CloudFormation > Stacks > DynamoDBStreamsLambdaGolangStack.

Как только все ресурсы будут созданы, вы можете опробовать приложение. У вас должно быть:

  • Две функции Lambda
  • Две таблицы DynamoDB (исходная и целевая)
  • Один API-шлюз (также маршрут, интеграция)
  • и некоторые другие (например, роли IAM и т.д.).

Прежде чем продолжить, получите конечную точку API Gateway, которую вам нужно будет использовать. Она доступна в выводе стека (в терминале или на вкладке Outputs в консоли AWS CloudFormation для вашего стека):

Конечное решение…

Начните с создания нескольких пользователей в (исходной) таблице DynamoDB.

Для этого вызовите конечную точку API Gateway (HTTP) с соответствующей полезной нагрузкой JSON:

# export the API Gateway endpoint
export APIGW_ENDPOINT=<replace with API gateway endpoint above>

# for example:
export APIGW_ENDPOINT=https://gy8gxsx9x7.execute-api.us-east-1.amazonaws.com/

# invoke the endpoint with JSON data

curl -i -X POST -d '{"email":"user1@foo.com", "state":"New York","city":"Brooklyn","zipcode": "11208"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user2@foo.com", "state":"New York","city":"Staten Island","zipcode": "10314"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user3@foo.com", "state":"Ohio","city":"Cincinnati","zipcode": "45201"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user4@foo.com", "state":"Ohio","city":"Cleveland","zipcode": "44101"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT

curl -i -X POST -d '{"email":"user5@foo.com", "state":"California","city":"Los Angeles","zipcode": "90003"}' -H 'Content-Type: application/json' $APIGW_ENDPOINT
Войти в полноэкранный режим Выйти из полноэкранного режима

Перейдите к таблице DynamoDB в консоли AWS и убедитесь, что записи были созданы:

Если у вас под рукой AWS CLI, вы также можете попробовать aws dynamodb scan --table <name of table>.

Если все идет хорошо, наша функция репликации также должна работать. Чтобы убедиться в этом, необходимо проверить целевую таблицу DynamoDB.

Обратите внимание, что атрибут zipcode отсутствует — это сделано специально для данного демо. Вы можете сами выбрать атрибуты, которые хотите включить в целевую таблицу, и написать логику своей функции соответствующим образом.

Целевая таблица DynamoDB имеет state в качестве ключа раздела и city в качестве ключа сортировки, вы можете запрашивать ее другим способом (по сравнению с исходной таблицей, которую вы могли запрашивать только на основе email).

Не забудьте очистить!

После завершения работы, чтобы удалить все сервисы, просто используйте:

cdk destroy

#output prompt (choose 'y' to continue)

Are you sure you want to delete: DynamoDBStreamsLambdaGolangStack (y/n)?
Войти в полноэкранный режим Выйти из полноэкранного режима

Потрясающе! Вы смогли настроить и опробовать готовое решение. Прежде чем мы закончим, давайте быстро пройдемся по некоторым важным частям кода, чтобы лучше понять, что происходит за кулисами.

Краткое описание кода

Поскольку мы сосредоточимся только на важных битах, большая часть кода (операторы печати, обработка ошибок и т.д.) опущена/закомментирована для краткости.

Infra-IS-код с AWS CDK и Go!

Вы можете обратиться к коду CDK здесь

Начнем с создания таблицы DynamoDB и убедимся, что включена функция DynamoDB Streams.

    sourceDynamoDBTable := awsdynamodb.NewTable(stack, jsii.String("source-dynamodb-table"),
        &awsdynamodb.TableProps{
            PartitionKey: &awsdynamodb.Attribute{
                Name: jsii.String("email"),
                Type: awsdynamodb.AttributeType_STRING},
            Stream: awsdynamodb.StreamViewType_NEW_AND_OLD_IMAGES})

    sourceDynamoDBTable.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
Войдите в полноэкранный режим Выход из полноэкранного режима

Затем мы обрабатываем функцию Lambda (она позаботится о создании и развертывании функции) и убеждаемся, что предоставили ей соответствующие разрешения на запись в таблицу DynamoDB.

    createUserFunction := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("create-function"),
        &awscdklambdagoalpha.GoFunctionProps{
            Runtime:     awslambda.Runtime_GO_1_X(),
            Environment: &map[string]*string{envVarName: sourceDynamoDBTable.TableName()},
            Entry:       jsii.String(createFunctionDir)})

    sourceDynamoDBTable.GrantWriteData(createUserFunction)
Вход в полноэкранный режим Выход из полноэкранного режима

Шлюз API (HTTP API) создан, вместе с интеграцией HTTP-Lambda Function, а также соответствующим маршрутом.

    api := awscdkapigatewayv2alpha.NewHttpApi(stack, jsii.String("http-api"), nil)

    createFunctionIntg := awscdkapigatewayv2integrationsalpha.NewHttpLambdaIntegration(jsii.String("create-function-integration"), createUserFunction, nil)

    api.AddRoutes(&awscdkapigatewayv2alpha.AddRoutesOptions{
        Path:        jsii.String("/"),
        Methods:     &[]awscdkapigatewayv2alpha.HttpMethod{awscdkapigatewayv2alpha.HttpMethod_POST},
        Integration: createFunctionIntg})
Войти в полноэкранный режим Выход из полноэкранного режима

Нам также нужна таблица target DynamoDB — обратите внимание, что эта таблица имеет составной первичный ключ (state и city):

    targetDynamoDBTable := awsdynamodb.NewTable(stack, jsii.String("target-dynamodb-table"),
        &awsdynamodb.TableProps{
            PartitionKey: &awsdynamodb.Attribute{
                Name: jsii.String("state"),
                Type: awsdynamodb.AttributeType_STRING},
            SortKey: &awsdynamodb.Attribute{
                Name: jsii.String("city"),
                Type: awsdynamodb.AttributeType_STRING},
        })

    targetDynamoDBTable.ApplyRemovalPolicy(awscdk.RemovalPolicy_DESTROY)
Войдите в полноэкранный режим Выход из полноэкранного режима

Наконец, мы создаем вторую функцию Lambda, которая отвечает за репликацию данных, предоставляем ей нужные разрешения и, самое главное, добавляем DynamoDB в качестве источника событий.

    replicateUserFunction := awscdklambdagoalpha.NewGoFunction(stack, jsii.String("replicate-function"),
        &awscdklambdagoalpha.GoFunctionProps{
            Runtime:     awslambda.Runtime_GO_1_X(),
            Environment: &map[string]*string{envVarName: targetDynamoDBTable.TableName()},
            Entry:       jsii.String(replicateFunctionDir)})

    replicateUserFunction.AddEventSource(awslambdaeventsources.NewDynamoEventSource(sourceDynamoDBTable, &awslambdaeventsources.DynamoEventSourceProps{StartingPosition: awslambda.StartingPosition_LATEST, Enabled: jsii.Bool(true)}))

    targetDynamoDBTable.GrantWriteData(replicateUserFunction)
Вход в полноэкранный режим Выход из полноэкранного режима

Лямбда-функция — создание пользователя

Вы можете обратиться к коду лямбда-функции здесь

Логика функции довольно проста — она преобразует входящую полезную нагрузку JSON в Go struct и затем вызывает DynamoDB PutItem API для сохранения данных.

func handler(ctx context.Context, req events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
    payload := req.Body
    var user User

    err := json.Unmarshal([]byte(payload), &user)
    if err != nil {//..handle}

    item := make(map[string]types.AttributeValue)

    item["email"] = &types.AttributeValueMemberS{Value: user.Email}
    item["state"] = &types.AttributeValueMemberS{Value: user.State}
    item["city"] = &types.AttributeValueMemberS{Value: user.City}
    item["zipcode"] = &types.AttributeValueMemberN{Value: user.Zipcode}
    item["active"] = &types.AttributeValueMemberBOOL{Value: true}

    _, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
        TableName: aws.String(table),
        Item:      item,
    })

    if err != nil {//..handle}
    return events.APIGatewayV2HTTPResponse{StatusCode: http.StatusCreated}, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Лямбда-функция — репликация данных

Вы можете обратиться к коду лямбда-функции здесь

Обработчик функции репликации данных принимает в качестве параметра DynamoDBEvent. Он извлекает новую добавленную запись и создает новую запись, которая может быть сохранена в таблице target DynamoDB. Тип данных для каждого атрибута проверяется и обрабатывается соответствующим образом. Хотя код показывает только типы String и Boolean, его можно использовать и для других типов данных DynamoDB, таких как Maps, Sets и т.д.

func handler(ctx context.Context, e events.DynamoDBEvent) {
    for _, r := range e.Records {
        item := make(map[string]types.AttributeValue)

        for k, v := range r.Change.NewImage {
            if v.DataType() == events.DataTypeString {
                item[k] = &types.AttributeValueMemberS{Value: v.String()}
            } else if v.DataType() == events.DataTypeBoolean {
                item[k] = &types.AttributeValueMemberBOOL{Value: v.Boolean()}
            }
        }

        _, err := client.PutItem(context.Background(), &dynamodb.PutItemInput{
            TableName: aws.String(table),
            Item:      item})

        if err != nil {//..handle}
    }
}
Вход в полноэкранный режим Выход из полноэкранного режима

Вот некоторые вещи, которые вы можете попробовать:

  • Вставить больше данных в исходную таблицу — поищите способы массовой вставки в таблицу DynamoDB.
  • Выполнять запросы в целевой таблице на основе state, city или обоих.

Подведение итогов

В этом блоге вы увидели простой пример того, как использовать потоки DynamoDB для реагирования на изменения данных в таблице практически в реальном времени, используя комбинацию потоков DynamoDB и функций Lambda. Вы также использовали AWS CDK для развертывания всей инфраструктуры, включая шлюз API, функции Lambda, таблицы DynamoDB, интеграции, а также сопоставления источников событий Lambda.

Все это было сделано с использованием языка программирования Go, который очень хорошо поддерживается в DynamoDB, AWS Lambda и AWS CDK.

Счастливого строительства!

Оцените статью
devanswers.ru
Добавить комментарий