Разделение компонентов приложения с помощью Amazon SQS | Serverless

В этой статье мы рассмотрим, как Amazon Simple Queue Service (SQS) помогает строить архитектуру, управляемую событиями, и как она делает ваше приложение более масштабируемым.

Основные части этой статьи:

  1. Что такое очередь
  2. О Amazon SQS (основные компоненты)
  3. Пример

Что такое очередь

Служба очереди сообщений — это система, которая помещает сообщение в очередь, предоставляя асинхронный протокол связи. В обслуживании очередей участвуют две стороны. Эти стороны называются отправитель и получатель.

SQS помогает вам разделить ваши компоненты, например, ваш компонент «A» посылает сообщение «B», а «B» не заботится о том, что происходит в «A», позже каждый компонент может быть заменен другим сервисом, таким образом, у вас будет очень независимая и гибкая архитектура.

Очередь используется, когда не требуется немедленная обработка данных, а также когда у вас есть общие ресурсы, и вы не хотите превышать нагрузку на ресурс.

Некоторые ключевые слова, которые нам необходимо знать:

  • Производители: компоненты, которые отправляют сообщения в очередь
  • Потребители: компоненты, получающие сообщения из очереди.
  • Очередь: в которой хранятся сообщения

Об Amazon SQS

Amazon Simple Queue Service (SQS) — это полностью управляемый сервис очередей сообщений, который позволяет разделить и масштабировать микросервисы, распределенные системы и бессерверные приложения.

SQS предлагает два типа очередей сообщений. Стандартные очереди обеспечивают максимальную пропускную способность, упорядочивание по принципу best-effort и доставку по принципу at-least-once. Очереди SQS FIFO предназначены для гарантии того, что сообщения будут обработаны ровно один раз, в том же порядке, в котором они были отправлены.

Стандартная очередь:

  • Неограниченная пропускная способность — стандартные очереди поддерживают практически неограниченное количество транзакций в секунду (TPS) для каждого действия API.
  • At-Least-Once Delivery — сообщение доставляется как минимум один раз, но иногда доставляется более одной копии сообщения.
  • Best-Effort Ordering — иногда сообщения могут быть доставлены в порядке, отличном от того, в котором они были отправлены.

Очередь FIFO:

  • Ограниченная пропускная способность — 300 транзакций в секунду (TPS)
  • Точная обработка — сообщение доставляется один раз и остается доступным до тех пор, пока потребитель не обработает и не удалит его. Дубликаты не попадают в очередь.
  • First-In-First-Out Delivery — порядок отправки и получения сообщений строго сохраняется (т.е. First-In-First-Out).

📋 Примечание: для получения дополнительной информации о возможностях sqs вы можете посетить эту ссылку

Пример

В нашем примере мы будем иметь API, который запускает лямбда-функцию, после выполнения лямбды она помещает сообщение в очередь, которая позже запускает другую лямбда-функцию.

  • Архитектура:

Первое, что вам нужно сделать, это создать очередь из SQS, в моем случае я использую AWS-CDK.

import { Construct, NestedStack, StackProps } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';

export class SQSStack extends NestedStack {
  constructor(app: Construct, id: string, props?: StackProps) {
    super(app, id);

    const MyQueue = new Queue(app, 'my-queue', {
      queueName: 'my-queue'
    });

  }
}
Войдите в полноэкранный режим Выйти из полноэкранного режима

Для простоты статьи я просто передаю параметр queueName, для более глубокого изучения всех опций, которые вы можете настроить, посмотрите CDK Docs for SQS в разделе Construct Props.

Давайте добавим необходимые разрешения

iamRoleStatements:
    - Effect: Allow
      Action:
        - "sqs:SendMessage"
      Resource: 
        - "arn:aws:sqs:${env:region}:${env:accountId}:my-queue"
    - Effect: Allow
      Action:
        - "s3:PutObject"
      Resource:
        - "arn:aws:s3:${env:region}:${env:accountId}:${env:bucket}/*"
Войти в полноэкранный режим Выйти из полноэкранного режима

Обратите внимание, что я добавляю минимальные разрешения, необходимые моим лямбда-функциям для выполнения работы

Файл .env содержит необходимые параметры, такие как id моего аккаунта, регион, в котором я хочу развернуть свои сервисы, имя моего ведра и т.д…

my-queue — это имя очереди, которую мы создали ранее

  • routes.yml
