Go EventSourcing и CQRS с PostgreSQL, Kafka, MongoDB и ElasticSearch 👋✨💫


👨💻 Полный список того, что было использовано:

PostgeSQL в качестве базы данных хранилища событий
Kafka как брокер сообщений
gRPC Go реализация gRPC
Jaeger с открытым исходным кодом, сквозная распределенная трассировка
Prometheus мониторинг и оповещение
Grafana для создания наблюдаемых приборных панелей с использованием всего из Prometheus
MongoDB База данных MongoDB
Elasticsearch Клиент Elasticsearch для Go.
Веб-фреймворк Echo
Kibana Kibana — программное обеспечение для создания приборных панелей визуализации данных для Elasticsearch
Migrate для миграций

Исходный код вы можете найти в репозитории GitHub.
Основная идея этого проекта заключается в реализации Event Sourcing и CQRS с использованием Go, Postgresql, Kafka для хранения событий и Mongo, ElasticSearch для проекций чтения.
Ранее я уже писал подобные статьи, где реализовывал такой же микросервис с использованием Go и EventStoreDB,
и Spring,
Как уже писалось ранее, повторюсь, считаю EventStoreDB лучшим выбором для сорсинга событий, но в реальной жизни в некоторых проектах у нас обычно есть бизнес ограничения и например
использование EventStoreDB может быть не разрешено, в этом случае, думаю, что postgres и kafka являются хорошей альтернативой для реализации нашего собственного хранилища событий.
Если вы не знакомы с паттернами EventSourcing и CQRS, лучшее место для чтения — microservices.io,
блог и документация сайта eventstore тоже очень хороши,
и очень рекомендую книгу Алексея Зимарева «Hands-on Domain-Driven Design with .NET Core».

В данном проекте мы имеем микросервис с eventstore, реализованный с помощью PostgeSQL и Kafka,
в качестве баз данных для чтения проекций MongoDB и Elasticsearch.
Некоторые описания в этой статье повторяются, потому что есть другая реализация с использованием postgres и kafka, но идея та же.

Все интерфейсы UI будут доступны на портах:

Jaeger UI: http://localhost:16686

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3005

Файл Docker compose для этого проекта:

version: "3.9"

services:
  es_postgesql:
    image: postgres:14.4
    container_name: es_postgesql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka:
    image: 'bitnami/kafka:3.2.0'
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "./kafka_data:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - ./mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.35
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

  node01:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.2.3
    container_name: node01
    restart: always
    environment:
      - node.name=node01
      - cluster.name=es-cluster-8
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.license.self_generated.type=basic
      - xpack.security.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es-data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks: [ "microservices" ]

  kibana:
    image: docker.elastic.co/kibana/kibana:8.2.3
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:
  es-data01:
    driver: local

networks:
  microservices:
    name: microservices
Войти в полноэкранный режим Выход из полноэкранного режима

В Event Sourcing мы храним историю всех действий, которые произошли с сущностью, и на основе этого получаем состояние,
Можно просмотреть эту историю назад, чтобы определить, каким было состояние в определенный момент времени.
Это шаблон для хранения данных в виде событий в журнале.

Каждое новое событие — это изменение.
AggregateBase должна отслеживать все изменения, которые происходят во время выполнения команды,
чтобы мы могли сохранить эти изменения в обработчике команды.
Агрегаты берут текущее состояние, проверяют бизнес-правила для конкретной операции
и применяют бизнес-логику, которая возвращает новое состояние. Важной частью этого процесса является сохранение всего или ничего.
Все агрегированные данные должны быть успешно сохранены. Если одно правило или операция не сработает, то все изменение состояния будет отклонено.
AggregateRoot может быть реализован различными способами, основные методы — это загрузка событий — apply и raise changes.
Когда мы получаем агрегат из базы данных, вместо того, чтобы читать его состояние как одну запись в таблице или документе,
мы читаем все события, которые были сохранены ранее, и вызываем метод when для каждого из них.
После всех этих шагов мы восстановим всю историю данного агрегата.
Тем самым мы приведем агрегат в его последнее состояние.

// AggregateRoot contains all methods of AggregateBase
type AggregateRoot interface {
    GetID() string
    SetID(id string) *AggregateBase
    GetType() AggregateType
    SetType(aggregateType AggregateType)
    GetChanges() []any
    ClearChanges()
    GetVersion() uint64
    ToSnapshot()
    String() string
    Load
    Apply
    RaiseEvent
}

// AggregateType type of the Aggregate
type AggregateType string

// AggregateBase base aggregate contains all main necessary fields
type AggregateBase struct {
    ID      string
    Version uint64
    Changes []any
    Type    AggregateType
    when    when
}

func NewAggregateBase(when when) *AggregateBase {
    if when == nil {
        return nil
    }

    return &AggregateBase{
        Version: startVersion,
        Changes: make([]any, 0, changesEventsCap),
        when:    when,
    }
}

// ClearChanges clear AggregateBase uncommitted Event's
func (a *AggregateBase) ClearChanges() {
    a.Changes = make([]any, 0, changesEventsCap)
}

// GetChanges get AggregateBase uncommitted Event's
func (a *AggregateBase) GetChanges() []any {
    return a.Changes
}

// Load add existing events from event store to aggregate using When interface method
func (a *AggregateBase) Load(events []any) error {
    for _, evt := range events {
        if err := a.when(evt); err != nil {
            return err
        }
        a.Version++
    }
    return nil
}

// Apply push event to aggregate uncommitted events using When method
func (a *AggregateBase) Apply(event any) error {
    if err := a.when(event); err != nil {
        return err
    }
    a.Version++
    a.Changes = append(a.Changes, event)
    return nil
}

// RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore
func (a *AggregateBase) RaiseEvent(event any) error {
    if err := a.when(event); err != nil {
        return err
    }
    a.Version++
    return nil
}

Вход в полноэкранный режим Выход из полноэкранного режима

Событие представляет собой факт, имевший место в домене. Они являются источником истины, текущее состояние определяется на основе событий.
События неизменяемы и представляют собой факты бизнеса.
В Event Sourcing каждая операция, выполненная над совокупностью, должна привести к новому событию.
Событие представляет собой факт, произошедший в домене. Они являются источником истины, ваше текущее состояние выводится из событий.
Они неизменяемы и представляют факты бизнеса.
Это означает, что мы никогда ничего не меняем и не удаляем в базе данных, а только добавляем новые события.

type Event struct {
    EventID       string
    AggregateID   string
    EventType     EventType
    AggregateType AggregateType
    Version       uint64
    Data          []byte
    Metadata      []byte
    Timestamp     time.Time
}
Войти в полноэкранный режим Выход из полноэкранного режима

