Просмотр версии на испанском языке

CQRS с Symfony Messenger
Адриан Гарай ・ Aug 22 ・ 8 min read
Введение
Обычно мы используем одну и ту же структуру данных для записи и запроса информации в системе, для больших систем это может привести к увеличению структуры данных, поскольку необходимо интегрировать чтение и запись в одну модель. Например, при записи информации нам может понадобиться множество валидаций, чтобы убедиться, что информация, которую мы хотим сохранить, верна, запрос этой информации может быть разным и сложным, чтобы получить отфильтрованные данные или различные структуры данных для каждого случая.
CQRS — это шаблон, который разделяет операции чтения и обновления для хранилища данных. Внедрение CQRS в ваше приложение может максимально повысить его производительность, масштабируемость и безопасность. Гибкость, обеспечиваемая переходом на CQRS, позволяет системе лучше развиваться со временем и предотвращает возникновение конфликтов слияния команд обновления на уровне домена.
Шаблон проектирования
CQRS разделяет структуру чтения с использованием запросов для чтения данных и модель записи с использованием команд для выполнения операций над данными.
- Команды должны быть основаны на задачах, что означает, что мы должны сосредоточиться на операции команды, например, в приложении доставки при заказе чего-либо мы будем вызывать операцию OrderProductCommand вместо AddProductToClient или CreateNewOrderProduct, это также делает наш слой приложения более последовательным.
- Запросы никогда не изменяют базу данных. Запрос возвращает DTO, который не содержит никаких знаний о домене. Мы должны сосредоточиться на необходимой информации, а не на поведении в домене.
Преимущества
- Независимое масштабирование. Позволяет моделям чтения и записи масштабироваться независимо друг от друга.
- Оптимизированные схемы данных. Модель чтения может использовать схему, оптимизированную для запросов, а модель записи — схему, оптимизированную для обновлений.
- Безопасность. Так легче обеспечить, чтобы записи выполняли только правильные доменные сущности.
- Разделение забот. Сложная бизнес-логика входит в модель записи. Модель чтения может быть простой.
Реализация CQRS с помощью Symfony Messenger
Компонент Messenger помогает приложениям отправлять и получать сообщения в/из других приложений или через очереди сообщений. Он также позволяет нам определять пользовательские шины сообщений, которые определяют типы сообщений и обработчики.
Давайте поговорим об архитектуре программного обеспечения.
CommandBus
Говоря о командах, нам необходимо модульно оформить общий интерфейс, которым шина сообщений может управлять и передавать контроллерам, интерфейс команд в итоге становится нашим базовым интерфейсом для каждой команды. Каждый обработчик команд будет выполнять операции над заданной командой, но сама команда не знает, какая операция выполняется. Кроме того, мы создадим интерфейс командной шины, чтобы создавать различные типы транспортеров для наших сообщений (команд), в данном случае мы создаем командную шину in-memory, но при необходимости мы можем легко расширить эту концепцию (например, работа в очереди).
В итоге мы получим что-то вроде этого в src-коде нашего приложения Symfony.
<?php
declare(strict_types=1);
namespace AppSharedDomainBusCommand;
interface Command
{
}
<?php
declare(strict_types=1);
namespace AppSharedDomainBusCommand;
interface CommandBus
{
public function dispatch(Command $command) : void;
}
<?php
declare(strict_types=1);
namespace AppSharedDomainBusCommand;
interface CommandHandler
{
}
<?php
declare(strict_types=1);
namespace AppSharedInfrastructureBusCommand;
use ...
final class InMemoryCommandBus implements CommandBus
{
private MessageBus $bus;
public function __construct(
iterable $commandHandlers
) {
$this->bus = new MessageBus([
new HandleMessageMiddleware(
new HandlersLocator(
HandlerBuilder::fromCallables($commandHandlers),
),
),
]);
}
/**
* @throws Throwable
*/
public function dispatch(Command $command): void
{
try {
$this->bus->dispatch($command);
} catch (NoHandlerForMessageException $e) {
throw new InvalidArgumentException(sprintf('The command has not a valid handler: %s', $command::class));
} catch (HandlerFailedException $e) {
throw $e->getPrevious();
}
}
}
Для класса In Memory Command Bus нам нужно будет зарегистрировать каждый обработчик команд, исходящий из определения сервиса, в этом нам поможет функция Symfony под названием Service tags, предоставляемая контейнером сервисов и автоконфигурацией, она позволяет нам помечать сервис, который мы затем можем запросить в config/services. yaml мы указываем Контейнер Сервиса, который мы хотим пометить каждый экземпляр интерфейса Command Handler тегом internal.command_handler
, а затем объявляем нашу In Memory Command Bus, передавая все реализаторы Command Handler в качестве итерабельного аргумента. Командная шина примет каждый обработчик команды и объявит ожидаемую команду с соответствующим обработчиком.
parameters:
services:
_defaults:
autowire: true
autoconfigure: true
_instanceof:
AppSharedDomainBusCommandCommandHandler:
tags: ['internal.command_handler']
...
### Buses
AppSharedDomainBusCommandCommandBus:
class: AppSharedInfrastructureBusCommandInMemoryCommandBus
arguments: [!tagged internal.command_handler]
Мы можем создать инструмент создания драйвера, который будет искать в функции __invoke реализатора обработчика команд и принимать первый тип аргумента как команду, необходимую для вызова обработчика. На этом этапе мы создаем соглашение, согласно которому каждый контроллер команд должен иметь возможность быть вызванным как функция и иметь только один параметр с типом команды.
<?php
declare(strict_types=1);
namespace AppSharedInfrastructureBus;
use ...
final class HandlerBuilder
{
/**
* @throws ReflectionException
*/
public static function fromCallables(iterable $callables) : array
{
$callablesHandlers = [];
foreach ($callables as $callable) {
$envelop = self::extractFirstParam($callable);
if (! array_key_exists($envelop, $callablesHandlers)) {
$callablesHandlers[self::extractFirstParam($callable)] = [];
}
$callablesHandlers[self::extractFirstParam($callable)][] = $callable;
}
return $callablesHandlers;
}
/**
* @throws ReflectionException
*/
private static function extractFirstParam(object|string $class) : string|null
{
$reflection = new ReflectionClass($class);
$method = $reflection->getMethod('__invoke');
if ($method->getNumberOfParameters() === 1) {
return $method->getParameters()[0]->getClass()?->getName();
}
return null;
}
}
С этим мы можем начать создавать наши команды, например:
<?php
declare(strict_types=1);
namespace AppEmailSenderApplicationCreate;
use ...
final class CreateEmailCommand implements Command
{
public function __construct(
private readonly string $sender,
private readonly string $addressee,
private readonly string $message,
) {
}
public function sender(): string
{
return $this->sender;
}
public function addressee(): string
{
return $this->addressee;
}
public function message(): string
{
return $this->message;
}
}
Мы можем создать команду Create Email Command, она содержит информацию, необходимую для создания нового письма, но она не знает процесса, необходимого для этого.
<?php
declare(strict_types=1);
namespace AppEmailSenderApplicationCreate;
use ...
class CreateEmailCommandHandler implements CommandHandler
{
public function __construct(private EmailRepository $repository)
{
}
public function __invoke(CreateEmailCommand $command) : EmailId {
$email = Email::createNewEmail(
sender: new EmailAddress($command->sender()),
addressee: new EmailAddress($command->addressee()),
message: new Message($command->message()),
);
$this->repository->save($email);
return $email->id();
}
}
Мы создаем обработчик команд для вышеуказанной команды, он знает, что функция __invoke объекта содержит единственный аргумент типа Create Email Command и знает все процессы, необходимые для создания нового письма.
<?php
declare(strict_types=1);
namespace AppEmailSenderInfrastructureHttp;
use ...
class CreateEmailAction
{
public function __construct(
private readonly CreateEmailResponder $responder,
private readonly CommandBus $commandBus,
) {
}
public function __invoke(Request $request) : Response
{
try {
$this->commandBus->dispatch(
new CreateEmailCommand(
sender: $request->request->get('sender'),
addressee: $request->request->get('addressee'),
message: $request->request->get('message'),
),
);
} catch (Exception $e) {
$this->responder->loadError($e->getMessage());
}
return $this->responder->response();
}
}
Затем мы можем легко внедрить командную шину в класс Action (Controller) и отправить команду, действие не знает, что происходит в основном приложении, но командная шина может убедиться, что мы отправим команду соответствующему обработчику и выполним действие. Обратите внимание, что нам известно действие, которое должно произойти, оно предоставлено из имени команды.
QueryBus
Давайте рассмотрим модель для QueryBus. Мы можем определить очень похожую архитектуру, но теперь нам нужно вернуть значение, если мы запрашиваем что-то с помощью запроса, нам нужно ввести понятие Response. Response может быть коллекцией доменных объектов или это может быть один объект или что угодно, кто может определить, что такое Response — это обработчик запросов, который знает, какую информацию ему нужно сгенерировать.
В итоге мы получаем что-то вроде этого:
<?php
declare(strict_types=1);
namespace AppSharedDomainBusQuery;
interface Query
{
}
<?php
declare(strict_types=1);
namespace AppSharedDomainBusQuery;
interface QueryBus
{
public function ask(Query $query) : Response|null;
}
<?php
declare(strict_types=1);
namespace AppSharedDomainBusQuery;
interface QueryHandler
{
}
<?php
declare(strict_types=1);
namespace AppSharedDomainBusQuery;
interface Response
{
}
<?php
declare(strict_types=1);
namespace AppSharedInfrastructureBusQuery;
use ...
final class InMemoryQueryBus implements QueryBus
{
private MessageBus $bus;
public function __construct(iterable $queryHandlers)
{
$this->bus = new MessageBus([
new HandleMessageMiddleware(
new HandlersLocator(
HandlerBuilder::fromCallables($queryHandlers),
),
),
]);
}
public function ask(Query $query): Response|null
{
try {
/** @var HandledStamp $stamp */
$stamp = $this->bus->dispatch($query)->last(HandledStamp::class);
return $stamp->getResult();
} catch (NoHandlerForMessageException $e) {
throw new InvalidArgumentException(sprintf('The query has not a valid handler: %s', $query::class));
}
}
}
Мы можем использовать тот же подход для регистрации обработчиков запросов и запросов, используя инструмент для создания обработчиков и зная, что у нас есть контракт, в котором функция __invoke должна иметь только один аргумент, который должен быть реализатором интерфейса Query.
Чтобы получить возвращаемое значение из обработчика запроса, нам нужно использовать штамп Handled, который пометит сообщение как обработанное и даст нам доступ к возвращаемому значению, которое, как мы знаем на данный момент, должно быть реализатором Response.
В config/service.yaml мы можем пометить любой экземпляр обработчика запросов тегом internal.query_handler
и позволить контейнеру сервиса внедрить все теги в шину запросов в памяти.
services:
_defaults:
autowire: true
autoconfigure: true
_instanceof:
...
AppSharedDomainBusQueryQueryHandler:
tags: ['internal.query_handler']
...
### Buses
...
AppSharedDomainBusQueryQueryBus:
class: AppSharedInfrastructureBusQueryInMemoryQueryBus
arguments: [ !tagged internal.query_handler ]
Когда все готово, мы можем начать создавать, например, запросы:
<?php
declare(strict_types=1);
namespace AppEmailSenderApplicationFindEmail;
use ...
final class FindEmailQuery implements Query
{
public function __construct(private readonly int $id)
{
}
public function id() : int
{
return $this->id;
}
}
Простой запрос, содержащий идентификатор искомого электронного письма, будет отправлен в обработчик запросов Find Email. Он имеет достаточно информации о письме, чтобы найти его и сгенерировать ответ с необходимой информацией.
<?php
declare(strict_types=1);
namespace AppEmailSenderApplicationFindEmail;
use ...
final class FindEmail implements QueryHandler
{
public function __construct(private EmailRepository $repository)
{
}
public function __invoke(FindEmailQuery $query) : FindEmailResponse
{
$email = $this->repository->findById(
EmailId::fromInt(
$query->id(),
),
);
if ($email === null) {
throw new InvalidArgumentException('Email unreachable');
}
return new FindEmailResponse(
email: $email,
);
}
}
<?php
declare(strict_types=1);
namespace AppEmailSenderApplicationFindEmail;
use ...
final class FindEmailResponse implements Response
{
public function __construct(private readonly EmailDto $email)
{
}
public function email() : EmailDto
{
return $this->email;
}
}
Наконец, мы можем использовать Query Bus в любом классе Action
<?php
declare(strict_types=1);
namespace AppEmailSenderInfrastructureHttp;
use ...
class GetEmailAction
{
public function __construct(
private GetEmailResponder $responder,
private QueryBus $queryBus,
) {
}
public function __invoke(Request $request, int $id) : Response
{
try {
/** @var FindEmailResponse $findEmailResponse */
$findEmailResponse = $this->queryBus->ask(
new FindEmailQuery(id: $id)
);
$email = $findEmailResponse->email();
$this->responder->loadEmail($email);
} catch (Exception $e) {
$this->responder->loadError($e->getMessage());
}
return $this->responder->response();
}
}
Опять же, Действие знает, что оно ищет, но не знает полного процесса, чтобы получить это.
Заключение
Мы можем легко реализовать паттерн CQRS с помощью компонентов Symfony, создав пользовательские шины сообщений и определив модель, которую можно использовать повторно во всем приложении. CQRS может помочь нам разделить операции и проблемы поиска по описательным классам Command/Query для создания более изолированных процессов, делая классы открытыми для изменений.
Смотрите код на
AdGARAY / cqrs-symfony
Пример CQRS с Symfony Messenger
CQRS с Symfony Messenger
Требования
- Docker compose
Настройка
Инициализация контейнеров
$ docker compose up -d
Войдите в контейнер php
$ docker compose exec -it php bash
Установите зависимости composer
/var/www/html# $ composer install
Запуск миграций
/var/www/html# $ php bin/console doctrine:migrations:migrate --no-interaction
Перейдите на localhost:8080
В php-образе уже есть xDebug, прослушивающий порт 9003
с именем сервера serverName=application
если вы хотите пройти шаг за шагом
Смотрите полный текст сообщения на dev.to/adgaray и испанскую версию