В этой статье мы рассмотрим, как Amazon Simple Queue Service (SQS) помогает строить архитектуру, управляемую событиями, и как она делает ваше приложение более масштабируемым.
Основные части этой статьи:
- Что такое очередь
- О Amazon SQS (основные компоненты)
- Пример
Что такое очередь
Служба очереди сообщений — это система, которая помещает сообщение в очередь, предоставляя асинхронный протокол связи. В обслуживании очередей участвуют две стороны. Эти стороны называются отправитель и получатель.
SQS помогает вам разделить ваши компоненты, например, ваш компонент «A» посылает сообщение «B», а «B» не заботится о том, что происходит в «A», позже каждый компонент может быть заменен другим сервисом, таким образом, у вас будет очень независимая и гибкая архитектура.
Очередь используется, когда не требуется немедленная обработка данных, а также когда у вас есть общие ресурсы, и вы не хотите превышать нагрузку на ресурс.
Некоторые ключевые слова, которые нам необходимо знать:
- Производители: компоненты, которые отправляют сообщения в очередь
- Потребители: компоненты, получающие сообщения из очереди.
- Очередь: в которой хранятся сообщения
Об Amazon SQS
Amazon Simple Queue Service (SQS) — это полностью управляемый сервис очередей сообщений, который позволяет разделить и масштабировать микросервисы, распределенные системы и бессерверные приложения.
SQS предлагает два типа очередей сообщений. Стандартные очереди обеспечивают максимальную пропускную способность, упорядочивание по принципу best-effort и доставку по принципу at-least-once. Очереди SQS FIFO предназначены для гарантии того, что сообщения будут обработаны ровно один раз, в том же порядке, в котором они были отправлены.
Стандартная очередь:
- Неограниченная пропускная способность — стандартные очереди поддерживают практически неограниченное количество транзакций в секунду (TPS) для каждого действия API.
- At-Least-Once Delivery — сообщение доставляется как минимум один раз, но иногда доставляется более одной копии сообщения.
- Best-Effort Ordering — иногда сообщения могут быть доставлены в порядке, отличном от того, в котором они были отправлены.
Очередь FIFO:
- Ограниченная пропускная способность — 300 транзакций в секунду (TPS)
- Точная обработка — сообщение доставляется один раз и остается доступным до тех пор, пока потребитель не обработает и не удалит его. Дубликаты не попадают в очередь.
- First-In-First-Out Delivery — порядок отправки и получения сообщений строго сохраняется (т.е. First-In-First-Out).
📋 Примечание: для получения дополнительной информации о возможностях sqs вы можете посетить эту ссылку
Пример
В нашем примере мы будем иметь API, который запускает лямбда-функцию, после выполнения лямбды она помещает сообщение в очередь, которая позже запускает другую лямбда-функцию.
- Архитектура:
Первое, что вам нужно сделать, это создать очередь из SQS, в моем случае я использую AWS-CDK.
import { Construct, NestedStack, StackProps } from '@aws-cdk/core';
import { Queue } from '@aws-cdk/aws-sqs';
export class SQSStack extends NestedStack {
constructor(app: Construct, id: string, props?: StackProps) {
super(app, id);
const MyQueue = new Queue(app, 'my-queue', {
queueName: 'my-queue'
});
}
}
Для простоты статьи я просто передаю параметр queueName, для более глубокого изучения всех опций, которые вы можете настроить, посмотрите CDK Docs for SQS в разделе Construct Props.
Давайте добавим необходимые разрешения
iamRoleStatements:
- Effect: Allow
Action:
- "sqs:SendMessage"
Resource:
- "arn:aws:sqs:${env:region}:${env:accountId}:my-queue"
- Effect: Allow
Action:
- "s3:PutObject"
Resource:
- "arn:aws:s3:${env:region}:${env:accountId}:${env:bucket}/*"
Обратите внимание, что я добавляю минимальные разрешения, необходимые моим лямбда-функциям для выполнения работы
Файл .env
содержит необходимые параметры, такие как id моего аккаунта, регион, в котором я хочу развернуть свои сервисы, имя моего ведра и т.д…
my-queue
— это имя очереди, которую мы создали ранее
- routes.yml
addMessage:
handler: src/modules/Queue/controller/lambda.addMessage
events:
- http:
method: post
path: queue/message
cors: true
consumeMessage:
handler: src/modules/Queue/controller/lambda.getMessage
events:
- sqs:
arn: arn:aws:sqs:${env:region}:${env:accountId}:my-queue
batchSize: 5
Как мы видим, у нас есть 2 API, первый просто запускает лямбда-функцию addMessage
, которая отправляет сообщение в очередь, как только мы получим сообщение, будет выполнена вторая функция consumeMessage
.
Размер пакета — это количество записей, которые нужно отправить в функцию в каждом пакете. Для стандартной очереди это может быть до 10 000 записей. Для очереди FIFO максимальное значение равно 10.
- Наши лямбда-функции:
const { putMessage } = require('../service/queue.service');
const { putObject } = require('../service/s3.service');
function Response(statusCode, data) {
return {
statusCode,
body: JSON.stringify(data, null, 2),
};
}
module.exports.addMessage = async (event) => {
try {
console.log(event);
const body = JSON.parse(event.body);
const putMessageResult = await putMessage(body);
console.log('putMessageResult =>', putMessageResult);
const {
MessageId,
} = putMessageResult;
// save data in DynamoDB
return Response(200, {
message: 'Message sent to Queue',
messageId: MessageId,
});
} catch (error) {
console.log(error);
return Response(500, {
message: 'Something went wrong'
});
}
};
module.exports.getMessage = async (event) => {
try {
console.log(event);
await Promise.all(event.Records.map(async (item) => {
const body = JSON.parse(item.body);
const putObjectResult = await putObject(body);
console.log('putObjectResult =>', putObjectResult);
}));
} catch (error) {
console.log(error);
}
};
📋 Примечание: Чтобы увидеть, как можно добавить запись в DynamoDB, вы можете посмотреть этот блог, где мы глубоко погружаемся в работу EventBridge, а в статье я добавляю некоторые данные в DynamoDB
const AWS = require('aws-sdk');
const sqs = new AWS.SQS();
module.exports.putMessage = (body) => {
const QueueUrl = `https://sqs.${process.env.region}.amazonaws.com/${process.env.accountId}/my-queue`;
const params = {
MessageBody: JSON.stringify(body),
QueueUrl,
};
return sqs.sendMessage(params).promise();
}
Мы можем видеть, как мы отправляем сообщение в очередь, используя AWS-SDK, функция sendMessage
принимает атрибут MessageBody, который содержит данные, которые мы хотим отправить, и url нашей очереди.
Чтобы сохранить наши JSON данные в S3, я также использую AWS-SDK, с функцией putObject
, которая принимает определенные параметры, как написано ниже
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const { v4: uuidv4 } = require('uuid');
module.exports.putObject = (body) => {
const Key = `${uuidv4()}.json`;
const ContentType = "application/json";
return s3.putObject({
Bucket: process.env.bucket,
Key,
Body: JSON.stringify(body),
ContentType
}).promise();
}
Теперь давайте протестируем нашу систему, сначала я отправлю тело, написанное ниже, из postman, используя конечную точку нашего шлюза API, который будет создан после развертывания нашей системы.
{
"title": "this is test queue",
"data": "some description goes here"
}
После вызова API вы можете проверить в CloudWatch, что была запущена лямбда-функция consumeMessage, которая создаст json файл в S3 bucket
- Внутри нашего ведра S3:
После загрузки файла вы увидите данные, которые мы сохранили из нашей лямбда-функции и которые содержат полезную нагрузку нашего тела.
Заключение
Как мы видим, SQS является очень важной функцией, а с AWS она упрощает соединение ваших сервисов друг с другом.
После использования очередей ваша архитектура станет более простой в управлении и готовой к масштабированию.
Эта статья является частью серии «Messaging with Serverless», которую я пишу уже некоторое время, если вам интересно прочитать другие предложения Serverless от AWS, не стесняйтесь посетить ссылки ниже