В Hash Rekayasa Teknologi мы разрабатываем и используем MocoBaaS, решение Backend-as-a-Service.
Одной из функций для реализации бизнес-логики является Custom Script.
Эта функция хорошо зарекомендовала себя во многих случаях использования.
Однако есть некоторые случаи использования, которые состоят из нескольких шагов. Они могут быть реализованы путем «цепочки» нескольких сценариев, один сценарий запускает другой. Хотя это может помочь в работе, трудно отследить, какие шаги были выполнены.
Представьте, что у нас есть такой сценарий использования, как Marketplace Order:
DISCLAIMER: Это может не соответствовать реальному случаю использования.
- Создать заказ
- Подтвердить оплату
- Подтвердить доставку
- Подтвердить завершение
Это можно сделать, определив данный поток:
- Скрипт:
create-order
- Срабатывает по: HTTP источнику
- Триггеры:
create-order-success
событие
- Скрипт:
confirm-payment
- Срабатывает по: Источник события
- Триггеры: событие
confirm-payment-success
- Скрипт:
confirm-delivery
- Срабатывает по: Источник события
- Триггеры: событие
confirm-delivery-success
- Скрипт:
confirm-completed
- Срабатывает по: Источник события
В приведенном выше потоке сценарии были выполнены как есть. Нет централизованного механизма отслеживания выполненных шагов, правильно ли они были выполнены или нет.
Бессерверный рабочий процесс на помощь
Среди существующих языков управления потоками работ мы выбрали Serverless Workflow. Это нейтральная к поставщикам, открытая экосистема рабочих процессов с открытым исходным кодом, управляемая сообществом.
Определение рабочего процесса может быть записано в формате JSON или YAML.
Кроме того, доступны SDK на различных языках программирования, таких как Java, Go, TypeScript, .NET, Python.
Приведенный выше пример использования Marketplace Order может быть определен следующим образом:
id: marketplaceorder
version: "1.0"
specVersion: "0.7"
name: Marketplace Order Workflow
description: Create and process orders on the marketplace.
start: CreateOrder
functions:
- name: createOrderFunction
operation: mocobaas://marketplace-order#create-order
- name: confirmPaymentFunction
operation: mocobaas://marketplace-order#confirm-payment
- name: confirmDeliveryFunction
operation: mocobaas://marketplace-order#confirm-delivery
- name: confirmCompletedFunction
operation: mocobaas://marketplace-order#confirm-completed
states:
- name: CreateOrder
type: operation
actions:
- functionRef: createOrderFunction
transition: ConfirmPayment
- name: ConfirmPayment
type: operation
actions:
- functionRef: confirmPaymentFunction
transition: ConfirmDelivery
- name: ConfirmDelivery
type: operation
actions:
- functionRef: confirmDeliveryFunction
transition: ConfirmCompleted
- name: ConfirmCompleted
type: operation
actions:
- functionRef: confirmCompletedFunction
end: true
А это визуализация диаграммы:
Если вы новичок в Serverless Workflow, или в workflow в целом, у вас может возникнуть много вопросов по этому поводу 😁.
Я рекомендую вам посмотреть эту презентацию:
А затем прочитать официальные примеры и спецификацию Serverless Workflow:
- Версия 0.7: примеры, спецификация.
- Версия 0.8: примеры, спецификация.
Позвольте мне продолжить рассказ…
Нам нужно создать реализацию среды выполнения, которая будет выполнять рабочие процессы на основе определений.
Golang стал важной частью нашего стека в Hash Rekayasa Teknologi. Поэтому мы просто выбрали Go SDK для Serverless Workflow. Хотя я не пробовал другие SDK, я уверен, что не должно быть большой разницы с тем, что я использую здесь.
Самый важный вопрос с SDK: Что он делает и чего не делает?
Он делает:
- Разбирает JSON и YAML определения рабочих процессов.
- Одно определение рабочего процесса имеет иерархическую структуру. Каждое определение от верхнего уровня до подуровней будет представлено в виде модели, такой как Workflow, State, Action, Function, Retry.
Это не так:
- Не существует представления экземпляра рабочего процесса. Для выполнения вы должны сами определить уникальный идентификатор.
- Значения длительности в формате ISO 8601 duration не анализируются.
- Выражения рабочего процесса в формате jq не анализируются.
С этими ограничениями, кажется, не так уж много можно сделать с помощью SDK. Просто разберите определение рабочего процесса и используйте иерархическую структуру в качестве руководства для выполнения.
package sw
import (
"errors"
"os"
"path/filepath"
"github.com/google/uuid"
"github.com/serverlessworkflow/sdk-go/v2/model"
"github.com/serverlessworkflow/sdk-go/v2/parser"
)
type StartWorkflowResult struct {
InstanceID string `json:"instanceId"`
}
var workflows map[string]*model.Workflow
func LoadWorkflows() error {
const definitionsDir = "definitions"
dirEntries, err := os.ReadDir(definitionsDir)
if err != nil {
return err
}
workflows = make(map[string]*model.Workflow)
for _, entry := range dirEntries {
name := entry.Name()
path := filepath.Join(definitionsDir, name)
wf, err := parser.FromFile(path)
if err != nil {
return err
}
workflows[name] = wf
}
return nil
}
func StartWorkflow(name string, input map[string]interface{}) (*StartWorkflowResult, error) {
wf, ok := workflows[name]
if !ok {
return nil, errors.New("Workflow not found: " + name)
}
instanceID := uuid.NewString()
// Start a new instance.
// Parameters: instanceID, wf, input
return &StartWorkflowResult{instanceID}, nil
}
Здесь мы храним модели рабочих процессов в карте, поэтому функцию LoadWorkflows()
нужно вызвать только один раз.
А затем функция StartWorkflow()
будет вызываться при каждом выполнении.
Делайте заметки для реализованных функций
Мы можем не реализовать все функции в спецификации. Одно, что мы можем сделать, это задокументировать их. Каждая функция будет иметь статус:
- реализована согласно спецификации 🟢🟢🟢
- реализована, но не по спецификации или с использованием собственного стандарта 🟢🔴
- не реализован/еще не реализован 🔴
Я делал заметки в электронной таблице. Вы можете посмотреть ее здесь.
Я использую свой родной язык, Bahasa Indonesia.
И он не полный. Я записываю определение только тогда, когда начинаю его реализовывать.
Давайте посмотрим один пример, определение функции:
- Как мы знаем, здесь определяется вызов сервиса.
- Время выполнения рабочего процесса написано на Go, а сценарии — на JavaScript (Node.js).
- MocoBaaS уже имеет внутренний механизм RPC, поэтому мы хотим использовать «пользовательский» тип.
- В спецификации v0.8 есть тип «custom». Но на момент написания этой статьи Go SDK поддерживает только spec v0.7.
Как видите, мы старались придерживаться спецификации настолько, насколько это было возможно. Но иногда нам приходится использовать собственные стандарты.
Выполнение рабочего процесса
Рабочий процесс Marketplace Order Workflow имеет линейный поток, от создания заказа до подтверждения его выполнения. Это структура директории, содержащая определение рабочего процесса и скрипты:
.
└── marketplace-order
├── definition.sw.yaml
└── scripts
├── confirm-completed.js
├── confirm-delivery.js
├── confirm-payment.js
└── create-order.js
Конечным результатом будет JSON следующего вида:
{
"createOrder": true,
"confirmPayment": true,
"confirmDelivery": true,
"confirmCompleted": true
}
При выполнении рабочего процесса, начиная с create-order.js
, данные представляют собой новый объект:
module.exports = async (ctx) => {
return {
data: { createOrder: true },
};
};
Далее confirm-payment.js
расширяет данные из предыдущего состояния:
module.exports = async (ctx) => {
return {
data: { ...ctx.data, confirmPayment: true },
};
};
И так далее.
Отслеживание выполнения рабочего процесса
Как написано в спецификации:
В зависимости от определения рабочего процесса, экземпляры рабочего процесса могут быть кратковременными или выполняться в течение дней, недель или лет.
Нет никаких рекомендаций по хранению информации об отслеживании. Можно использовать любую базу данных.
Нам необходимо учесть эти требования:
- Один экземпляр может иметь более одного состояния.
- Входные данные состояния обычно являются выходными данными предыдущего состояния.
- Если состояние является начальным состоянием рабочего процесса, то его входные данные — это входные данные рабочего процесса.
- Когда выполнение рабочего процесса заканчивается, выходом данных последнего выполненного состояния становится выход данных рабочего процесса.
Например, у нас есть две таблицы:
- экземпляры
- экземпляры_состояний
Выполнение Marketplace Order Workflow может быть сохранено следующим образом:
Повторные действия
Если состояние возвращает ошибку, мы можем оставить его в качестве окончательного результата или определить политику повторных действий.
Например, у нас есть рабочий процесс «Шанс на успех».
Структура каталога:
.
└── chance-of-success
├── definition.sw.yaml
└── scripts
└── chance.js
chance.js
рандомизирует булево значение. Если true, возвращает данные. Если false, возвращает ошибку:
const chance = require("chance").Chance();
module.exports = async (ctx) => {
const isTrue = chance.bool({ likelihood: ctx.data.likelihood });
if (!isTrue) {
return {
error: { message: "failed" },
};
}
return {
data: { message: "success" },
};
};
А определение рабочего процесса содержит определение повторной попытки:
id: chanceofsuccess
version: "1.0"
specVersion: "0.7"
name: Chance of Success Workflow
description: Try your chance of success. Retry if failed.
start: TakeAChance
functions:
- name: chanceFunction
operation: mocobaas://chance-of-success#chance
retries:
- name: chanceRetryStrategy
delay: PT10S
maxAttempts: 3
states:
- name: TakeAChance
type: operation
actions:
- functionRef: chanceFunction
retryRef: chanceRetryStrategy
end: true
С этим определением повторной попытки рабочий процесс выполнит этот механизм:
- Максимальное количество попыток — 3 раза.
- Задержка между повторными попытками составляет 10 секунд.
- Если данные получены до достижения maxAttempts, повторных попыток больше не будет.
- Если maxAttempts достигнуто, то повторных попыток больше не будет, независимо от результата.
Прежде чем мы сможем использовать длительность задержки, ее необходимо разобрать. Например, я использую sosodev/duration, и это хорошо работает.
Визуализация диаграммы
Генерирование визуализации диаграммы из определения рабочего процесса действительно полезно, особенно когда у вас сложные рабочие процессы.
Один из способов — использовать веб-редактор на официальном сайте. Он может генерировать диаграмму из JSON или YAML, но линтер в текстовом редакторе всегда будет ожидать JSON.
Для пользователей VS Code есть официальное расширение, но на момент написания этой статьи оно устарело, поддерживает только spec v0.6.
Лучшей альтернативой является расширение от Red Hat. Оно поддерживает spec v0.8. Оно также хорошо работает со спецификацией v0.7. Единственное требование — вы должны назвать файлы определений *.sw.json
, *.sw.yaml
или *.sw.yml
.
Предостережение:
Похоже, что эти инструменты используют один и тот же генератор, поскольку они создают одну и ту же визуализацию диаграммы. Я заметил, что они могут визуализировать только поток, но не включают другие детали, такие как функции или повторные попытки.
Заключительные размышления
Рабочий поток — это довольно большая функция. И как вы можете видеть, Serverless Workflow предлагает большую гибкость между стандартными и настраиваемыми функциями. Но если вам нужно больше тренировок в использовании системы рабочих процессов, возможно, есть лучшие решения.
Мы еще не реализовали большинство функций Serverless Workflow.
Например, выражения рабочего процесса, о которых я говорил выше. Использование такой библиотеки, как itchyny/gojq, выглядит многообещающе, хотя я еще не пробовал.
Но, по крайней мере, этих небольших усилий достаточно для минимально функционирующей системы.
Что ж, надеюсь, вам понравилась эта статья и вы нашли ее полезной 😉