Уроки, извлеченные из разработки бессерверной реализации выполнения рабочих процессов

В Hash Rekayasa Teknologi мы разрабатываем и используем MocoBaaS, решение Backend-as-a-Service.
Одной из функций для реализации бизнес-логики является Custom Script.

Эта функция хорошо зарекомендовала себя во многих случаях использования.
Однако есть некоторые случаи использования, которые состоят из нескольких шагов. Они могут быть реализованы путем «цепочки» нескольких сценариев, один сценарий запускает другой. Хотя это может помочь в работе, трудно отследить, какие шаги были выполнены.

Представьте, что у нас есть такой сценарий использования, как Marketplace Order:

DISCLAIMER: Это может не соответствовать реальному случаю использования.

  1. Создать заказ
  2. Подтвердить оплату
  3. Подтвердить доставку
  4. Подтвердить завершение

Это можно сделать, определив данный поток:

  1. Скрипт: create-order
    • Срабатывает по: HTTP источнику
    • Триггеры: create-order-success событие
  2. Скрипт: confirm-payment
    • Срабатывает по: Источник события
    • Триггеры: событие confirm-payment-success
  3. Скрипт: confirm-delivery
    • Срабатывает по: Источник события
    • Триггеры: событие confirm-delivery-success
  4. Скрипт: confirm-completed
    • Срабатывает по: Источник события

В приведенном выше потоке сценарии были выполнены как есть. Нет централизованного механизма отслеживания выполненных шагов, правильно ли они были выполнены или нет.

Бессерверный рабочий процесс на помощь

Среди существующих языков управления потоками работ мы выбрали Serverless Workflow. Это нейтральная к поставщикам, открытая экосистема рабочих процессов с открытым исходным кодом, управляемая сообществом.
Определение рабочего процесса может быть записано в формате JSON или YAML.
Кроме того, доступны SDK на различных языках программирования, таких как Java, Go, TypeScript, .NET, Python.

Приведенный выше пример использования Marketplace Order может быть определен следующим образом:

id: marketplaceorder
version: "1.0"
specVersion: "0.7"
name: Marketplace Order Workflow
description: Create and process orders on the marketplace.
start: CreateOrder
functions:
  - name: createOrderFunction
    operation: mocobaas://marketplace-order#create-order
  - name: confirmPaymentFunction
    operation: mocobaas://marketplace-order#confirm-payment
  - name: confirmDeliveryFunction
    operation: mocobaas://marketplace-order#confirm-delivery
  - name: confirmCompletedFunction
    operation: mocobaas://marketplace-order#confirm-completed
states:
  - name: CreateOrder
    type: operation
    actions:
      - functionRef: createOrderFunction
    transition: ConfirmPayment
  - name: ConfirmPayment
    type: operation
    actions:
      - functionRef: confirmPaymentFunction
    transition: ConfirmDelivery
  - name: ConfirmDelivery
    type: operation
    actions:
      - functionRef: confirmDeliveryFunction
    transition: ConfirmCompleted
  - name: ConfirmCompleted
    type: operation
    actions:
      - functionRef: confirmCompletedFunction
    end: true
Войти в полноэкранный режим Выход из полноэкранного режима

А это визуализация диаграммы:


Если вы новичок в Serverless Workflow, или в workflow в целом, у вас может возникнуть много вопросов по этому поводу 😁.

Я рекомендую вам посмотреть эту презентацию:

А затем прочитать официальные примеры и спецификацию Serverless Workflow:

  • Версия 0.7: примеры, спецификация.
  • Версия 0.8: примеры, спецификация.

Позвольте мне продолжить рассказ…

Нам нужно создать реализацию среды выполнения, которая будет выполнять рабочие процессы на основе определений.
Golang стал важной частью нашего стека в Hash Rekayasa Teknologi. Поэтому мы просто выбрали Go SDK для Serverless Workflow. Хотя я не пробовал другие SDK, я уверен, что не должно быть большой разницы с тем, что я использую здесь.
Самый важный вопрос с SDK: Что он делает и чего не делает?

Он делает:

  • Разбирает JSON и YAML определения рабочих процессов.
  • Одно определение рабочего процесса имеет иерархическую структуру. Каждое определение от верхнего уровня до подуровней будет представлено в виде модели, такой как Workflow, State, Action, Function, Retry.

