Понимание Amazon SQS с помощью Python и Django — часть 2

Здравствуйте👋, Это вторая часть моего цикла статей о понимании Amazon SQS с помощью Python и Django. Эта статья предполагает, что вы прочитали первую статью цикла; вы можете найти эту статью по ссылке Понимание Amazon SQS с помощью Python и Django — часть 1. Соответствующий код можно найти в этом репозитории github.

Цели

  • Узнать, как получать сообщения от Amazon SQS с помощью Python.
  • Создать конечную точку для отслеживания нашей обработки файлов.

Как служба обработки файлов (Consumer) взаимодействует с очередью.

Есть несколько вещей, которые мы должны знать о взаимодействии с очередью при опросе сообщений.

  1. У нас может быть несколько потребителей, опрашивающих сообщения из одной очереди одновременно.
  2. Потребитель должен удалять сообщение сразу после обработки, чтобы другой потребитель не смог забрать то же самое сообщение.
  3. Существует так называемый VisibilityTimeout. Это время, в течение которого потребитель должен обработать и удалить сообщение, которое он опросил, чтобы другие потребители не забрали это же сообщение. Когда сообщение опрашивается потребителем, это сообщение скрывается от других потребителей на время, установленное в параметре VisibilityTimeout. По умолчанию это 30 секунд.
  4. Вы можете узнать больше о возможностях, которые SQS предоставляет нам при опросе сообщений, в Официальных документах.

Создание службы B — службы обработки файлов

Наша служба B будет представлять собой простой скрипт python, который будет выполняться бесконечно и постоянно опрашивать сообщения из нашей очереди для обработки. Давайте создадим новый файл process_messages_from_queue.py; Поскольку этот файл не является частью нашего «приложения Django», мы создадим его вне каталога нашего приложения Django, чтобы имитировать его на совершенно другом сервере. Теперь ваше дерево файлов должно выглядеть примерно так:

.
 amazon-sqs-and-django
    core
    db.sqlite3
    file
    manage.py
    test_file.txt
    requirements.txt
    venv
 process_messages_from_queue.py

Вход в полноэкранный режим Выход из полноэкранного режима

Во-первых, давайте создадим отдельное окружение для нашего скрипта и установим все, что ему понадобится. В терминале выполните следующие действия:

amazon-sqs-and-django git:(part-2) python3 -m venv serviceb_venv             
amazon-sqs-and-django git:(part-2) source serviceb_venv/bin/activate
(serviceb_venv) amazon-sqs-and-django git:(part-2) pip install boto3

Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь зайдите в файл process_messages_from_queue.py,

import json
import sqlite3
import boto3

session = boto3.Session(
    aws_access_key_id='<AWS_ACCESS_KEY_ID>', # replace with your key
    aws_secret_access_key='<AWS_SECRET_ACCESS_KEY>', # replace with your key
)
sqs = session.resource('sqs', region_name='us-east-1')
connection = sqlite3.connect("path/to/django/app/db.sqlite3")

# Statuses we defined in the File Model of our django app.
PROCESSING_STATUS = 1
PROCESSED_STATUS = 2
FAILED_STATUS = 3

def main():

    # Get the SQS queue we created.
    queue = sqs.get_queue_by_name(QueueName="MyFileProcessingQueue.fifo")

    # This loop runs infinitely since we want to constantly keep checking
    # if new messages have been sent to the queue, and if they have, retrieve them and process them.
    while True:
        cursor = connection.cursor()
        # retrieve some messages from the queue.
        messages = queue.receive_messages()

        for message in messages:
            data = json.loads(message.body)
            file_id = data["file_id"]

            # Update File to indicate that it is now in processing stage.
            cursor.execute("UPDATE file_file SET status = ? WHERE id = ?", (PROCESSING_STATUS, file_id,))
            connection.commit()

Войдите в полноэкранный режим Выйти из полноэкранного режима

Мы создаем ресурс sqs для взаимодействия с нашей очередью. Мы также создаем соединение с базой данных нашего приложения django; это необходимо потому, что нам нужно обновить БД после обработки файла. Затем мы создаем функцию main, которая будет содержать весь важный код. Внутри функции main мы:

  1. Получаем очередь SQS и внутри бесконечного цикла непрерывно опрашиваем сообщения в очереди.
  2. Для каждого сообщения мы получаем file_id, который был отправлен из приложения django.
  3. Мы используем file_id для обновления объекта File в базе данных и устанавливаем его статус на PROCESSING. Это позволит пользователю узнать, что его файл больше не PENDING, а скорее, что работа над ним началась.

