Понимание Amazon SQS с помощью Python и Django — часть 1


Цели

  • Познакомьтесь с Amazon SQS и очередями.
  • Изучить различные типы очередей и их различия.
  • Понять, как Amazon SQS будет использоваться в разделенном приложении или сервисе.
  • Создание простого приложения Django, интегрирующего Amazon SQS.

Предварительные условия

  • Хорошее знание языка python.
  • Знание Django Rest Framework.

Что такое Amazon SQS?

Amazon Simple Queue Service (SQS) — это полностью управляемый сервис очередей сообщений, который позволяет разделить и масштабировать микросервисы, распределенные системы и бессерверные приложения. Проще говоря, это сервис, предоставляемый AWS, который использует структуру данных очереди для хранения полученных сообщений до тех пор, пока они не будут готовы к извлечению.

Что такое очередь?

Очередь — это структура данных, которая принимает и возвращает данные в порядке F.I.F.O (First in First Out). Это то же самое, что и очередь в реальном мире: кто первым входит в очередь, тот первым и выходит.

Очереди SQS

Очереди SQS — это очереди, в которые мы отправляем сообщения (JSON, XML, e.t.c.), а затем опрашиваем, чтобы получить эти сообщения. Служба, отправляющая сообщение, называется производителем, а служба, опрашивающая сообщение, называется потребителем. Ограничение на размер одного сообщения составляет 256 КБ.

Изображение с сайта dzone.com

Amazon SQS предоставляет нам два типа очередей сообщений:

  1. Стандартная очередь: Эти очереди обеспечивают доставку сообщений по принципу «минимум один раз», это означает, что все сообщения доставляются минимум один раз, однако некоторые сообщения могут быть доставлены более одного раза. Она также обеспечивает упорядочивание по принципу best-effort, что означает, что сообщения могут быть доставлены не в том порядке, в котором они были получены.
  2. Очередь FIFO (First-in-First-out): Это очереди, предназначенные для доставки «только один раз», то есть все сообщения будут обработаны ровно один раз. Очередь FIFO также гарантирует упорядоченную доставку, т.е. ПЕРВОЕ сообщение ВХОДИТ, ПЕРВОЕ сообщение ВЫХОДИТ (FIFO).

Глядя на вышеприведенные объяснения, мы можем спросить себя, почему именно мы хотим использовать стандартную очередь? Ну,

  1. Стандартные очереди позволяют выполнять практически неограниченное количество транзакций в секунду, в то время как очереди FIFO позволяют обрабатывать не более 300 сообщений в секунду без пакетной обработки и 3000 с пакетной обработкой.
  2. Стандартная очередь в настоящее время поддерживается во всех регионах AWS, а FIFO, на момент написания статьи, нет.

Оба варианта подходят для разных случаев использования. Очередь FIFO хорошо подходит для приложений, которые сильно зависят от порядка сообщений и имеют низкую толерантность к дублирующимся сообщениям. Стандартная очередь подходит для приложений, которые могут обрабатывать дублирующиеся и неупорядоченные сообщения.

Преимущества Amazon SQS

  • Повышение производительности — SQS позволяет осуществлять асинхронную связь между различными частями приложения.
  • Повышение надежности — Если служба-потребитель выходит из строя или терпит крах. Поскольку сообщения все еще находятся в очереди, она сможет снова их принять, когда снова заработает. Это приводит к повышению надежности.
  • Масштабируемость — «стандартная» очередь SQS допускает практически неограниченное количество транзакций сообщений в секунду. Это облегчает масштабирование вашей службы от тысяч запросов в секунду до миллионов.
  • Буфер запросов — SQS помогает предотвратить перегрузку сервиса безумным количеством запросов. С помощью SQS сервис может выбрать, сколько запросов он хочет обрабатывать в любой момент времени, и SQS никогда не вернет сервису больше этого количества.

Пример использования Amazon SQS