Снимки — это представление текущего состояния в определенный «момент времени».
Если мы будем следовать шаблону Event Sourcing буквально, нам нужно получить все эти транзакции, чтобы вычислить баланс текущего счета.
Это будет неэффективно. Ваша первая мысль, чтобы сделать это более эффективным, может быть кэширование последнего состояния где-нибудь.
Вместо того чтобы получать все эти события, мы можем получить одну запись и использовать ее для нашей бизнес-логики. Это и есть моментальный снимок.
Общая логика: читаем снимок (если он существует), затем читаем события из EventStore,
если снапшот существует, читаем события с момента последней ревизии потока, для которого был создан снапшот, в противном случае читаем все события.
В нашем микросервисе мы храним моментальный снимок каждого N-го количества событий.
Снимки могут быть не нужны, так как производительность может быть достаточно высокой.

type Snapshot struct {
    ID      string        `json:"id"`
    Type    AggregateType `json:"type"`
    State   []byte        `json:"state"`
    Version uint64        `json:"version"`
}
Вход в полноэкранный режим Выход из полноэкранного режима

Хранилище событий является ключевым элементом системы. Каждое изменение, произошедшее в домене, записывается в базу данных.
Она специально предназначена для хранения истории изменений, состояние представлено журналом событий.
События неизменяемы: они не могут быть изменены.
Реализация AggregateStore — это методы Load, Save и Exists.
Load и Save принимают агрегат, а затем загружают или применяют события с помощью клиента EventStoreDB.
Метод Load: выяснить имя потока для агрегата, прочитать все события из потока агрегата,
перебирает все события и вызывает обработчик RaiseEvent для каждого из них.
А метод Save сохраняет агрегаты, сохраняя историю изменений, обрабатывая параллелизм,
Когда вы получаете поток из EventStore, вы записываете номер текущей версии,
затем, когда вы сохраняете его обратно, вы можете определить, не изменил ли кто-то другой запись за это время.

// Load es.Aggregate events using snapshots with given frequency
func (p *pgEventStore) Load(ctx context.Context, aggregate Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Load")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    snapshot, err := p.GetSnapshot(ctx, aggregate.GetID())
    if err != nil && !errors.Is(err, pgx.ErrNoRows) {
        return tracing.TraceWithErr(span, err)
    }

    if snapshot != nil {
        if err := serializer.Unmarshal(snapshot.State, aggregate); err != nil {
            p.log.Errorf("(Load) serializer.Unmarshal err: %v", err)
            return tracing.TraceWithErr(span, errors.Wrap(err, "json.Unmarshal"))
        }

        err := p.loadAggregateEventsByVersion(ctx, aggregate)
        if err != nil {
            return err
        }

        p.log.Debugf("(Load Aggregate By Version) aggregate: %s", aggregate.String())
        span.LogFields(log.String("aggregate with events", aggregate.String()))
        return nil
    }

    err = p.loadEvents(ctx, aggregate)
    if err != nil {
        return err
    }

    p.log.Debugf("(Load Aggregate): aggregate: %s", aggregate.String())
    span.LogFields(log.String("aggregate with events", aggregate.String()))
    return nil
}

// Save es.Aggregate events using snapshots with given frequency
func (p *pgEventStore) Save(ctx context.Context, aggregate Aggregate) (err error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "pgEventStore.Save")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    if len(aggregate.GetChanges()) == 0 {
        p.log.Debug("(Save) aggregate.GetChanges()) == 0")
        span.LogFields(log.Int("events", len(aggregate.GetChanges())))
        return nil
    }

    tx, err := p.db.Begin(ctx)
    if err != nil {
        p.log.Errorf("(Save) db.Begin err: %v", err)
        return tracing.TraceWithErr(span, errors.Wrap(err, "db.Begin"))
    }

    defer func() {
        if tx != nil {
            if txErr := tx.Rollback(ctx); txErr != nil && !errors.Is(txErr, pgx.ErrTxClosed) {
                err = txErr
                tracing.TraceErr(span, err)
                return
            }
        }
    }()

    changes := aggregate.GetChanges()
    events := make([]Event, 0, len(changes))

    for i := range changes {
        event, err := p.serializer.SerializeEvent(aggregate, changes[i])
        if err != nil {
            p.log.Errorf("(Save) serializer.SerializeEvent err: %v", err)
            return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.SerializeEvent"))
        }
        events = append(events, event)
    }

    if err := p.saveEventsTx(ctx, tx, events); err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "saveEventsTx"))
    }

    if aggregate.GetVersion()%p.cfg.SnapshotFrequency == 0 {
        aggregate.ToSnapshot()
        if err := p.saveSnapshotTx(ctx, tx, aggregate); err != nil {
            return tracing.TraceWithErr(span, errors.Wrap(err, "saveSnapshotTx"))
        }
    }

    if err := p.processEvents(ctx, events); err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "processEvents"))
    }

    p.log.Debugf("(Save Aggregate): aggregate: %s", aggregate.String())
    span.LogFields(log.String("aggregate with events", aggregate.String()))
    return tx.Commit(ctx)
}
Вход в полноэкранный режим Выход из полноэкранного режима

Для сериализации и десериализации событий нам нужна реализация интерфейса Serializer:

type Serializer interface {
    SerializeEvent(aggregate Aggregate, event any) (Event, error)
    DeserializeEvent(event Event) (any, error)
}
Вход в полноэкранный режим Выход из полноэкранного режима

реализация для агрегации банковских счетов:

type eventSerializer struct {
}

func NewEventSerializer() *eventSerializer {
    return &eventSerializer{}
}

func (s *eventSerializer) SerializeEvent(aggregate es.Aggregate, event any) (es.Event, error) {
    eventBytes, err := serializer.Marshal(event)
    if err != nil {
        return es.Event{}, errors.Wrapf(err, "serializer.Marshal aggregateID: %s", aggregate.GetID())
    }

    switch evt := event.(type) {

    case *events.BankAccountCreatedEventV1:
        return es.NewEvent(aggregate, events.BankAccountCreatedEventType, eventBytes, evt.Metadata), nil

    case *events.BalanceDepositedEventV1:
        return es.NewEvent(aggregate, events.BalanceDepositedEventType, eventBytes, evt.Metadata), nil

    case *events.BalanceWithdrawnEventV1:
        return es.NewEvent(aggregate, events.BalanceWithdrawnEventType, eventBytes, evt.Metadata), nil

    case *events.EmailChangedEventV1:
        return es.NewEvent(aggregate, events.EmailChangedEventType, eventBytes, evt.Metadata), nil

    default:
        return es.Event{}, errors.Wrapf(ErrInvalidEvent, "aggregateID: %s, type: %T", aggregate.GetID(), event)
    }

}