Это не так:

  • Не существует представления экземпляра рабочего процесса. Для выполнения вы должны сами определить уникальный идентификатор.
  • Значения длительности в формате ISO 8601 duration не анализируются.
  • Выражения рабочего процесса в формате jq не анализируются.

С этими ограничениями, кажется, не так уж много можно сделать с помощью SDK. Просто разберите определение рабочего процесса и используйте иерархическую структуру в качестве руководства для выполнения.

package sw

import (
    "errors"
    "os"
    "path/filepath"

    "github.com/google/uuid"
    "github.com/serverlessworkflow/sdk-go/v2/model"
    "github.com/serverlessworkflow/sdk-go/v2/parser"
)

type StartWorkflowResult struct {
    InstanceID string `json:"instanceId"`
}

var workflows map[string]*model.Workflow

func LoadWorkflows() error {
    const definitionsDir = "definitions"

    dirEntries, err := os.ReadDir(definitionsDir)
    if err != nil {
        return err
    }

    workflows = make(map[string]*model.Workflow)

    for _, entry := range dirEntries {
        name := entry.Name()
        path := filepath.Join(definitionsDir, name)
        wf, err := parser.FromFile(path)
        if err != nil {
            return err
        }

        workflows[name] = wf
    }

    return nil
}

func StartWorkflow(name string, input map[string]interface{}) (*StartWorkflowResult, error) {
    wf, ok := workflows[name]
    if !ok {
        return nil, errors.New("Workflow not found: " + name)
    }

    instanceID := uuid.NewString()

    // Start a new instance.
    // Parameters: instanceID, wf, input

    return &StartWorkflowResult{instanceID}, nil
}
Вход в полноэкранный режим Выход из полноэкранного режима

Здесь мы храним модели рабочих процессов в карте, поэтому функцию LoadWorkflows() нужно вызвать только один раз.
А затем функция StartWorkflow() будет вызываться при каждом выполнении.

Делайте заметки для реализованных функций

Мы можем не реализовать все функции в спецификации. Одно, что мы можем сделать, это задокументировать их. Каждая функция будет иметь статус:

  • реализована согласно спецификации 🟢🟢🟢
  • реализована, но не по спецификации или с использованием собственного стандарта 🟢🔴
  • не реализован/еще не реализован 🔴

Я делал заметки в электронной таблице. Вы можете посмотреть ее здесь.
Я использую свой родной язык, Bahasa Indonesia.
И он не полный. Я записываю определение только тогда, когда начинаю его реализовывать.

Давайте посмотрим один пример, определение функции:

  • Как мы знаем, здесь определяется вызов сервиса.
  • Время выполнения рабочего процесса написано на Go, а сценарии — на JavaScript (Node.js).
  • MocoBaaS уже имеет внутренний механизм RPC, поэтому мы хотим использовать «пользовательский» тип.
  • В спецификации v0.8 есть тип «custom». Но на момент написания этой статьи Go SDK поддерживает только spec v0.7.

Как видите, мы старались придерживаться спецификации настолько, насколько это было возможно. Но иногда нам приходится использовать собственные стандарты.

Выполнение рабочего процесса

Рабочий процесс Marketplace Order Workflow имеет линейный поток, от создания заказа до подтверждения его выполнения. Это структура директории, содержащая определение рабочего процесса и скрипты:

.
└── marketplace-order
    ├── definition.sw.yaml
    └── scripts
        ├── confirm-completed.js
        ├── confirm-delivery.js
        ├── confirm-payment.js
        └── create-order.js
Вход в полноэкранный режим Выход из полноэкранного режима

Конечным результатом будет JSON следующего вида:

{
  "createOrder": true,
  "confirmPayment": true,
  "confirmDelivery": true,
  "confirmCompleted": true
}
Вход в полноэкранный режим Выйти из полноэкранного режима

При выполнении рабочего процесса, начиная с create-order.js, данные представляют собой новый объект:

module.exports = async (ctx) => {
  return {
    data: { createOrder: true },
  };
};
Войти в полноэкранный режим Выход из полноэкранного режима

Далее confirm-payment.js расширяет данные из предыдущего состояния:

module.exports = async (ctx) => {
  return {
    data: { ...ctx.data, confirmPayment: true },
  };
};
Вход в полноэкранный режим Выход из полноэкранного режима

И так далее.

Отслеживание выполнения рабочего процесса

Как написано в спецификации:
В зависимости от определения рабочего процесса, экземпляры рабочего процесса могут быть кратковременными или выполняться в течение дней, недель или лет.

