Проект «ReactivePipeline» представляет собой простой интерфейс, проект одного класса. Вы можете найти ссылку на репозиторий Github в части. #1 этого поста.
Класс ReactiveContext
содержит только статические методы, необходимые для инстанцирования объектов, необходимых для создания персистентного Flux
pipeline для всего вашего приложения.
Мы рассмотрим каждый из них в первой части этого документа.
Тем не менее, эти методы / объекты — не единственные, которые необходимы для создания реактивного приложения.
Поэтому во второй части этого README будут представлены все объекты, необходимые для того, чтобы сделать этот API полным и работающим.
Как это может происходить, как это будет работать
Для начала просто подумайте о том, что среди всех операций конкретного процесса мы можем выделить :
- Начальные операции, которые не принимают никаких аргументов и, следовательно, не имеют предшественников
- Завершающие операции, которые принимают аргументы и не имеют преемников.
- Промежуточные операции: они нуждаются в аргументах своих предшественников и производят выходы, которые будут входами их преемников.
Чаще всего мы проектируем и создаем приложения, содержащие методы, которые запускаются последовательно, очень процедурным способом. Это можно представить в виде прямой линии операций обработки: A —> B —> C —> …
Но мы также можем представить операции в виде дерева, в котором методы A и B являются независимыми, поэтому распараллелены, и оба дают результат, которого заслуживает C. В таком случае функция C будет принимать в качестве аргументов result_A и result_B, и нам придется синхронизировать обе операции, чтобы передать их соответствующие результаты в C : C(result_A, result_B).
Или, наоборот, функция A производит результат_A, который будет потребляться распараллеленным способом B и C, как только он будет доступен : B(result_A) // C(result_A)
Чтобы справиться со всеми этими ситуациями, нам нужна гибкая структура данных, в которой данные и, соответственно, функции, производящие эти данные, будут организованы разумно. Для того чтобы это стало возможным, были использованы обёртки.
Инструментарий
Многие объекты, о которых мы будем говорить, являются обертками. Важно понимать, как они взаимодействуют друг с другом.
- Наконец,
Pipeline
является оберткой для набораTask
(а также для одной или несколькихWorkGroup
, поскольку он будет отправлять все задачи в разные рабочие группы). - Последний объект, о котором следует поговорить, это
DataStreamer
. Он стоит в стороне от предыдущих объектов, поскольку является не оберткой, а механизмом, используемым для экспорта состояния всех классовMonitorable
:Task
,WorkGroup
иPipeline
все имеют свойствоMonitor
, которое описывает их текущее состояние (новый, запущен, выполнен, в ошибке).DataStreamer
создаетFlux
, содержащий все состояния всех объектов внутриPipeline
. Каждый раз, когда состояние одного из объектов меняется, запускается тикNotifier
вDataStreamer
, который в свою очередь запускает новыйFlux
, который, например, может быть отображен на веб-странице для целей мониторинга. Но это может быть все, что вам нужно. Это просто способ, который я выбрал, чтобы говорить о том, что известно под названием «горячий поток».
Глобальная философия такова:
- Мы создаем все необходимые
Operation
ы. Попробуйте представить этот объект как чистую функцию, делающую только одну вещь. - Каждая
Operation
обернута в объектTask
. Чтобы быть инстанцированной,Task
должна иметь единственнуюOperation
, а набор всех предыдущихTask
, которые произвелиFlux
, является аргументом для этойTask
. - Все
задачи
в конечном итоге будут использованы как аргументы дляPipeline
.Pipeline
, благодаря своемуOptimizer
, создает одну или несколькоWorkGroup
. Как только это будет сделано, всеWorkGroup
будут выполняться в параллельных потоках и асинхронно.
Объекты из ReactiveContext
Конвейер
Класс Pipeline представляет собой обертку для набора Задач. При вызове его метода .execute()
будут выполнены все Operation
от самых первых начальных до конечных.
Вы можете получить Pipeline
, используя :
static Pipeline createPipeline(String pipelineName, Set<Task> allTasks)
static Pipeline createPipeline(String pipelineName, Set<Task> allTasks, WorkGroupOptimizer optimizer)
При использовании второго метода вам придется определить свой собственный Optimizer
. Это означает, что вы определяете собственную логику группировки задач в WorkGroup
. Чтобы определить свой собственный Оптимизатор
, вам нужно реализовать следующий Функциональный интерфейс
:
Collection<WorkGroup> optimize(Set<Task> allTasks)
Логика существующего по умолчанию Optimizer
очень проста и может быть значительно улучшена и оптимизирована (это часть моего списка дел).
- Сначала он ищет все завершающие (финальные, конечные) задачи
- Затем, он группирует в одну рабочую группу все задачи, вовлеченные в реализацию одной конечной задачи.
Задача
Task
является оберткой Operation
. Operation
определяет логику домена, в то время как Task
организует взаимодействие с другими Operation
.
Именно поэтому Task
принимает в качестве аргумента List<Task>
.
static Task createTask(String taskName, Operation operation, List<Task> predecessors)
Определяя задачу T, вы делаете следующее:
- Ссылаетесь на операцию, которую нужно выполнить
- Ссылка на задачу(ы), результирующий(ие) поток(ы) которых будет(ут) использоваться в качестве аргумента для операции задачи T.
DataStreamer
DataStreamer
производит горячий поток, потенциально бесконечный Flux
.
В целях демонстрации мы использовали его как экспортируемый инструмент мониторинга. Это означает, что вы можете определить REST-контроллер и метод GET, возвращающий Flux<ServerSentEvent>
, который будет потребляться веб-приложением.
static Flux<ServerSentEvent<String>> getAllPipelinesStatesFlux()
static Flux<ServerSentEvent<String>> getSinglePipelineStatesFlux(Pipeline pipeline)
В случае если вы определили несколько конвейеров, DataStreamer может создавать поток для всех конвейеров или для одного. Для одного конвейера вы будете использовать второй метод.
Другие полезные объекты
Другие полезные объекты не доступны напрямую через класс ReactiveContext
.
Операция
Как уже было сказано, Operation
является краеугольным камнем этого API. Этот интерфейс используется для определения логики вашего домена. Поскольку это функциональный интерфейс
, вы можете использовать его в качестве лямбда-выражения.
Flux<?> process(Flux<?>... inputs) throws TaskExecutionException;
Мы можем рассмотреть несколько неприятных примеров, чтобы показать, как его использовать:
Operation o1 = inputs -> Flux.range(1,10);
Operation o2 = inputs -> Flux.range(91,100);
Operation o3 = inputs -> {
Flux<?> int1 = inputs[0];
Flux<?> int2 = inputs[1];
return Flux.zip(int1, int2, (x, y) -> (int) x + (int) y);
};
Операция o1 произведет Flux<Integer>
: 1, 2, 3…. 10
Операция o2 выдает Flux<Integer>
: 91, 92, 93… 100
Операция o3 будет использовать каждое отдельное значение из предыдущих Flux
, создавая кортежи, которые будут обрабатываться для получения нового результата (это способ использования оператора .zip
) :
- (1, 91) произведет 92
- (2, 92) произведет 94
- (3, 93) произведет 96
- и т.д.
Конечно, это возможно только в том случае, если вы создали необходимые объекты Tasks вокруг ваших операций:
Task t1 = ReactiveContext.createTask("Integer Flux 1", o1, Collections.emptyList());
Task t2 = ReactiveContext.createTask("Integer Flux 2", o2, Collections.emptyList());
Task t3 = ReactiveContext.createTask("Sum t1 t2", o3, List.of(t1, t2));
Здесь можно сказать многое.
- Единственный абстрактный метод
Operation
,process(Flux... inputs)
, может принимать 0, 1 или NFlux
(es) в качестве аргумента. Поэтому лямбда-выражение начинается так :inputs -> ... ;
В случае начальнойOperation
,Operation
без предшественников, нет входов для обработки, но мы должны соблюдать сигнатуру метода. В приведенном примере только операция o3 имеет входы для обработки, и это делается путем их получения из массиваFlux
es, созданного аргументом varargs. - t1 и t2
Task
s обертывают стартовыеOperation
s, поэтому здесь нет предыдущихTask
s для объявления. Но мы все равно должны передать пустую коллекцию в качестве аргумента. - У t3
Task
действительно есть предшественники, соответственно t1 и t2Task
s, которые соответственно оборачивают o1 и o2Operation
s. В этом случае мы передаем коллекцию, составленную из t1 и t2Task
s. Эта коллекция представляет собойList
, поскольку порядок аргументов имеет значение.
Notifier / StateNotifier
Теоретически вам не придется работать с Notifier
(интерфейс) и StateNotifier
(реализация), отвечающими за уведомление DataStreamer
о любом изменении внутреннего состояния любого Monitorable
.
Здесь мы используем паттерн Visitor, чтобы делегировать действие уведомления независимому объекту, знающему DataStreamer
и Pipeline
, о котором он сообщает.
Monitorable / Monitor
Monitor
— это класс, хранящий внутреннее состояние любого объекта Monitorable
. Как мы уже видели, объекты Monitorable
являются :
Pipeline
WorkGroup
Task
Класс Monitorable
— это материнский класс, от которого произошли три вышеперечисленных объекта. Его свойствами являются :
String name;
Monitor monitor;
Notifier notifier;
Map<Task, Optional<Flux<?>>> inputFluxesMap = Collections.synchronizedMap(new LinkedHashMap<>());
Это последнее свойство является центральной частью нашей системы. Мы используем его исключительно для управления Task
s, но оно также может быть использовано для WorkGroup
s с адаптированным Optimizer
.
Вы могли заметить, что наши Monitorable
сконструированы как составной объект. Таким образом, не только Task
s могут получать Flux
s в качестве аргументов от предшественников, но и WorkGroup
s. Это все еще незавершенная работа, которая будет решена с помощью небазового Optimizer
.
Развлекайтесь.
Lovegiver
Написано с помощью StackEdit.