Реактивный трубопровод: приложение (часть 2)

Проект «ReactivePipeline» представляет собой простой интерфейс, проект одного класса. Вы можете найти ссылку на репозиторий Github в части. #1 этого поста.

Класс ReactiveContext содержит только статические методы, необходимые для инстанцирования объектов, необходимых для создания персистентного Flux pipeline для всего вашего приложения.
Мы рассмотрим каждый из них в первой части этого документа.

Тем не менее, эти методы / объекты — не единственные, которые необходимы для создания реактивного приложения.
Поэтому во второй части этого README будут представлены все объекты, необходимые для того, чтобы сделать этот API полным и работающим.

Как это может происходить, как это будет работать

Для начала просто подумайте о том, что среди всех операций конкретного процесса мы можем выделить :

  1. Начальные операции, которые не принимают никаких аргументов и, следовательно, не имеют предшественников
  2. Завершающие операции, которые принимают аргументы и не имеют преемников.
  3. Промежуточные операции: они нуждаются в аргументах своих предшественников и производят выходы, которые будут входами их преемников.

Чаще всего мы проектируем и создаем приложения, содержащие методы, которые запускаются последовательно, очень процедурным способом. Это можно представить в виде прямой линии операций обработки: A —> B —> C —> …

Но мы также можем представить операции в виде дерева, в котором методы A и B являются независимыми, поэтому распараллелены, и оба дают результат, которого заслуживает C. В таком случае функция C будет принимать в качестве аргументов result_A и result_B, и нам придется синхронизировать обе операции, чтобы передать их соответствующие результаты в C : C(result_A, result_B).
Или, наоборот, функция A производит результат_A, который будет потребляться распараллеленным способом B и C, как только он будет доступен : B(result_A) // C(result_A)

Чтобы справиться со всеми этими ситуациями, нам нужна гибкая структура данных, в которой данные и, соответственно, функции, производящие эти данные, будут организованы разумно. Для того чтобы это стало возможным, были использованы обёртки.

Инструментарий

Многие объекты, о которых мы будем говорить, являются обертками. Важно понимать, как они взаимодействуют друг с другом.

  1. Наконец, Pipeline является оберткой для набора Task (а также для одной или нескольких WorkGroup, поскольку он будет отправлять все задачи в разные рабочие группы).
  2. Последний объект, о котором следует поговорить, это DataStreamer. Он стоит в стороне от предыдущих объектов, поскольку является не оберткой, а механизмом, используемым для экспорта состояния всех классов Monitorable: Task, WorkGroup и Pipeline все имеют свойство Monitor, которое описывает их текущее состояние (новый, запущен, выполнен, в ошибке). DataStreamer создает Flux, содержащий все состояния всех объектов внутри Pipeline. Каждый раз, когда состояние одного из объектов меняется, запускается тик Notifier в DataStreamer, который в свою очередь запускает новый Flux, который, например, может быть отображен на веб-странице для целей мониторинга. Но это может быть все, что вам нужно. Это просто способ, который я выбрал, чтобы говорить о том, что известно под названием «горячий поток».

Глобальная философия такова:

  1. Мы создаем все необходимые Operationы. Попробуйте представить этот объект как чистую функцию, делающую только одну вещь.
  2. Каждая Operation обернута в объект Task. Чтобы быть инстанцированной, Task должна иметь единственную Operation, а набор всех предыдущих Task, которые произвели Flux, является аргументом для этой Task.
  3. Все задачи в конечном итоге будут использованы как аргументы для Pipeline. Pipeline, благодаря своему Optimizer, создает одну или несколько WorkGroup. Как только это будет сделано, все WorkGroup будут выполняться в параллельных потоках и асинхронно.

Объекты из ReactiveContext

Конвейер

Класс Pipeline представляет собой обертку для набора Задач. При вызове его метода .execute() будут выполнены все Operation от самых первых начальных до конечных.

Вы можете получить Pipeline, используя :

    static Pipeline createPipeline(String pipelineName, Set<Task> allTasks)

    static Pipeline createPipeline(String pipelineName, Set<Task> allTasks, WorkGroupOptimizer optimizer)
Войти в полноэкранный режим Выйти из полноэкранного режима

При использовании второго метода вам придется определить свой собственный Optimizer. Это означает, что вы определяете собственную логику группировки задач в WorkGroup. Чтобы определить свой собственный Оптимизатор, вам нужно реализовать следующий Функциональный интерфейс :

    Collection<WorkGroup> optimize(Set<Task> allTasks)
Вход в полноэкранный режим Выход из полноэкранного режима

Логика существующего по умолчанию Optimizer очень проста и может быть значительно улучшена и оптимизирована (это часть моего списка дел).

  1. Сначала он ищет все завершающие (финальные, конечные) задачи
  2. Затем, он группирует в одну рабочую группу все задачи, вовлеченные в реализацию одной конечной задачи.

Задача

Task является оберткой Operation. Operation определяет логику домена, в то время как Task организует взаимодействие с другими Operation.
Именно поэтому Task принимает в качестве аргумента List<Task>.

    static Task createTask(String taskName, Operation operation, List<Task> predecessors)
Вход в полноэкранный режим Выход из полноэкранного режима