func (s *eventSerializer) DeserializeEvent(event es.Event) (any, error) {
    switch event.GetEventType() {

    case events.BankAccountCreatedEventType:
        return deserializeEvent(event, new(events.BankAccountCreatedEventV1))

    case events.BalanceDepositedEventType:
        return deserializeEvent(event, new(events.BalanceDepositedEventV1))

    case events.BalanceWithdrawnEventType:
        return deserializeEvent(event, new(events.BalanceWithdrawnEventV1))

    case events.EmailChangedEventType:
        return deserializeEvent(event, new(events.EmailChangedEventV1))

    default:
        return nil, errors.Wrapf(ErrInvalidEvent, "type: %s", event.GetEventType())
    }
}

Войти в полноэкранный режим Выход из полноэкранного режима

Для следующего шага создадим агрегат банковских счетов:

const (
    BankAccountAggregateType es.AggregateType = "BankAccount"
)

type BankAccountAggregate struct {
    *es.AggregateBase
    BankAccount *BankAccount
}

func NewBankAccountAggregate(id string) *BankAccountAggregate {
    if id == "" {
        return nil
    }

    bankAccountAggregate := &BankAccountAggregate{BankAccount: NewBankAccount(id)}
    aggregateBase := es.NewAggregateBase(bankAccountAggregate.When)
    aggregateBase.SetType(BankAccountAggregateType)
    aggregateBase.SetID(id)
    bankAccountAggregate.AggregateBase = aggregateBase
    return bankAccountAggregate
}

func (a *BankAccountAggregate) When(event any) error {

    switch evt := event.(type) {

    case *events.BankAccountCreatedEventV1:
        a.BankAccount.Email = evt.Email
        a.BankAccount.Address = evt.Address
        a.BankAccount.Balance = evt.Balance
        a.BankAccount.FirstName = evt.FirstName
        a.BankAccount.LastName = evt.LastName
        a.BankAccount.Status = evt.Status
        return nil

    case *events.BalanceDepositedEventV1:
        return a.BankAccount.DepositBalance(evt.Amount)

    case *events.BalanceWithdrawnEventV1:
        return a.BankAccount.WithdrawBalance(evt.Amount)

    case *events.EmailChangedEventV1:
        a.BankAccount.Email = evt.Email
        return nil

    default:
        return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "event: %#v", event)
    }
}

func (a *BankAccountAggregate) CreateNewBankAccount(ctx context.Context, email, address, firstName, lastName, status string, amount int64) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.CreateNewBankAccount")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount < 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.BankAccountCreatedEventV1{
        Email:     email,
        Address:   address,
        FirstName: firstName,
        LastName:  lastName,
        Balance:   money.New(amount, money.USD),
        Status:    status,
        Metadata:  metaDataBytes,
    }

    return a.Apply(event)
}

func (a *BankAccountAggregate) DepositBalance(ctx context.Context, amount int64, paymentID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.DepositBalance")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount <= 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.BalanceDepositedEventV1{
        Amount:    amount,
        PaymentID: paymentID,
        Metadata:  metaDataBytes,
    }

    return a.Apply(event)
}

func (a *BankAccountAggregate) WithdrawBalance(ctx context.Context, amount int64, paymentID string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.WithdrawBalance")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if amount <= 0 {
        return errors.Wrapf(bankAccountErrors.ErrInvalidBalanceAmount, "amount: %d", amount)
    }

    balance, err := money.New(a.BankAccount.Balance.Amount(), money.USD).Subtract(money.New(amount, money.USD))
    if err != nil {
        return errors.Wrapf(err, "Balance.Subtract amount: %d", amount)
    }

    if balance.IsNegative() {
        return errors.Wrapf(bankAccountErrors.ErrNotEnoughBalance, "amount: %d", amount)
    }

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.BalanceWithdrawnEventV1{
        Amount:    amount,
        PaymentID: paymentID,
        Metadata:  metaDataBytes,
    }

    return a.Apply(event)
}

func (a *BankAccountAggregate) ChangeEmail(ctx context.Context, email string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "BankAccountAggregate.ChangeEmail")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    metaDataBytes, err := serializer.Marshal(tracing.ExtractTextMapCarrier(span.Context()))
    if err != nil {
        return errors.Wrap(err, "serializer.Marshal")
    }

    event := &events.EmailChangedEventV1{Email: email, Metadata: metaDataBytes}

    return a.Apply(event)
}
Войти в полноэкранный режим Выход из полноэкранного режима

Наш микросервис принимает HTTP и gRPC запросы:
REST-контроллер банковского счета, который принимает запросы, проверяет их с помощью валидатора,
затем вызывают службу команд или запросов.
Основной причиной роста популярности CQRS является возможность обрабатывать чтение
и записи отдельно из-за серьезных различий в методах оптимизации для этих гораздо более разных операций.
В качестве http-фреймворка Go используется echo.

Http-обработчики:

func (h *bankAccountHandlers) CreateBankAccount() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.CreateBankAccount")
        defer span.Finish()
        h.metrics.HttpCreateBankAccountRequests.Inc()

        var command commands.CreateBankAccountCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        command.AggregateID = uuid.NewV4().String()

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.CreateBankAccount.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(CreateBankAccount.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(BankAccount created) id: %s", command.AggregateID)
        return c.JSON(http.StatusCreated, command.AggregateID)
    }
}

func (h *bankAccountHandlers) DepositBalance() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.DepositBalance")
        defer span.Finish()
        h.metrics.HttpDepositBalanceRequests.Inc()

        var command commands.DepositBalanceCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(DepositBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(balance deposited) id: %s, amount: %d", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

func (h *bankAccountHandlers) WithdrawBalance() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
        defer span.Finish()
        h.metrics.HttpWithdrawBalanceRequests.Inc()

        var command commands.WithdrawBalanceCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(WithdrawBalance.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

func (h *bankAccountHandlers) ChangeEmail() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.WithdrawBalance")
        defer span.Finish()
        h.metrics.HttpChangeEmailRequests.Inc()

        var command commands.ChangeEmailCommand
        if err := c.Bind(&command); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        command.AggregateID = c.Param(constants.ID)

        if err := h.validate.StructCtx(ctx, command); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        err := h.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", command.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(balance withdraw) id: %s, amount: %d", command.AggregateID)
        return c.NoContent(http.StatusOK)
    }
}