Представьте, что у вас есть служба, которая в настоящее время:

  1. Принимает от пользователя URL-пути к большим файлам.
  2. Выполняет некоторую трудоемкую обработку файла и сохраняет результат в базе данных.
  3. И в конечном итоге возвращает результат пользователю.

Какие здесь возможны проблемы?

  1. Если служба B будет перегружена большим количеством файлов для обработки, это может замедлить работу сервера, что в конечном итоге приведет к его падению.
  2. Если весь сервер выйдет из строя, это приведет к отказу всех пользователей, использующих эту службу для обработки файлов.

Как можно использовать amazon SQS для решения этой проблемы? Используя SQS, мы можем вынести нашу функцию обработки файлов и разместить ее на новом выделенном сервере, тем самым «отделив» службу A от функции обработки файлов (служба B).

Теперь поток услуг с использованием SQS может быть следующим:

  1. Пользователь посылает запрос на наш сервер 1 с указанием пути к файлу для обработки.
  2. Служба A сохраняет имя файла и путь к файлу в базе данных с полем статуса «Pending» и получает ID нового сохраненного объекта.
  3. Служба A отправляет сообщение в очередь, содержащее ID сохраненного объекта, после чего возвращает ответ пользователю, указывающий, что файл теперь находится в очереди на обработку.
  4. Служба B опрашивает определенное количество сообщений из очереди, чтобы обрабатывать их в своем собственном темпе (это предотвращает перегрузку и падение сервера). Затем она получает ID из этих сообщений.
  5. Служба B использует ID, чтобы получить сохраненный объект из базы данных, получить путь к файлу и начать его обработку.
  6. После успеха или неудачи он обновляет хранимый объект до соответствующего статуса (Succeeded/Failed).

Сервер 1 также имеет конечную точку, которую пользователи могут использовать для проверки статуса запроса на обработку файла и его данных, если он уже был успешно обработан.

Настройка проекта

Мы будем создавать пример службы обработки файлов. Для этого нам потребуется создать учетную запись aws и получить ключи доступа aws access и secret access из консоли. После получения ключей храните их в безопасном месте.

Давайте также создадим нашу первую очередь. С помощью строки поиска перейдите к Amazon SQS.

Теперь на странице SQS нажмите на Create queue. Мы попадаем на страницу создания очереди SQS и видим несколько предложенных вариантов.

Для этого руководства выберите FIFO Queue, а затем введите имя очереди. Обратите внимание, что имена очередей FIFO должны заканчиваться на .fifo. Найдите опцию Content-based deduplication и включите ее (Это помогает предотвратить получение потребителями возможных дубликатов сообщений из очереди). Оставьте все остальные опции по умолчанию и нажмите Создать очередь внизу. Скопируйте имя очереди, в моем случае MyFileProcessingQueue.fifo, и сохраните его где-нибудь, мы будем использовать его позже. Также обратите внимание на имя региона, в котором создана очередь, его можно посмотреть в правом верхнем углу консоли. Для меня это будет us-east-1.

Теперь перейдем к части Django, давайте начнем с создания новой директории и настройки нового проекта.

$ mkdir amazon-sqs-django && cd amazon-sqs-django
$ python3.8 -m venv venv # --> Create virtual environment.
$ source venv/bin/activate # --> Activate virtual environment.

(venv)$ pip install django==4.0.5 djangorestframework==3.13.1
(venv)$ django-admin startproject core .

Вход в полноэкранный режим Выход из полноэкранного режима

Мы также установим boto3, чтобы облегчить взаимодействие с нашей очередью SQS.

(venv)$ pip install boto3==1.24.20

Войти в полноэкранный режим Выход из полноэкранного режима

Далее создадим приложение Django под названием file, которое будет содержать модели и логику для всего сервиса A.

(venv)$ python manage.py startapp file

Войти в полноэкранный режим Выход из полноэкранного режима

Зарегистрируйте приложение в core/settings.py внутри INSTALLED_APPS:

# *core/settings.py*

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'file', # new
]

Войти в полноэкранный режим Выйти из полноэкранного режима

Создание нашей модели файла

