CQRS с Symfony Messenger (английский)

Просмотр версии на испанском языке

Введение

Обычно мы используем одну и ту же структуру данных для записи и запроса информации в системе, для больших систем это может привести к увеличению структуры данных, поскольку необходимо интегрировать чтение и запись в одну модель. Например, при записи информации нам может понадобиться множество валидаций, чтобы убедиться, что информация, которую мы хотим сохранить, верна, запрос этой информации может быть разным и сложным, чтобы получить отфильтрованные данные или различные структуры данных для каждого случая.

CQRS — это шаблон, который разделяет операции чтения и обновления для хранилища данных. Внедрение CQRS в ваше приложение может максимально повысить его производительность, масштабируемость и безопасность. Гибкость, обеспечиваемая переходом на CQRS, позволяет системе лучше развиваться со временем и предотвращает возникновение конфликтов слияния команд обновления на уровне домена.

Шаблон проектирования

CQRS разделяет структуру чтения с использованием запросов для чтения данных и модель записи с использованием команд для выполнения операций над данными.

  • Команды должны быть основаны на задачах, что означает, что мы должны сосредоточиться на операции команды, например, в приложении доставки при заказе чего-либо мы будем вызывать операцию OrderProductCommand вместо AddProductToClient или CreateNewOrderProduct, это также делает наш слой приложения более последовательным.
  • Запросы никогда не изменяют базу данных. Запрос возвращает DTO, который не содержит никаких знаний о домене. Мы должны сосредоточиться на необходимой информации, а не на поведении в домене.

Преимущества

  • Независимое масштабирование. Позволяет моделям чтения и записи масштабироваться независимо друг от друга.
  • Оптимизированные схемы данных. Модель чтения может использовать схему, оптимизированную для запросов, а модель записи — схему, оптимизированную для обновлений.
  • Безопасность. Так легче обеспечить, чтобы записи выполняли только правильные доменные сущности.
  • Разделение забот. Сложная бизнес-логика входит в модель записи. Модель чтения может быть простой.

Реализация CQRS с помощью Symfony Messenger

Компонент Messenger помогает приложениям отправлять и получать сообщения в/из других приложений или через очереди сообщений. Он также позволяет нам определять пользовательские шины сообщений, которые определяют типы сообщений и обработчики.

Давайте поговорим об архитектуре программного обеспечения.

CommandBus

Говоря о командах, нам необходимо модульно оформить общий интерфейс, которым шина сообщений может управлять и передавать контроллерам, интерфейс команд в итоге становится нашим базовым интерфейсом для каждой команды. Каждый обработчик команд будет выполнять операции над заданной командой, но сама команда не знает, какая операция выполняется. Кроме того, мы создадим интерфейс командной шины, чтобы создавать различные типы транспортеров для наших сообщений (команд), в данном случае мы создаем командную шину in-memory, но при необходимости мы можем легко расширить эту концепцию (например, работа в очереди).

В итоге мы получим что-то вроде этого в src-коде нашего приложения Symfony.

<?php

declare(strict_types=1);

namespace AppSharedDomainBusCommand;

interface Command
{
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedDomainBusCommand;

interface CommandBus
{
    public function dispatch(Command $command) : void;
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedDomainBusCommand;

interface CommandHandler
{
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedInfrastructureBusCommand;

use ...

final class InMemoryCommandBus implements CommandBus
{
    private MessageBus $bus;