func (h *bankAccountHandlers) GetByID() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.GetByID")
        defer span.Finish()
        h.metrics.HttpGetBuIdRequests.Inc()

        var query queries.GetBankAccountByIDQuery
        if err := c.Bind(&query); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query.AggregateID = c.Param(constants.ID)

        fromStore := c.QueryParam("store")
        if fromStore != "" {
            isFromStore, err := strconv.ParseBool(fromStore)
            if err != nil {
                h.log.Errorf("strconv.ParseBool err: %v", tracing.TraceWithErr(span, err))
                return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
            }
            query.FromEventStore = isFromStore
        }

        if err := h.validate.StructCtx(ctx, query); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        bankAccountProjection, err := h.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
        if err != nil {
            h.log.Errorf("(ChangeEmail.Handle) id: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(get bank account) id: %s", bankAccountProjection.AggregateID)
        return c.JSON(http.StatusOK, mappers.BankAccountMongoProjectionToHttp(bankAccountProjection))
    }
}

func (h *bankAccountHandlers) Search() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "bankAccountHandlers.Search")
        defer span.Finish()
        h.metrics.HttpSearchRequests.Inc()

        var query queries.SearchBankAccountsQuery
        if err := c.Bind(&query); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        query.QueryTerm = c.QueryParam("search")
        query.Pagination = utils.NewPaginationFromQueryParams(c.QueryParam(constants.Size), c.QueryParam(constants.Page))

        if err := h.validate.StructCtx(ctx, query); err != nil {
            h.log.Errorf("(validate) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        searchResult, err := h.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
        if err != nil {
            h.log.Errorf("(SearchBankAccounts.Handle) id: %s, err: %v", query.QueryTerm, tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        response := mappers.SearchResultToHttp(searchResult.List, searchResult.PaginationResponse)

        h.log.Infof("(search) result: %+v", response)
        return c.JSON(http.StatusOK, response)
    }
}
Вход в полноэкранный режим Выход из полноэкранного режима

Могу порекомендовать bloomrpc — это хороший GUI клиент для GRPC сервисов.
Обработчики служб GRPC:

func (g *grpcService) CreateBankAccount(ctx context.Context, request *bankAccountService.CreateBankAccountRequest) (*bankAccountService.CreateBankAccountResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.CreateBankAccount")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcCreateBankAccountRequests.Inc()

    aggregateID := uuid.NewV4().String()
    command := commands.CreateBankAccountCommand{
        AggregateID: aggregateID,
        Email:       request.GetEmail(),
        Address:     request.GetAddress(),
        FirstName:   request.GetFirstName(),
        LastName:    request.GetLastName(),
        Balance:     request.GetBalance(),
        Status:      request.GetStatus(),
    }

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.CreateBankAccount.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(CreateBankAccount.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [created account] aggregateID: %s", aggregateID)
    return &bankAccountService.CreateBankAccountResponse{Id: aggregateID}, nil
}

func (g *grpcService) DepositBalance(ctx context.Context, request *bankAccountService.DepositBalanceRequest) (*bankAccountService.DepositBalanceResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.DepositBalance")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcDepositBalanceRequests.Inc()

    command := commands.DepositBalanceCommand{
        AggregateID: request.GetId(),
        Amount:      request.GetAmount(),
        PaymentID:   request.GetPaymentId(),
    }

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.DepositBalance.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(DepositBalance.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [deposited balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
    return new(bankAccountService.DepositBalanceResponse), nil
}

func (g *grpcService) WithdrawBalance(ctx context.Context, request *bankAccountService.WithdrawBalanceRequest) (*bankAccountService.WithdrawBalanceResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.WithdrawBalance")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcWithdrawBalanceRequests.Inc()

    command := commands.WithdrawBalanceCommand{
        AggregateID: request.GetId(),
        Amount:      request.GetAmount(),
        PaymentID:   request.GetPaymentId(),
    }

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.WithdrawBalance.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(WithdrawBalance.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [withdraw balance] aggregateID: %s, amount: %v", request.GetId(), request.GetAmount())
    return new(bankAccountService.WithdrawBalanceResponse), nil
}

func (g *grpcService) ChangeEmail(ctx context.Context, request *bankAccountService.ChangeEmailRequest) (*bankAccountService.ChangeEmailResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.ChangeEmail")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcChangeEmailRequests.Inc()

    command := commands.ChangeEmailCommand{AggregateID: request.GetId(), NewEmail: request.GetEmail()}

    if err := g.validate.StructCtx(ctx, command); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    err := g.bankAccountService.Commands.ChangeEmail.Handle(ctx, command)
    if err != nil {
        g.log.Errorf("(ChangeEmail.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [changed email] aggregateID: %s, newEmail: %s", request.GetId(), request.GetEmail())
    return new(bankAccountService.ChangeEmailResponse), nil
}

func (g *grpcService) GetById(ctx context.Context, request *bankAccountService.GetByIdRequest) (*bankAccountService.GetByIdResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.GetById")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcGetBuIdRequests.Inc()

    query := queries.GetBankAccountByIDQuery{AggregateID: request.GetId(), FromEventStore: request.IsOwner}

    if err := g.validate.StructCtx(ctx, query); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    bankAccountProjection, err := g.bankAccountService.Queries.GetBankAccountByID.Handle(ctx, query)
    if err != nil {
        g.log.Errorf("(GetBankAccountByID.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [get account by id] projection: %+v", bankAccountProjection)
    return &bankAccountService.GetByIdResponse{BankAccount: mappers.BankAccountMongoProjectionToProto(bankAccountProjection)}, nil
}

func (g *grpcService) SearchBankAccounts(ctx context.Context, request *bankAccountService.SearchBankAccountsRequest) (*bankAccountService.SearchBankAccountsResponse, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "grpcService.SearchBankAccounts")
    defer span.Finish()
    span.LogFields(log.String("req", request.String()))
    g.metrics.GrpcSearchRequests.Inc()

    query := queries.SearchBankAccountsQuery{
        QueryTerm: request.GetSearchText(),
        Pagination: &utils.Pagination{
            Size: int(request.GetSize()),
            Page: int(request.GetPage()),
        },
    }

    if err := g.validate.StructCtx(ctx, query); err != nil {
        g.log.Errorf("validation err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    searchQueryResult, err := g.bankAccountService.Queries.SearchBankAccounts.Handle(ctx, query)
    if err != nil {
        g.log.Errorf("(SearchBankAccounts.Handle) err: %v", err)
        return nil, grpc_errors.ErrResponse(tracing.TraceWithErr(span, err))
    }

    g.log.Infof("(grpcService) [search] result: %+vv", searchQueryResult.PaginationResponse)
    return &bankAccountService.SearchBankAccountsResponse{
        BankAccounts: mappers.SearchBankAccountsListToProto(searchQueryResult.List),
        Pagination:   mappers.PaginationResponseToProto(searchQueryResult.PaginationResponse),
    }, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Основным атрибутом команды является то, что при ее успешном выполнении система переходит в новое состояние.
Обработчики команд отвечают за обработку команд, изменение состояния или выполнение других побочных эффектов.
Служба команд обрабатывает команды cqrs, загружает агрегат из хранилища событий и вызывает его методы, зависящие от бизнес-логики приложения,
агрегат применяет эти изменения, а затем мы сохраняем список изменений этих событий в хранилище событий.

Команда создания банковского счета здесь простая, но в реальном мире, конечно, бизнес-логика намного сложнее, мы должны проверить доступность электронной почты и т.д.

type CreateBankAccountCommand struct {
    AggregateID string `json:"id" validate:"required,gte=0"`
    Email       string `json:"email" validate:"required,gte=0,email"`
    Address     string `json:"address" validate:"required,gte=0"`
    FirstName   string `json:"firstName" validate:"required,gte=0"`
    LastName    string `json:"lastName" validate:"required,gte=0"`
    Balance     int64  `json:"balance" validate:"gte=0"`
    Status      string `json:"status"`
}

type CreateBankAccount interface {
    Handle(ctx context.Context, cmd CreateBankAccountCommand) error
}

type createBankAccountCmdHandler struct {
    log            logger.Logger
    aggregateStore es.AggregateStore
}

func NewCreateBankAccountCmdHandler(log logger.Logger, aggregateStore es.AggregateStore) *createBankAccountCmdHandler {
    return &createBankAccountCmdHandler{log: log, aggregateStore: aggregateStore}
}

func (c *createBankAccountCmdHandler) Handle(ctx context.Context, cmd CreateBankAccountCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createBankAccountCmdHandler.Handle")
    defer span.Finish()
    span.LogFields(log.Object("command", cmd))

    exists, err := c.aggregateStore.Exists(ctx, cmd.AggregateID)
    if err != nil {
        return tracing.TraceWithErr(span, err)
    }
    if exists {
        return tracing.TraceWithErr(span, errors.New("already exists"))
    }

    bankAccountAggregate := domain.NewBankAccountAggregate(cmd.AggregateID)
    err = bankAccountAggregate.CreateNewBankAccount(ctx, cmd.Email, cmd.Address, cmd.FirstName, cmd.LastName, cmd.Status, cmd.Balance)
    if err != nil {
        return tracing.TraceWithErr(span, err)
    }

    return c.aggregateStore.Save(ctx, bankAccountAggregate)
}
Вход в полноэкранный режим Выход из полноэкранного режима

В Event Sourcing проекции (также известные как модели представления или модели запросов) обеспечивают представление основной модели данных, основанной на событиях.
Часто они представляют логику преобразования исходной модели записи в модель чтения.
Идея заключается в том, что проекция получает все события, которые она может спроецировать, и выполняет обычные CRUD-операции над моделью чтения, которой она управляет,
используя обычные операции CRUD, предоставляемые базой данных модели чтения.
Проекции не ограничиваются обработкой событий только одной сущности и могут собирать и агрегировать данные для нескольких сущностей, даже для разных типов сущностей.
События, добавленные в хранилище событий, запускают логику проекции, которая создает или обновляет модель чтения.
Мы можем подписаться на наши проекции для событий потока типа «заказ».
Когда мы выполняем команду, агрегат генерирует новое событие, которое представляет переходы состояния агрегата.
Эти события фиксируются в хранилище, поэтому хранилище добавляет их в конец потока агрегата.
Проекция получает эти события и обновляет свои модели чтения, используя метод When, как и агрегат, он применяет изменения в зависимости от типа события:

func (s *mongoSubscription) ProcessMessagesErrGroup(ctx context.Context, r *kafka.Reader, workerID int) error {

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        m, err := r.FetchMessage(ctx)
        if err != nil {
            s.log.Warnf("(mongoSubscription) workerID: %d, err: %v", workerID, err)
            continue
        }

        switch m.Topic {
        case es.GetTopicName(s.cfg.KafkaPublisherConfig.TopicPrefix, string(domain.BankAccountAggregateType)):
            s.handleBankAccountEvents(ctx, r, m)
        }
    }
}

func (s *mongoSubscription) handleBankAccountEvents(ctx context.Context, r *kafka.Reader, m kafka.Message) {
    ctx, span := tracing.StartKafkaConsumerTracerSpan(ctx, m.Headers, "mongoSubscription.handleBankAccountEvents")
    defer span.Finish()

    var events []es.Event
    if err := serializer.Unmarshal(m.Value, &events); err != nil {
        s.log.Errorf("serializer.Unmarshal: %v", tracing.TraceWithErr(span, err))
        s.commitErrMessage(ctx, r, m)
        return
    }

    for _, event := range events {
        if err := s.handle(ctx, r, m, event); err != nil {
            return
        }
    }
    s.commitMessage(ctx, r, m)
}

func (s *mongoSubscription) handle(ctx context.Context, r *kafka.Reader, m kafka.Message, event es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoSubscription.handle")
    defer span.Finish()

    err := s.projection.When(ctx, event)
    if err != nil {
        s.log.Errorf("MongoSubscription When err: %v", err)

        recreateErr := s.recreateProjection(ctx, event)
        if recreateErr != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(recreateErr, "recreateProjection err: %v", err))
        }

        s.commitErrMessage(ctx, r, m)
        return tracing.TraceWithErr(span, errors.Wrapf(err, "When type: %s, aggregateID: %s", event.GetEventType(), event.GetAggregateID()))
    }

    s.log.Infof("MongoSubscription <<<commit>>> event: %s", event.String())
    return nil
}
Вход в полноэкранный режим Выход из полноэкранного режима


Проекция MongoDB обрабатывает события, реализуя метод When,
обрабатывает события и применяет изменения подобно aggregate, затем сохраняет их в хранилище mongodb:

type bankAccountMongoProjection struct {
    log             logger.Logger
    cfg             *config.Config
    serializer      es.Serializer
    mongoRepository domain.MongoRepository
}

func NewBankAccountMongoProjection(
    log logger.Logger,
    cfg *config.Config,
    serializer es.Serializer,
    mongoRepository domain.MongoRepository,
) *bankAccountMongoProjection {
    return &bankAccountMongoProjection{log: log, cfg: cfg, serializer: serializer, mongoRepository: mongoRepository}
}

func (b *bankAccountMongoProjection) When(ctx context.Context, esEvent es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.When")
    defer span.Finish()

    deserializedEvent, err := b.serializer.DeserializeEvent(esEvent)
    if err != nil {
        return errors.Wrapf(err, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
    }

    switch event := deserializedEvent.(type) {

    case *events.BankAccountCreatedEventV1:
        return b.onBankAccountCreated(ctx, esEvent, event)

    case *events.BalanceDepositedEventV1:
        return b.onBalanceDeposited(ctx, esEvent, event)

    case *events.BalanceWithdrawnEventV1:
        return b.onBalanceWithdrawn(ctx, esEvent, event)

    case *events.EmailChangedEventV1:
        return b.onEmailChanged(ctx, esEvent, event)

    default:
        return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
    }
}

func (b *bankAccountMongoProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBankAccountCreated")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if esEvent.GetVersion() != 1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
    }

    projection := &domain.BankAccountMongoProjection{
        AggregateID: esEvent.GetAggregateID(),
        Version:     esEvent.GetVersion(),
        Email:       event.Email,
        Address:     event.Address,
        FirstName:   event.FirstName,
        LastName:    event.LastName,
        Balance: domain.Balance{
            Amount:   event.Balance.AsMajorUnits(),
            Currency: event.Balance.Currency().Code,
        },
        Status:    event.Status,
        UpdatedAt: time.Now().UTC(),
        CreatedAt: time.Now().UTC(),
    }

    err := b.mongoRepository.Insert(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBankAccountCreated] mongoRepository.Insert aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBankAccountCreated] projection: %#v", projection)
    return nil
}

func (b *bankAccountMongoProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceDeposited")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
        projection.Version = esEvent.GetVersion()
        return projection
    }, esEvent.GetVersion()-1); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBalanceDeposited] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}

func (b *bankAccountMongoProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onBalanceWithdrawn")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        projection.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
        projection.Version = esEvent.GetVersion()

        return projection
    }, esEvent.GetVersion()-1); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onBalanceWithdrawn] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}