Нет никаких рекомендаций по хранению информации об отслеживании. Можно использовать любую базу данных.
Нам необходимо учесть эти требования:

  • Один экземпляр может иметь более одного состояния.
  • Входные данные состояния обычно являются выходными данными предыдущего состояния.
  • Если состояние является начальным состоянием рабочего процесса, то его входные данные — это входные данные рабочего процесса.
  • Когда выполнение рабочего процесса заканчивается, выходом данных последнего выполненного состояния становится выход данных рабочего процесса.

Например, у нас есть две таблицы:

  • экземпляры
  • экземпляры_состояний

Выполнение Marketplace Order Workflow может быть сохранено следующим образом:

Повторные действия

Если состояние возвращает ошибку, мы можем оставить его в качестве окончательного результата или определить политику повторных действий.
Например, у нас есть рабочий процесс «Шанс на успех».

Структура каталога:

.
└── chance-of-success
    ├── definition.sw.yaml
    └── scripts
        └── chance.js
Вход в полноэкранный режим Выход из полноэкранного режима

chance.js рандомизирует булево значение. Если true, возвращает данные. Если false, возвращает ошибку:

const chance = require("chance").Chance();

module.exports = async (ctx) => {
  const isTrue = chance.bool({ likelihood: ctx.data.likelihood });

  if (!isTrue) {
    return {
      error: { message: "failed" },
    };
  }

  return {
    data: { message: "success" },
  };
};
Войти в полноэкранный режим Выход из полноэкранного режима

А определение рабочего процесса содержит определение повторной попытки:

id: chanceofsuccess
version: "1.0"
specVersion: "0.7"
name: Chance of Success Workflow
description: Try your chance of success. Retry if failed.
start: TakeAChance
functions:
  - name: chanceFunction
    operation: mocobaas://chance-of-success#chance
retries:
  - name: chanceRetryStrategy
    delay: PT10S
    maxAttempts: 3
states:
  - name: TakeAChance
    type: operation
    actions:
      - functionRef: chanceFunction
        retryRef: chanceRetryStrategy
    end: true
Войти в полноэкранный режим Выйти из полноэкранного режима

С этим определением повторной попытки рабочий процесс выполнит этот механизм:

  • Максимальное количество попыток — 3 раза.
  • Задержка между повторными попытками составляет 10 секунд.
  • Если данные получены до достижения maxAttempts, повторных попыток больше не будет.
  • Если maxAttempts достигнуто, то повторных попыток больше не будет, независимо от результата.

Прежде чем мы сможем использовать длительность задержки, ее необходимо разобрать. Например, я использую sosodev/duration, и это хорошо работает.

Визуализация диаграммы

Генерирование визуализации диаграммы из определения рабочего процесса действительно полезно, особенно когда у вас сложные рабочие процессы.
Один из способов — использовать веб-редактор на официальном сайте. Он может генерировать диаграмму из JSON или YAML, но линтер в текстовом редакторе всегда будет ожидать JSON.

Для пользователей VS Code есть официальное расширение, но на момент написания этой статьи оно устарело, поддерживает только spec v0.6.
Лучшей альтернативой является расширение от Red Hat. Оно поддерживает spec v0.8. Оно также хорошо работает со спецификацией v0.7. Единственное требование — вы должны назвать файлы определений *.sw.json, *.sw.yaml или *.sw.yml.

Предостережение:
Похоже, что эти инструменты используют один и тот же генератор, поскольку они создают одну и ту же визуализацию диаграммы. Я заметил, что они могут визуализировать только поток, но не включают другие детали, такие как функции или повторные попытки.

Заключительные размышления

Рабочий поток — это довольно большая функция. И как вы можете видеть, Serverless Workflow предлагает большую гибкость между стандартными и настраиваемыми функциями. Но если вам нужно больше тренировок в использовании системы рабочих процессов, возможно, есть лучшие решения.

Мы еще не реализовали большинство функций Serverless Workflow.
Например, выражения рабочего процесса, о которых я говорил выше. Использование такой библиотеки, как itchyny/gojq, выглядит многообещающе, хотя я еще не пробовал.
Но, по крайней мере, этих небольших усилий достаточно для минимально функционирующей системы.

Что ж, надеюсь, вам понравилась эта статья и вы нашли ее полезной 😉

Оцените статью
devanswers.ru
Добавить комментарий