addMessage:
  handler: src/modules/Queue/controller/lambda.addMessage
  events:
    - http:
        method: post
        path: queue/message
        cors: true
consumeMessage:
  handler: src/modules/Queue/controller/lambda.getMessage
  events:
    - sqs:
        arn: arn:aws:sqs:${env:region}:${env:accountId}:my-queue
        batchSize: 5
Вход в полноэкранный режим Выйти из полноэкранного режима

Как мы видим, у нас есть 2 API, первый просто запускает лямбда-функцию addMessage, которая отправляет сообщение в очередь, как только мы получим сообщение, будет выполнена вторая функция consumeMessage.

Размер пакета — это количество записей, которые нужно отправить в функцию в каждом пакете. Для стандартной очереди это может быть до 10 000 записей. Для очереди FIFO максимальное значение равно 10.

  • Наши лямбда-функции:
const { putMessage } = require('../service/queue.service');
const { putObject } = require('../service/s3.service');

function Response(statusCode, data) {
  return {
    statusCode,
    body: JSON.stringify(data, null, 2),
  };
}

module.exports.addMessage = async (event) => {
  try {
    console.log(event);
    const body = JSON.parse(event.body);
    const putMessageResult = await putMessage(body);
    console.log('putMessageResult =>', putMessageResult);

    const {
      MessageId,
    } = putMessageResult;

    // save data in DynamoDB

    return Response(200, {
      message: 'Message sent to Queue',
      messageId: MessageId,
    });
  } catch (error) {
    console.log(error);
    return Response(500, {
      message: 'Something went wrong'
    });
  }
};

module.exports.getMessage = async (event) => {
  try {
    console.log(event);
    await Promise.all(event.Records.map(async (item) => {
      const body = JSON.parse(item.body);

      const putObjectResult = await putObject(body);
      console.log('putObjectResult =>', putObjectResult);

    }));
  } catch (error) {
    console.log(error);
  }
};
Вход в полноэкранный режим Выход из полноэкранного режима

📋 Примечание: Чтобы увидеть, как можно добавить запись в DynamoDB, вы можете посмотреть этот блог, где мы глубоко погружаемся в работу EventBridge, а в статье я добавляю некоторые данные в DynamoDB

const AWS = require('aws-sdk');
const sqs = new AWS.SQS();

module.exports.putMessage = (body) => {
  const QueueUrl = `https://sqs.${process.env.region}.amazonaws.com/${process.env.accountId}/my-queue`;
  const params = {
    MessageBody: JSON.stringify(body),
    QueueUrl,
  };
  return sqs.sendMessage(params).promise();
}
Войдите в полноэкранный режим Выйти из полноэкранного режима

Мы можем видеть, как мы отправляем сообщение в очередь, используя AWS-SDK, функция sendMessage принимает атрибут MessageBody, который содержит данные, которые мы хотим отправить, и url нашей очереди.

Чтобы сохранить наши JSON данные в S3, я также использую AWS-SDK, с функцией putObject, которая принимает определенные параметры, как написано ниже

const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const { v4: uuidv4 } = require('uuid');

module.exports.putObject = (body) => {
  const Key = `${uuidv4()}.json`;
  const ContentType = "application/json";

  return s3.putObject({
    Bucket: process.env.bucket,
    Key,
    Body: JSON.stringify(body),
    ContentType
  }).promise();
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь давайте протестируем нашу систему, сначала я отправлю тело, написанное ниже, из postman, используя конечную точку нашего шлюза API, который будет создан после развертывания нашей системы.

{
    "title": "this is test queue",
    "data": "some description goes here"
}
Войти в полноэкранный режим Выйти из полноэкранного режима

После вызова API вы можете проверить в CloudWatch, что была запущена лямбда-функция consumeMessage, которая создаст json файл в S3 bucket

  • Внутри нашего ведра S3:

После загрузки файла вы увидите данные, которые мы сохранили из нашей лямбда-функции и которые содержат полезную нагрузку нашего тела.

Заключение

Как мы видим, SQS является очень важной функцией, а с AWS она упрощает соединение ваших сервисов друг с другом.

После использования очередей ваша архитектура станет более простой в управлении и готовой к масштабированию.

Эта статья является частью серии «Messaging with Serverless», которую я пишу уже некоторое время, если вам интересно прочитать другие предложения Serverless от AWS, не стесняйтесь посетить ссылки ниже

Оцените статью
devanswers.ru
Добавить комментарий