func (b *bankAccountMongoProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoProjection.onEmailChanged")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if err := b.mongoRepository.UpdateConcurrently(ctx, esEvent.GetAggregateID(), func(projection *domain.BankAccountMongoProjection) *domain.BankAccountMongoProjection {
        projection.Email = event.Email
        projection.Version = esEvent.GetVersion()
        return projection
    }, esEvent.GetVersion()-1); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] mongoRepository.UpdateConcurrently aggregateID: %s", esEvent.GetAggregateID()))
    }

    b.log.Infof("[onEmailChanged] aggregateID: %s, eventType: %s, version: %d", esEvent.GetAggregateID(), esEvent.GetEventType(), esEvent.GetVersion())
    return nil
}
Войти в полноэкранный режим Выход из полноэкранного режима

Проекция ElasticSearch делает то же самое, индексирует документы для поиска:

type elasticProjection struct {
    log               logger.Logger
    cfg               *config.Config
    serializer        es.Serializer
    elasticSearchRepo domain.ElasticSearchRepository
}

func NewElasticProjection(log logger.Logger, cfg *config.Config, serializer es.Serializer, elasticSearchRepo domain.ElasticSearchRepository) *elasticProjection {
    return &elasticProjection{log: log, cfg: cfg, serializer: serializer, elasticSearchRepo: elasticSearchRepo}
}