    public function __construct(
        iterable $commandHandlers
    ) {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($commandHandlers),
                ),
            ),
        ]);
    }

    /**
     * @throws Throwable
     */
    public function dispatch(Command $command): void
    {
        try {
            $this->bus->dispatch($command);
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The command has not a valid handler: %s', $command::class));
        } catch (HandlerFailedException $e) {
            throw $e->getPrevious();
        }
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Для класса In Memory Command Bus нам нужно будет зарегистрировать каждый обработчик команд, исходящий из определения сервиса, в этом нам поможет функция Symfony под названием Service tags, предоставляемая контейнером сервисов и автоконфигурацией, она позволяет нам помечать сервис, который мы затем можем запросить в config/services. yaml мы указываем Контейнер Сервиса, который мы хотим пометить каждый экземпляр интерфейса Command Handler тегом internal.command_handler, а затем объявляем нашу In Memory Command Bus, передавая все реализаторы Command Handler в качестве итерабельного аргумента. Командная шина примет каждый обработчик команды и объявит ожидаемую команду с соответствующим обработчиком.

parameters:

services:
    _defaults:
        autowire: true
        autoconfigure: true

    _instanceof:
        AppSharedDomainBusCommandCommandHandler:
            tags: ['internal.command_handler']
...
    ### Buses
    AppSharedDomainBusCommandCommandBus:
        class: AppSharedInfrastructureBusCommandInMemoryCommandBus
        arguments: [!tagged internal.command_handler]
Войдите в полноэкранный режим Выход из полноэкранного режима

Мы можем создать инструмент создания драйвера, который будет искать в функции __invoke реализатора обработчика команд и принимать первый тип аргумента как команду, необходимую для вызова обработчика. На этом этапе мы создаем соглашение, согласно которому каждый контроллер команд должен иметь возможность быть вызванным как функция и иметь только один параметр с типом команды.

<?php

declare(strict_types=1);

namespace AppSharedInfrastructureBus;

use ...

final class HandlerBuilder
{
    /**
     * @throws ReflectionException
     */
    public static function fromCallables(iterable $callables) : array
    {
        $callablesHandlers = [];

        foreach ($callables as $callable) {
            $envelop = self::extractFirstParam($callable);

            if (! array_key_exists($envelop, $callablesHandlers)) {
                $callablesHandlers[self::extractFirstParam($callable)] = [];
            }

            $callablesHandlers[self::extractFirstParam($callable)][] = $callable;
        }

        return $callablesHandlers;
    }

    /**
     * @throws ReflectionException
     */
    private static function extractFirstParam(object|string $class) : string|null
    {
        $reflection = new ReflectionClass($class);
        $method     = $reflection->getMethod('__invoke');

        if ($method->getNumberOfParameters() === 1) {
            return $method->getParameters()[0]->getClass()?->getName();
        }

        return null;
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

С этим мы можем начать создавать наши команды, например:

<?php

declare(strict_types=1);

namespace AppEmailSenderApplicationCreate;

use ...

final class CreateEmailCommand implements Command
{
    public function __construct(
        private readonly string $sender,
        private readonly string $addressee,
        private readonly string $message,
    ) {
    }

    public function sender(): string
    {
        return $this->sender;
    }

    public function addressee(): string
    {
        return $this->addressee;
    }

    public function message(): string
    {
        return $this->message;
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Мы можем создать команду Create Email Command, она содержит информацию, необходимую для создания нового письма, но она не знает процесса, необходимого для этого.

<?php

declare(strict_types=1);

namespace AppEmailSenderApplicationCreate;

use ...

class CreateEmailCommandHandler implements CommandHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(CreateEmailCommand $command) : EmailId {
        $email = Email::createNewEmail(
            sender: new EmailAddress($command->sender()),
            addressee: new EmailAddress($command->addressee()),
            message: new Message($command->message()),
        );

        $this->repository->save($email);

        return $email->id();
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Мы создаем обработчик команд для вышеуказанной команды, он знает, что функция __invoke объекта содержит единственный аргумент типа Create Email Command и знает все процессы, необходимые для создания нового письма.

<?php

declare(strict_types=1);

namespace AppEmailSenderInfrastructureHttp;

use ...

class CreateEmailAction
{
    public function __construct(
        private readonly CreateEmailResponder $responder,
        private readonly CommandBus $commandBus,
    ) {
    }

    public function __invoke(Request $request) : Response
    {
        try {
            $this->commandBus->dispatch(
                new CreateEmailCommand(
                    sender: $request->request->get('sender'),
                    addressee: $request->request->get('addressee'),
                    message: $request->request->get('message'),
                ),
            );
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Затем мы можем легко внедрить командную шину в класс Action (Controller) и отправить команду, действие не знает, что происходит в основном приложении, но командная шина может убедиться, что мы отправим команду соответствующему обработчику и выполним действие. Обратите внимание, что нам известно действие, которое должно произойти, оно предоставлено из имени команды.

QueryBus

Давайте рассмотрим модель для QueryBus. Мы можем определить очень похожую архитектуру, но теперь нам нужно вернуть значение, если мы запрашиваем что-то с помощью запроса, нам нужно ввести понятие Response. Response может быть коллекцией доменных объектов или это может быть один объект или что угодно, кто может определить, что такое Response — это обработчик запросов, который знает, какую информацию ему нужно сгенерировать.

В итоге мы получаем что-то вроде этого:

<?php

declare(strict_types=1);

namespace AppSharedDomainBusQuery;

interface Query
{
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedDomainBusQuery;

interface QueryBus
{
    public function ask(Query $query) : Response|null;
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedDomainBusQuery;

interface QueryHandler
{
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedDomainBusQuery;

interface Response
{
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppSharedInfrastructureBusQuery;

use ...

final class InMemoryQueryBus implements QueryBus
{
    private MessageBus $bus;

    public function __construct(iterable $queryHandlers)
    {
        $this->bus = new MessageBus([
            new HandleMessageMiddleware(
                new HandlersLocator(
                    HandlerBuilder::fromCallables($queryHandlers),
                ),
            ),
        ]);
    }

    public function ask(Query $query): Response|null
    {
        try {
            /** @var HandledStamp $stamp */
            $stamp = $this->bus->dispatch($query)->last(HandledStamp::class);

            return $stamp->getResult();
        } catch (NoHandlerForMessageException $e) {
            throw new InvalidArgumentException(sprintf('The query has not a valid handler: %s', $query::class));
        }
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Мы можем использовать тот же подход для регистрации обработчиков запросов и запросов, используя инструмент для создания обработчиков и зная, что у нас есть контракт, в котором функция __invoke должна иметь только один аргумент, который должен быть реализатором интерфейса Query.

Чтобы получить возвращаемое значение из обработчика запроса, нам нужно использовать штамп Handled, который пометит сообщение как обработанное и даст нам доступ к возвращаемому значению, которое, как мы знаем на данный момент, должно быть реализатором Response.

В config/service.yaml мы можем пометить любой экземпляр обработчика запросов тегом internal.query_handler и позволить контейнеру сервиса внедрить все теги в шину запросов в памяти.

services:
    _defaults:
        autowire: true 
        autoconfigure: true

    _instanceof:
        ...

        AppSharedDomainBusQueryQueryHandler:
            tags: ['internal.query_handler']
        ...
    ### Buses
    ...

    AppSharedDomainBusQueryQueryBus:
        class: AppSharedInfrastructureBusQueryInMemoryQueryBus
        arguments: [ !tagged internal.query_handler ]
Войдите в полноэкранный режим Выход из полноэкранного режима

Когда все готово, мы можем начать создавать, например, запросы:

<?php

declare(strict_types=1);

namespace AppEmailSenderApplicationFindEmail;

use ...

final class FindEmailQuery implements Query
{
    public function __construct(private readonly int $id)
    {
    }

    public function id() : int
    {
        return $this->id;
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Простой запрос, содержащий идентификатор искомого электронного письма, будет отправлен в обработчик запросов Find Email. Он имеет достаточно информации о письме, чтобы найти его и сгенерировать ответ с необходимой информацией.

<?php

declare(strict_types=1);

namespace AppEmailSenderApplicationFindEmail;

use ...

final class FindEmail implements QueryHandler
{
    public function __construct(private EmailRepository $repository)
    {
    }

    public function __invoke(FindEmailQuery $query) : FindEmailResponse
    {
        $email = $this->repository->findById(
            EmailId::fromInt(
                $query->id(),
            ),
        );

        if ($email === null) {
            throw new InvalidArgumentException('Email unreachable');
        }

        return new FindEmailResponse(
            email: $email,
        );
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима
<?php

declare(strict_types=1);

namespace AppEmailSenderApplicationFindEmail;

use ...

final class FindEmailResponse implements Response
{
    public function __construct(private readonly EmailDto $email)
    {
    }

    public function email() : EmailDto
    {
        return $this->email;
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Наконец, мы можем использовать Query Bus в любом классе Action

<?php

declare(strict_types=1);

namespace AppEmailSenderInfrastructureHttp;

use ...

class GetEmailAction
{
    public function __construct(
        private GetEmailResponder $responder,
        private QueryBus $queryBus,
    ) {
    }

    public function __invoke(Request $request, int $id) : Response
    {
        try {
            /** @var FindEmailResponse $findEmailResponse */
            $findEmailResponse = $this->queryBus->ask(
                new FindEmailQuery(id: $id)
            );

            $email = $findEmailResponse->email();

            $this->responder->loadEmail($email);
        } catch (Exception $e) {
            $this->responder->loadError($e->getMessage());
        }

        return $this->responder->response();
    }
}
Войдите в полноэкранный режим Выход из полноэкранного режима

Опять же, Действие знает, что оно ищет, но не знает полного процесса, чтобы получить это.

Заключение

Мы можем легко реализовать паттерн CQRS с помощью компонентов Symfony, создав пользовательские шины сообщений и определив модель, которую можно использовать повторно во всем приложении. CQRS может помочь нам разделить операции и проблемы поиска по описательным классам Command/Query для создания более изолированных процессов, делая классы открытыми для изменений.

Смотрите код на

AdGARAY / cqrs-symfony

Пример CQRS с Symfony Messenger

CQRS с Symfony Messenger

Требования

  • Docker compose

Настройка

Инициализация контейнеров

$ docker compose up -d
Войдите в полноэкранный режим Выход из полноэкранного режима

Войдите в контейнер php

$ docker compose exec -it php bash
Войдите в полноэкранный режим Выход из полноэкранного режима

Установите зависимости composer

/var/www/html# $ composer install
Войдите в полноэкранный режим Выход из полноэкранного режима

Запуск миграций

/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Войдите в полноэкранный режим Выход из полноэкранного режима

Перейдите на localhost:8080

В php-образе уже есть xDebug, прослушивающий порт 9003 с именем сервера serverName=application если вы хотите пройти шаг за шагом

Смотрите полный текст сообщения на dev.to/adgaray и испанскую версию

Просмотр на GitHub

Оцените статью
devanswers.ru
Добавить комментарий