Определяя задачу T, вы делаете следующее:

  1. Ссылаетесь на операцию, которую нужно выполнить
  2. Ссылка на задачу(ы), результирующий(ие) поток(ы) которых будет(ут) использоваться в качестве аргумента для операции задачи T.

DataStreamer

DataStreamer производит горячий поток, потенциально бесконечный Flux.
В целях демонстрации мы использовали его как экспортируемый инструмент мониторинга. Это означает, что вы можете определить REST-контроллер и метод GET, возвращающий Flux<ServerSentEvent>, который будет потребляться веб-приложением.

    static Flux<ServerSentEvent<String>> getAllPipelinesStatesFlux()

    static Flux<ServerSentEvent<String>> getSinglePipelineStatesFlux(Pipeline pipeline)
Вход в полноэкранный режим Выход из полноэкранного режима

В случае если вы определили несколько конвейеров, DataStreamer может создавать поток для всех конвейеров или для одного. Для одного конвейера вы будете использовать второй метод.

Другие полезные объекты

Другие полезные объекты не доступны напрямую через класс ReactiveContext.

Операция

Как уже было сказано, Operation является краеугольным камнем этого API. Этот интерфейс используется для определения логики вашего домена. Поскольку это функциональный интерфейс, вы можете использовать его в качестве лямбда-выражения.

    Flux<?> process(Flux<?>... inputs) throws TaskExecutionException;
Вход в полноэкранный режим Выход из полноэкранного режима

Мы можем рассмотреть несколько неприятных примеров, чтобы показать, как его использовать:

    Operation o1 = inputs -> Flux.range(1,10);
    Operation o2 = inputs -> Flux.range(91,100);
    Operation o3 = inputs -> {  
      Flux<?> int1 = inputs[0];  
      Flux<?> int2 = inputs[1];  
      return Flux.zip(int1, int2, (x, y) -> (int) x + (int) y);  
    };
Вход в полноэкранный режим Выход из полноэкранного режима

Операция o1 произведет Flux<Integer> : 1, 2, 3…. 10
Операция o2 выдает Flux<Integer> : 91, 92, 93… 100
Операция o3 будет использовать каждое отдельное значение из предыдущих Flux, создавая кортежи, которые будут обрабатываться для получения нового результата (это способ использования оператора .zip) :

  • (1, 91) произведет 92
  • (2, 92) произведет 94
  • (3, 93) произведет 96
  • и т.д.

Конечно, это возможно только в том случае, если вы создали необходимые объекты Tasks вокруг ваших операций:

    Task t1 = ReactiveContext.createTask("Integer Flux 1", o1, Collections.emptyList());
    Task t2 = ReactiveContext.createTask("Integer Flux 2", o2, Collections.emptyList());
    Task t3 = ReactiveContext.createTask("Sum t1 t2", o3, List.of(t1, t2));
Войти в полноэкранный режим Выход из полноэкранного режима

Здесь можно сказать многое.

  • Единственный абстрактный метод Operation, process(Flux... inputs), может принимать 0, 1 или N Flux(es) в качестве аргумента. Поэтому лямбда-выражение начинается так : inputs -> ... ; В случае начальной Operation, Operation без предшественников, нет входов для обработки, но мы должны соблюдать сигнатуру метода. В приведенном примере только операция o3 имеет входы для обработки, и это делается путем их получения из массива Fluxes, созданного аргументом varargs.
  • t1 и t2 Tasks обертывают стартовые Operations, поэтому здесь нет предыдущих Tasks для объявления. Но мы все равно должны передать пустую коллекцию в качестве аргумента.
  • У t3 Task действительно есть предшественники, соответственно t1 и t2 Tasks, которые соответственно оборачивают o1 и o2 Operations. В этом случае мы передаем коллекцию, составленную из t1 и t2 Tasks. Эта коллекция представляет собой List, поскольку порядок аргументов имеет значение.

Notifier / StateNotifier

Теоретически вам не придется работать с Notifier (интерфейс) и StateNotifier (реализация), отвечающими за уведомление DataStreamer о любом изменении внутреннего состояния любого Monitorable.
Здесь мы используем паттерн Visitor, чтобы делегировать действие уведомления независимому объекту, знающему DataStreamer и Pipeline, о котором он сообщает.

Monitorable / Monitor

Monitor — это класс, хранящий внутреннее состояние любого объекта Monitorable. Как мы уже видели, объекты Monitorable являются :

  • Pipeline
  • WorkGroup
  • Task

Класс Monitorable — это материнский класс, от которого произошли три вышеперечисленных объекта. Его свойствами являются :

    String name;
    Monitor monitor;
    Notifier notifier;
    Map<Task, Optional<Flux<?>>> inputFluxesMap = Collections.synchronizedMap(new LinkedHashMap<>());
Войти в полноэкранный режим Выход из полноэкранного режима

Это последнее свойство является центральной частью нашей системы. Мы используем его исключительно для управления Tasks, но оно также может быть использовано для WorkGroups с адаптированным Optimizer.
Вы могли заметить, что наши Monitorable сконструированы как составной объект. Таким образом, не только Tasks могут получать Fluxs в качестве аргументов от предшественников, но и WorkGroups. Это все еще незавершенная работа, которая будет решена с помощью небазового Optimizer.

Развлекайтесь.

Lovegiver

Написано с помощью StackEdit.

Оцените статью
devanswers.ru
Добавить комментарий