func (e *elasticProjection) When(ctx context.Context, esEvent es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.When")
    defer span.Finish()

    deserializedEvent, err := e.serializer.DeserializeEvent(esEvent)
    if err != nil {
        return errors.Wrapf(err, "serializer.DeserializeEvent aggregateID: %s, type: %s", esEvent.GetAggregateID(), esEvent.GetEventType())
    }

    switch event := deserializedEvent.(type) {

    case *events.BankAccountCreatedEventV1:
        return e.onBankAccountCreated(ctx, esEvent, event)

    case *events.BalanceDepositedEventV1:
        return e.onBalanceDeposited(ctx, esEvent, event)

    case *events.BalanceWithdrawnEventV1:
        return e.onBalanceWithdrawn(ctx, esEvent, event)

    case *events.EmailChangedEventV1:
        return e.onEmailChanged(ctx, esEvent, event)

    default:
        return errors.Wrapf(bankAccountErrors.ErrUnknownEventType, "esEvent: %s", esEvent.String())
    }
}

func (e *elasticProjection) onBankAccountCreated(ctx context.Context, esEvent es.Event, event *events.BankAccountCreatedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBankAccountCreated")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    if esEvent.GetVersion() != 1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, version: %d", esEvent.GetEventType(), esEvent.GetVersion())
    }

    projection := &domain.ElasticSearchProjection{
        ID:          esEvent.GetAggregateID(),
        AggregateID: esEvent.GetAggregateID(),
        Version:     esEvent.GetVersion(),
        Email:       event.Email,
        Address:     event.Address,
        FirstName:   event.FirstName,
        LastName:    event.LastName,
        Balance: domain.Balance{
            Amount:   event.Balance.AsMajorUnits(),
            Currency: event.Balance.Currency().Code,
        },
        Status:    event.Status,
        UpdatedAt: time.Now().UTC(),
        CreatedAt: time.Now().UTC(),
    }

    err := e.elasticSearchRepo.Index(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.Index aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBankAccountCreated] projection: %s", projection)
    return nil
}

func (e *elasticProjection) onBalanceDeposited(ctx context.Context, esEvent es.Event, event *events.BalanceDepositedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceDeposited")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceDeposited] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Balance.Amount += money.New(event.Amount, money.USD).AsMajorUnits()
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBalanceDeposited] projection: %s", projection)
    return nil
}

func (e *elasticProjection) onBalanceWithdrawn(ctx context.Context, esEvent es.Event, event *events.BalanceWithdrawnEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onBalanceWithdrawn")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Balance.Amount -= money.New(event.Amount, money.USD).AsMajorUnits()
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onBalanceWithdrawn] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onBalanceWithdrawn] projection: %s", projection)
    return nil
}

func (e *elasticProjection) onEmailChanged(ctx context.Context, esEvent es.Event, event *events.EmailChangedEventV1) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticProjection.onEmailChanged")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", esEvent.GetAggregateID()))

    projection, err := e.elasticSearchRepo.GetByAggregateID(ctx, esEvent.GetAggregateID())
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] elasticSearchRepo.GetByAggregateID aggregateID: %s", esEvent.GetAggregateID()))
    }

    if err := e.validateEventVersion(projection.Version, esEvent); err != nil {
        return tracing.TraceWithErr(span, err)
    }

    projection.Email = event.Email
    projection.Version = esEvent.GetVersion()

    if err := e.elasticSearchRepo.Update(ctx, projection); err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[onEmailChanged] elasticSearchRepo.Update aggregateID: %s", esEvent.GetAggregateID()))
    }

    e.log.Infof("ElasticSearch when [onEmailChanged] projection: %s", projection)
    return nil
}