Внутри файла file/models.py добавьте следующее

# *file/models.py*

...
class File(models.Model):
    class FileStatus(models.IntegerChoices):
        PENDING = 0
        PROCESSING = 1
        PROCESSED = 2
        FAILED = 3

    lines_count = models.IntegerField(null=True)
    file_size = models.IntegerField(null=True)
    file_path = models.CharField(max_length=120)
    status = models.IntegerField(choices=FileStatus.choices, default=FileStatus.PENDING)
...

Вход в полноэкранный режим Выйти из полноэкранного режима

Эта модель будет хранить некоторую базовую информацию, которую мы «обрабатываем» из файлов. Статус показывает, на какой стадии обработки находится файл. У нас есть четыре типа статусов:

  1. PENDING : Это означает, что файл еще не был принят для обработки.
  2. PROCESSING : Файл был принят, и в настоящее время над ним ведется работа.
  3. ОБРАБОТАН Файл был успешно обработан; никаких проблем не возникло.
  4. FAILED : Что-то где-то пошло не так; файл не может быть обработан правильно.

Создание представления

Внутри файла views.py добавим следующее

import boto3

from rest_framework.views import APIView
from .models import File

session = boto3.Session(
    aws_access_key_id='<AWS_ACCESS_KEY_ID>', # replace with your key
    aws_secret_access_key='<AWS_SECRET_ACCESS_KEY>', # replace with your key
)
sqs = session.resource('sqs', region_name='<AWS_REGION_NAME>') # replace with your region name

class FileView(APIView):
    """
    Process file and saves its data.
    :param file_path: path to file on a remote or local server
    :return: status
    """
    def post(self, request):
        file_path = request.data.get('file_path')
        file_obj = File.objects.create(file_path=file_path) # save file unprocessed.

        # Get our recently created queue.
        queue = sqs.get_queue_by_name(QueueName="MyFileProcessingQueue.fifo")

Войти в полноэкранный режим Выйти из полноэкранного режима

Используя boto3, мы создаем класс утилиты sqs для взаимодействия с ресурсом Amazon SQS. Затем мы создаем представление и теперь внутри его метода post получаем от пользователя путь к файлу и сохраняем его в базе данных. Сохраненный объект File имеет статус по умолчанию PENDING, который мы ранее установили в классе модели. Мы также получаем нашу очередь по ее имени, которое в моем случае MyFileProcessingQueue.fifo.

Чтобы отправить сообщение в нашу очередь, давайте обновим представление

import json # new
import boto3

from rest_framework.views import APIView
from rest_framework.response import Response # new
from .models import File

.....

class FileView(APIView):

    def post(self, request):
        .....
        message_body = {
            'file_id': str(file_obj.id)
        }

        # Send a message to the queue, so we can process this particular file eventually.
        response = queue.send_message(
            MessageBody=json.dumps(message_body),
            MessageGroupId='messageGroupId'
        )

        # Let the user know the file has been sent to the queue and is PENDING processing.
        return Response({"message": "File has been scheduled for processing..."}, 
    status=200)

Войдите в полноэкранный режим Выйти из полноэкранного режима

Здесь,

  1. Мы отправляем sqs-сообщение, содержащее сохраненный нами идентификатор объекта File, чтобы впоследствии использовать его для извлечения объекта из базы данных и получения пути к обрабатываемому файлу. ПараметрMessageGroupId гарантирует, что все сообщения с одним и тем же MessageGroupId будут обрабатываться в порядке FIFO. Обычно этот параметр задается чем-то уникальным, например, идентификатором пользователя или идентификатором сессии, но для простоты мы используем строку ‘messageGroupId’. Вы можете узнать больше о возможных параметрах метода send_message.
  2. После отправки сообщения в нашу очередь SQS. Мы немедленно отправляем ответ пользователю, сообщая ему, что его файл теперь находится в очереди на обработку, так что он может спокойно заниматься своими делами с нашим сервисом.

Теперь ваше представление должно выглядеть примерно так:

# *file/views.py*
import json
import boto3