Следует отметить следующее:

  • В настоящее время метод receive_messages извлекает только одно сообщение за раз, это может помочь предотвратить перегрузку сервера слишком большим количеством запросов одновременно. Вы можете увеличить это значение до максимума 10, задав аргумент MaxNumberOfMessages.

  • Вы можете и должны оптимизировать код для обновления значения по умолчанию VisibilityTimeout, если служба не может обработать определенный файл достаточно быстро. Это делается для того, чтобы другая очередь не забрала его до того, как он будет завершен. Это можно сделать, вызвав метод change_visibility для конкретного сообщения.

Теперь давайте действительно обработаем файл и обновим базу данных обработанными данными. А если обработка не удалась, мы обновим базу данных, чтобы отразить неудачу.

import os # <----- NEW
...
def main():
    ...
    while True:
        ...
        for message in messages:
            ...
            # <------- ADD THIS -------->
            # Get the File obj from the database through the file_id we got from the SQS message.
            file_object = cursor.execute("SELECT lines_count, file_size, file_path, status 
                FROM file_file WHERE id = ?", (file_id,)).fetchone()

            # file_object order ------ (lines_count, file_size, file_path, status)
            file_path = file_object[2]

            # Checking the state of the File Object before we start processing...
            print(f"FILE ID: {file_id}, LINES_COUNT: {file_object[0]}, FILE_SIZE in bytes: {file_object[1]}, STATUS: {file_object[3]}")

            try:
                lines = None
                with open(file_path, "r", encoding="utf-8") as file:
                    lines = len(file.readlines())
                file_size = os.path.getsize(file_path)

                cursor.execute("UPDATE file_file SET status = ?, lines_count = ?, file_size = ? WHERE id = ?",
                    (PROCESSED_STATUS, lines, file_size, file_id,))
            except Exception:
                cursor.execute("""UPDATE file_file SET status = ? WHERE id = ?""",
                    (FAILED_STATUS, file_id,),
                )

            connection.commit()
            # Delete the message, to avoid duplicate processing
            message.delete()

            # check updated database
            updated_file_object = cursor.execute("SELECT lines_count, file_size, file_path, status 
                FROM file_file WHERE id = ?", (file_id,)).fetchone()
            print(f"FILE ID: {file_id}, LINES_COUNT: {updated_file_object[0]}, FILE_SIZE in bytes: {updated_file_object[1]}, STATUS: {updated_file_object[3]}")
            # <------- END-------->

Вход в полноэкранный режим Выход из полноэкранного режима

Кратко о том, что здесь происходит:

  1. Мы получаем путь_файла из базы данных и получаем количество строк в файле, а также его размер.
  2. Эти обработанные данные мы сохраняем в базе данных.
  3. Если в процессе обработки произошла ошибка, мы обновляем объект базы данных до FAILED_STATUS.
  4. И наконец, мы удаляем сообщение, чтобы ни один потребитель не смог получить его снова.

Примечание: Действие обработки файла может быть немного сложнее. Обычно оно работает с гораздо большими файлами, обработка которых занимает гораздо больше времени, но для примера мы оставим все просто.

Теперь осталось только вызвать нашу функцию main при запуске нашего скрипта.

import os
import json
import sqlite3
import boto3
...
def main():
    ...

if __name__ == " __main__":
    main()

Вход в полноэкранный режим Выйти из полноэкранного режима

Полный код будет выглядеть следующим образом:

import os
import json
import sqlite3
import boto3

session = boto3.Session(
    aws_access_key_id='<AWS_ACCESS_KEY_ID>', # replace with your key
    aws_secret_access_key='<AWS_SECRET_ACCESS_KEY>', # replace with your key
)
sqs = session.resource('sqs', region_name='us-east-1')
connection = sqlite3.connect("path/to/django/app/db.sqlite3")

# Statuses we defined in the File Model of our django app.
PROCESSING_STATUS = 1
PROCESSED_STATUS = 2
FAILED_STATUS = 3

def main():

    # Get the SQS queue we created.
    queue = sqs.get_queue_by_name(QueueName="MyFileProcessingQueue.fifo")

    # This loop runs infinitely since we want to constantly keep checking
    # if new messages have been sent to the queue, and if they have, retrieve them and process them.
    while True:
        cursor = connection.cursor()
        # retrieve some messages from the queue.
        messages = queue.receive_messages()

        for message in messages:
            data = json.loads(message.body)
            file_id = data["file_id"]

            # Update File to indicate that it is now in processing stage.
            cursor.execute("UPDATE file_file SET status = ? WHERE id = ?", (PROCESSING_STATUS, file_id,))
            connection.commit()

            # Get the File obj from the database through the file_id we got from the SQS message.
            file_object = cursor.execute("SELECT lines_count, file_size, file_path, status 
                FROM file_file WHERE id = ?", (file_id,)).fetchone()

            # file_object order ------ (lines_count, file_size, file_path, status)
            file_path = file_object[2]

            # Checking the state of the File Object before we start processing...
            print(f"FILE ID: {file_id}, LINES_COUNT: {file_object[0]}, FILE_SIZE in bytes: {file_object[1]}, STATUS: {file_object[3]}")

            try:
                lines = None
                with open(file_path, "r", encoding="utf-8") as file:
                    lines = len(file.readlines())
                file_size = os.path.getsize(file_path)

                cursor.execute("UPDATE file_file SET status = ?, lines_count = ?, file_size = ? WHERE id = ?",
                    (PROCESSED_STATUS, lines, file_size, file_id,))
            except Exception:
                cursor.execute("""UPDATE file_file SET status = ? WHERE id = ?""",
                    (FAILED_STATUS, file_id,),
                )

            connection.commit()

            # Delete the message, to avoid duplicate processing
            message.delete()

            # check updated database
            updated_file_object = cursor.execute("SELECT lines_count, file_size, file_path, status 
                FROM file_file WHERE id = ?", (file_id,)).fetchone()
            print(f"FILE ID: {file_id}, LINES_COUNT: {updated_file_object[0]}, FILE_SIZE in bytes: {updated_file_object[1]}, STATUS: {updated_file_object[3]}")

if __name__ == " __main__":
    main()

Вход в полноэкранный режим Выход из полноэкранного режима

Тестирование службы обработки файлов

В терминале запустите скрипт process_messages_from_queue.py.

(serviceb_venv) amazon-sqs-and-django git:(part-2) python3 process_messages_from_queue.py

Войти в полноэкранный режим Выход из полноэкранного режима

Теперь откройте второй терминал и запустите сервер приложения django. Помните, что во втором терминале вам нужно будет активировать виртуальную среду, предназначенную для приложения django. Теперь перейдем к postman и отправим новый POST-запрос для обработки нашего файла test_file.txt. В конечном итоге в вашем терминале должно появиться что-то вроде

Отлично! С помощью операторов печати мы видим, что файл obj, с ID 8 в моем случае, который изначально был пуст и находился в состоянии PROCESSING or 1, в конечном итоге был заполнен обработанными данными и обновлен до состояния PROCESSED or 2. Вот что может предложить нам SQS; она позволяет нам отложить фактическую обработку, и мы обновляем базу данных с нашими результатами, когда она завершена.

Один производитель, несколько потребителей

На данный момент мы уже знаем, что SQS ограничивает скорость обработки запросов, что, в свою очередь, помогает избежать перегрузки сервера. Но если нам нужно обработать больше сообщений в более быстром темпе, мы можем добавить несколько экземпляров потребителей, чтобы разделить нагрузку и сократить общее время обработки. Это будет выглядеть примерно так:

Создание конечной точки для проверки результата/статуса обработки файла

Мы получили сообщение, обработали файл и сохранили данные, но нам все еще нужно дать пользователю возможность просмотреть результат процесса: не удалось ли это или нет. Обновите FileView в файле views.py нашего приложения Django следующим образом:

...
from rest_framework.response import Response
from .models import File
from django.shortcuts import get_object_or_404 # <----- NEW

Class FileView(APIView):
    def post(self, request):
        ....

    # <----- ADD THIS ----->
    def get(self, request, pk: int):
        file = get_object_or_404(File, pk=pk)
        return Response({
                'lines_count': file.lines_count,
                'file_size': file.file_size,
                'status': file.status
            })
    # <-------- END -------->

Войти в полноэкранный режим Выйти из полноэкранного режима

Далее, обновите файл urls.py нашего проекта.

from django.urls import path
from .views import FileView

urlpatterns = [
    path('process', FileView.as_view()),
    path('check-status/<int:pk>', FileView.as_view()), # <---- NEW
]

Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь, внутри postman, мы можем запросить нашу новую конечную точку check-status. Мы заменим pk на id нашего файлового объекта, в моем случае это будет 8.

Теперь пользователь всегда может подтвердить статус своего запроса на обработку файла, который может быть PENDING, PROCESSING, FAILED, или PROCESSED (в этом случае также отображаются обработанные данные).

Заключение

Вот и все! Теперь у вас есть достаточное понимание SQS и того, как интегрировать его в приложение. Спасибо, что читали и кодировали вместе с нами в этой серии из 2 частей о понимании Amazon SQS с помощью Python и Django. Если вы пропустили, то вот часть 1 Понимание Amazon SQS с помощью Python и Django — часть 1.

Если вы нашли эту статью полезной или узнали что-то новое, оставьте большой палец вверх и следите за мной, чтобы быть в курсе всех последних публикаций!

До следующего раза, счастливого кодинга!

Леви

Оцените статью
devanswers.ru
Добавить комментарий