func (e *elasticProjection) validateEventVersion(version uint64, esEvent es.Event) error {
    if version != esEvent.GetVersion()-1 {
        return errors.Wrapf(es.ErrInvalidEventVersion, "type: %s, eventVersion: %d, projectionVersion: %d", esEvent.GetEventType(), esEvent.GetVersion(), version)
    }
    return nil
}
Войти в полноэкранный режим Выход из полноэкранного режима

Запросы в CQRS представляют собой намерение получить данные и отвечают за возврат результата запрошенного запроса.
Модель чтения может быть, но не обязательно должна быть, производной от модели записи.
Это преобразование результатов бизнес-операции в читаемую форму.
Одним из замечательных результатов использования системы, основанной на событиях, является возможность создавать новые модели чтения по своему усмотрению,
в любое время, не затрагивая ничего другого.
Затем мы можем получить данные проекции с помощью запроса:

Get bank account by id Запрос может загрузить его из mongodb или агрегатного хранилища, если это необходимо:

type GetBankAccountByIDQuery struct {
    AggregateID    string `json:"aggregateID" validate:"required,gte=0"`
    FromEventStore bool   `json:"fromEventStore"`
}

type GetBankAccountByID interface {
    Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error)
}

type getBankAccountByIDQuery struct {
    log             logger.Logger
    aggregateStore  es.AggregateStore
    mongoRepository domain.MongoRepository
}

func NewGetBankAccountByIDQuery(log logger.Logger, aggregateStore es.AggregateStore, mongoRepository domain.MongoRepository) *getBankAccountByIDQuery {
    return &getBankAccountByIDQuery{log: log, aggregateStore: aggregateStore, mongoRepository: mongoRepository}
}

func (q *getBankAccountByIDQuery) Handle(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.Handle")
    defer span.Finish()
    span.LogFields(log.Object("query", query))

    if query.FromEventStore {
        return q.loadFromAggregateStore(ctx, query)
    }

    projection, err := q.mongoRepository.GetByAggregateID(ctx, query.AggregateID)
    if err != nil {
        if errors.Is(err, mongo.ErrNoDocuments) {
            bankAccountAggregate := domain.NewBankAccountAggregate(query.AggregateID)
            if err = q.aggregateStore.Load(ctx, bankAccountAggregate); err != nil {
                return nil, tracing.TraceWithErr(span, err)
            }
            if bankAccountAggregate.GetVersion() == 0 {
                return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
            }

            mongoProjection := mappers.BankAccountToMongoProjection(bankAccountAggregate)
            err = q.mongoRepository.Upsert(ctx, mongoProjection)
            if err != nil {
                q.log.Errorf("(GetBankAccountByIDQuery) mongo Upsert AggregateID: %s, err: %v", query.AggregateID, tracing.TraceWithErr(span, err))
            }
            q.log.Debugf("(GetBankAccountByIDQuery) Upsert %+v", query)
            return mongoProjection, nil

        }
        return nil, tracing.TraceWithErr(span, err)
    }

    q.log.Debugf("(GetBankAccountByIDQuery) from mongo %+v", query)
    return projection, nil
}

func (q *getBankAccountByIDQuery) loadFromAggregateStore(ctx context.Context, query GetBankAccountByIDQuery) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "getBankAccountByIDQuery.loadFromAggregateStore")
    defer span.Finish()

    bankAccountAggregate := domain.NewBankAccountAggregate(query.AggregateID)
    if err := q.aggregateStore.Load(ctx, bankAccountAggregate); err != nil {
        return nil, tracing.TraceWithErr(span, err)
    }
    if bankAccountAggregate.GetVersion() == 0 {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(bankAccountErrors.ErrBankAccountNotFound, "id: %s", query.AggregateID))
    }

    q.log.Debugf("(GetBankAccountByIDQuery) from aggregateStore bankAccountAggregate: %+v", bankAccountAggregate.BankAccount)
    return mappers.BankAccountToMongoProjection(bankAccountAggregate), nil
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Банковский счет методы репозитория Mongo с помощью официального клиента mongo:

type bankAccountMongoRepository struct {
    log logger.Logger
    cfg *config.Config
    db  *mongo.Client
}

func NewBankAccountMongoRepository(log logger.Logger, cfg *config.Config, db *mongo.Client) *bankAccountMongoRepository {
    return &bankAccountMongoRepository{log: log, cfg: cfg, db: db}
}

func (b *bankAccountMongoRepository) Insert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Insert")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    _, err := b.bankAccountsCollection().InsertOne(ctx, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[InsertOne] AggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Insert] result AggregateID: %s", projection.AggregateID)
    return nil
}

func (b *bankAccountMongoRepository) Update(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.ID = ""
    projection.UpdatedAt = time.Now().UTC()

    ops := options.FindOneAndUpdate()
    ops.SetReturnDocument(options.After)
    ops.SetUpsert(false)
    filter := bson.M{constants.MongoAggregateID: projection.AggregateID}

    err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Update] result AggregateID: %s", projection.AggregateID)
    return nil
}