from rest_framework.views import APIView
from rest_framework.response import Response
from .models import File

session = boto3.Session(
    aws_access_key_id='<AWS_ACCESS_KEY_ID>', # replace with your key
    aws_secret_access_key='<AWS_SECRET_ACCESS_KEY>', # replace with your key
)
sqs = session.resource('sqs', region_name='<AWS_REGION_NAME>') # replace with your region name

class FileView(APIView):
    """
    Process file and saves its data.
    :param file_path: path to file.
    :return: status
    """
    def post(self, request):
        file_path = request.data.get('file_path')
        file_obj = File.objects.create(file_path=file_path) # save file unprocessed.

        # Get our recently created queue.
        queue = sqs.get_queue_by_name(QueueName="MyFileProcessingQueue.fifo")
        message_body = {
            'file_id': str(file_obj.id)
        }

        # Send a message to the queue, so we can process this particular file eventually.
        response = queue.send_message(
            MessageBody=json.dumps(message_body),
            MessageGroupId='messageGroupId'
        )

        # Let the user know the file has been sent to the queue and is PENDING processing.
        return Response({"message": "File has been scheduled for processing..."}, 
    status=200)

Вход в полноэкранный режим Выход из полноэкранного режима

Тестирование того, что мы имеем на данный момент

Во-первых, давайте соединим URL-адреса приложения file с URL-адресами нашего проекта. Внутри папки file создайте новый файл urls.py.

# file/urls.py

from django.urls import path
from .views import FileView

urlpatterns = [
    path('process', FileView.as_view())
]

Войдите в полноэкранный режим Выйдите из полноэкранного режима

Теперь отредактируйте файл core/urls.py,

from django.contrib import admin
from django.urls import path, include # new

urlpatterns = [
    path('admin/', admin.site.urls),
    path('files/', include('file.urls')), # new
]

Войдите в полноэкранный режим Выйдите из полноэкранного режима

Далее, создайте и запустите миграции

(venv)$ python manage.py makemigrations
(venv)$ python manage.py migrate

Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь запустите сервер разработки.

(venv)$ python manage.py runserver

Войти в полноэкранный режим Выход из полноэкранного режима

Для удобства тестирования мы создадим файл в корневой папке с именем test_file.txt и заполним его 10 или более строками, чтобы он не был пустым. Это будет наш файл для обработки. Теперь путь к вашей папке должен выглядеть следующим образом

amazon-sqs-and-django
 core
 db.sqlite3
 file
 manage.py
 test_file.txt
 venv

Войдите в полноэкранный режим Выйти из полноэкранного режима

Теперь запустите postman и протестируйте конечную точку http://127.0.0.1:8000/files/process с абсолютным путем к файлу test_file.txt.

Теперь давайте вернемся на страницу очередей в консоли AWS и щелкнем нашу недавно созданную очередь — MyFileProcessingQueue.fifo. Затем в правом верхнем углу новой страницы нажмите Send and Receive messages.

Теперь нажмите на poll messages и щелкните первое пришедшее сообщение, вы должны увидеть данные, которые мы отправили из приложения Django.

Теперь мы знаем, что первая часть нашего сервиса работает. Теперь мы можем отправить сообщение из нашего приложения Django в очередь SQS.

Заключение и следующие шаги

В первой части статьи AWS SQS python и Django вы узнали об основах очередей в целом и Amazon SQS, его преимуществах и некоторых случаях использования. Мы узнали, как создать очередь и создали Django-приложение для отправки сообщений в эту очередь.

В следующей статье мы

  1. Создадим наш сервис B, сервис обработки файлов, который будет принимать сообщения из очереди, обрабатывать файл и обновлять базу данных.
  2. Мы также предоставим конечную точку для пользователя, чтобы он мог проверить результат обработки файла в любой момент времени.

Если вы нашли эту статью полезной или узнали что-то новое, оставьте большой палец вверх!

До следующего раза, счастливого кодинга!

Леви

Оцените статью
devanswers.ru
Добавить комментарий