func (b *bankAccountMongoRepository) UpdateConcurrently(ctx context.Context, aggregateID string, updateCb domain.UpdateProjectionCallback, expectedVersion uint64) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    session, err := b.db.StartSession()
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "StartSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
    }
    defer session.EndSession(ctx)

    err = mongo.WithSession(ctx, session, func(sessionContext mongo.SessionContext) error {
        if err := session.StartTransaction(); err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "StartTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        filter := bson.M{constants.MongoAggregateID: aggregateID}
        foundProjection := &domain.BankAccountMongoProjection{}

        err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(foundProjection)
        if err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        if foundProjection.Version != expectedVersion {
            return tracing.TraceWithErr(span, errors.Wrapf(es.ErrInvalidEventVersion, "[FindOne] aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }

        foundProjection = updateCb(foundProjection)

        foundProjection.ID = ""
        foundProjection.UpdatedAt = time.Now().UTC()

        ops := options.FindOneAndUpdate()
        ops.SetReturnDocument(options.After)
        ops.SetUpsert(false)
        filter = bson.M{constants.MongoAggregateID: foundProjection.AggregateID}

        err = b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": foundProjection}, ops).Decode(foundProjection)
        if err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOneAndUpdate] aggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion))
        }

        b.log.Infof("[UpdateConcurrently] result AggregateID: %s, expectedVersion: %d", foundProjection.AggregateID, expectedVersion)
        return session.CommitTransaction(ctx)
    })
    if err != nil {
        if err := session.AbortTransaction(ctx); err != nil {
            return tracing.TraceWithErr(span, errors.Wrapf(err, "AbortTransaction aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
        }
        return tracing.TraceWithErr(span, errors.Wrapf(err, "mongo.WithSession aggregateID: %s, expectedVersion: %d", aggregateID, expectedVersion))
    }

    return nil
}

func (b *bankAccountMongoRepository) Upsert(ctx context.Context, projection *domain.BankAccountMongoProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.UpdatedAt = time.Now().UTC()

    ops := options.FindOneAndUpdate()
    ops.SetReturnDocument(options.After)
    ops.SetUpsert(true)
    filter := bson.M{constants.MongoAggregateID: projection.AggregateID}

    err := b.bankAccountsCollection().FindOneAndUpdate(ctx, filter, bson.M{"$set": projection}, ops).Decode(projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "Upsert [FindOneAndUpdate] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[Upsert] result AggregateID: %s", projection.AggregateID)
    return nil
}

func (b *bankAccountMongoRepository) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.DeleteByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    filter := bson.M{constants.MongoAggregateID: aggregateID}
    ops := options.Delete()

    result, err := b.bankAccountsCollection().DeleteOne(ctx, filter, ops)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "DeleteByAggregateID [FindOneAndDelete] aggregateID: %s", aggregateID))
    }

    b.log.Debugf("[DeleteByAggregateID] result AggregateID: %s, deleteCount: %d", aggregateID, result.DeletedCount)
    return nil
}

func (b *bankAccountMongoRepository) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.BankAccountMongoProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "bankAccountMongoRepository.GetByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    filter := bson.M{constants.MongoAggregateID: aggregateID}
    var projection domain.BankAccountMongoProjection

    err := b.bankAccountsCollection().FindOne(ctx, filter).Decode(&projection)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "[FindOne] aggregateID: %s", projection.AggregateID))
    }

    b.log.Debugf("[GetByAggregateID] result projection: %+v", projection)
    return &projection, nil
}

func (b *bankAccountMongoRepository) bankAccountsCollection() *mongo.Collection {
    return b.db.Database(b.cfg.Mongo.Db).Collection(b.cfg.MongoCollections.BankAccounts)
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Для реализации репозитория ElasticSearch используется официальная библиотека go-elasticsearch,
Еще одна хорошая библиотека — olivere elastic, но она не поддерживает 8 версию, которая используется в данном проекте.

type elasticRepo struct {
    log    logger.Logger
    cfg    *config.Config
    client *elasticsearch.Client
}

func NewElasticRepository(log logger.Logger, cfg *config.Config, client *elasticsearch.Client) *elasticRepo {
    return &elasticRepo{log: log, cfg: cfg, client: client}
}

func (e *elasticRepo) Index(ctx context.Context, projection *domain.ElasticSearchProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Index")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    response, err := esclient.Index(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, projection.AggregateID, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Index id: %s", projection.AggregateID))
    }
    defer response.Body.Close()

    if response.IsError() {
        return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
    }
    if response.HasWarnings() {
        e.log.Warnf("ElasticSearch Index warnings: %+v", response.Warnings())
    }

    e.log.Infof("ElasticSearch index result: %s", response.String())
    return nil
}

func (e *elasticRepo) Update(ctx context.Context, projection *domain.ElasticSearchProjection) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Update")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", projection.AggregateID))

    projection.UpdatedAt = time.Now().UTC()

    response, err := esclient.Update(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, projection.AggregateID, projection)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Update id: %s", projection.AggregateID))
    }
    defer response.Body.Close()

    if response.IsError() {
        return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch request err"), "response.IsError id: %s", projection.AggregateID))
    }
    if response.HasWarnings() {
        e.log.Warnf("ElasticSearch Update warnings: %+v", response.Warnings())
    }

    e.log.Infof("ElasticSearch update result: %s", response.String())
    return nil
}

func (e *elasticRepo) DeleteByAggregateID(ctx context.Context, aggregateID string) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.DeleteByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    response, err := esclient.Delete(ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.Delete id: %s", aggregateID))
    }
    defer response.Body.Close()

    if response.IsError() && response.StatusCode != http.StatusNotFound {
        return tracing.TraceWithErr(span, errors.Wrapf(errors.New("ElasticSearch delete"), "response.IsError aggregateID: %s, status: %s", aggregateID, response.Status()))
    }
    if response.HasWarnings() {
        e.log.Warnf("ElasticSearch Delete warnings: %+v", response.Warnings())
    }

    e.log.Infof("ElasticSearch delete result: %s", response.String())
    return nil
}

func (e *elasticRepo) GetByAggregateID(ctx context.Context, aggregateID string) (*domain.ElasticSearchProjection, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.GetByAggregateID")
    defer span.Finish()
    span.LogFields(log.String("aggregateID", aggregateID))

    response, err := esclient.GetByID[*domain.ElasticSearchProjection](ctx, e.client, e.cfg.ElasticIndexes.BankAccounts, aggregateID)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.GetByID id: %s", aggregateID))
    }

    e.log.Infof("ElasticSearch delete result: %+v", response)
    return response.Source, nil
}

func (e *elasticRepo) Search(ctx context.Context, term string, options esclient.SearchOptions) (*esclient.SearchListResponse[*domain.ElasticSearchProjection], error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepo.Search")
    defer span.Finish()
    span.LogFields(log.String("term", term))

    searchMatchPrefixRequest := esclient.SearchMatchPrefixRequest{
        Index:   []string{e.cfg.ElasticIndexes.BankAccounts},
        Term:    term,
        Size:    options.Size,
        From:    options.From,
        Sort:    []string{"balance.amount"},
        Fields:  options.Fields,
        SortMap: map[string]interface{}{"balance.amount": "asc"},
    }

    if options.Sort != nil {
        searchMatchPrefixRequest.Sort = options.Sort
    }

    response, err := esclient.SearchMultiMatchPrefix[*domain.ElasticSearchProjection](ctx, e.client, searchMatchPrefixRequest)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrapf(err, "esclient.SearchMultiMatchPrefix term: %s", term))
    }

    return response, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Более подробную информацию и исходный код полного проекта вы можете найти здесь,
Конечно, в реальных приложениях код бизнес-логики и инфраструктуры намного сложнее, и нам приходится реализовывать гораздо больше необходимых функций.
Я надеюсь, что эта статья будет полезной и нужной, и буду рад любым отзывам и вопросам, обращайтесь ко мне по электронной почте или в любых мессенджерах 🙂

Оцените статью
devanswers.ru
Добавить комментарий