Введение
В Trybe в конце каждого раздела содержания студенты должны разработать проект, чтобы мы могли оценить их обучение. Репозиторий проекта доступен на Github, и они клонируют его, чтобы начать разработку на своих машинах. При каждом коммите человека в репозитории мы запускаем выполнение оценки через Github Actions.
Обработка поставки (фиксация)
После открытия pull request с разработанным кодом, каждый коммит запускает действие на github, которое оценивает код студента и сохраняет в нашей базе данных информацию об оценке кода этого человека.
Коммиты ставятся в очередь, поэтому мы можем контролировать статус их обработки, чтобы убедиться, что все коммиты имеют правильно рассчитанные оценки. В случае сбоев мы автоматически проводим повторную обработку каждые 5 минут.
Обработка фиксации в бэкенде была реализована на основе концепции многоэтапной обработки данных с помощью библиотеки Broadway. Состоит из модуля производителя, который отвечает за поиск в базе данных всех поставок, ожидающих обработки для передачи потребителю. Потребитель — это модуль Broadway, который отвечает за требование оценок производителю, обработку оценок и создание комментария к отзыву на Github.
Поскольку в проектах выполняется много коммитов, первое, что мы пытаемся сделать, это использовать возможности параллельного программирования, доступные в языке Elixir, для одновременной обработки нескольких поставок, чтобы избежать задержки в обратной связи со студентами.
В приведенном ниже коде в конфигурациях Broadway обратите внимание, что в строке 10 был определен модуль producer, а в строках 16 и 17 было определено количество параллельных процессов и объем спроса, который каждый процесс будет запрашивать у производителя.
defmodule Trybe.Consumer do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: Trybe.Producer,
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
processors: [
default: [
concurrency: 10,
max_demand: 10
]
]
)
end
def handle_message(_processor, message, _context),
do: Message.update_data(message, &process_data/1)
def transform(event, _opts),
do: %Message{data: event, acknowledger: {__MODULE__, :ack_id, :ack_data}}
def ack(:ack_id, _successful, _failed), do: :ok
defp process_data(delivery) do
# => 1- Calcula a nota e salva no banco
# => 2- Faz request no Github criando o comentário de feedback.
# => 3- Atualiza o status da avaliação para `finished`.
end
end
Что касается производителя, то на основе спроса, определенного в конфигурации Broadway, его роль заключается в поиске в базе данных поставок, ожидающих обработки, и отправке их потребителю. Если производитель не может найти больше ни одной поставки, планируется новый процесс для поиска новых поставок в течение следующих 10 секунд.
defmodule Trybe.Producer do
use GenStage
alias Trybe.Deliveries
def start_link(initial \ []) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(deliveries) do
{:producer, deliveries}
end
# Função que irá enviar as deliveries para o consumidor
def handle_demand(demand, state) when demand > 0 do
send_deliveries(demand, state)
end
# Função que é chamada quando um processo é agendado
# solicitando mais deliveries.
def handle_info({:get_deliveries, demand}, state) do
send_deliveries(demand, state)
end
defp send_deliveries(demand, state) do
deliveries = list_deliveries(demand)
maybe_schedule_next_deliveries(deliveries, demand)
{:noreply, deliveries, state}
end
# Função responsável por buscar novas deliveries
# nos próximos 10 segundos caso a quantidade de deliveries
# no estado seja menor do que o solicitado.
defp maybe_schedule_next_deliveries(deliveries, demand) when length(deliveries) == demand,
do: nil
defp maybe_schedule_next_deliveries(_deliveries, demand) do
Process.send_after(self(), {:get_deliveries, demand}, :timer.seconds(10))
end
# Função responsável por buscar as deliveries no banco
# e mudar o status delas para `processing`.
defp list_deliveries(demand) do
with deliveries when deliveries != [] <- Deliveries.list_deliveries_waiting_process(demand),
{_, deliveries} <- Deliveries.set_processing_status_to_deliveries(deliveries) do
deliveries
end
end
end
Ниже приведен пример поведения Broadway с заданными настройками.
Проблемы с одновременным выполнением запросов к API Github
Сначала все казалось идеальным, но, к сожалению, все работало не так, как мы ожидали, из-за ограничения скорости на Github, что в итоге привело к длительной задержке в доставке отзывов студентам.
В API Github существует два типа ограничения скорости, чтобы обеспечить доступность API для всех. Здесь мы сосредоточимся только на вторичном тарифном лимите, который является нашим главным злодеем, но есть еще и первичный тарифный лимит.
В документации Github есть несколько лучших практик, позволяющих избежать вторичного ограничения скорости, и наши главные злодеи — это:
- Не делайте одновременных запросов на один и тот же токен.
- Запросы, которые создают контент, вызывающий уведомления, например, проблемы, комментарии и запросы на вытягивание, могут быть дополнительно ограничены. Создавайте этот контент в разумном темпе, чтобы избежать дальнейших ограничений.
Умеренные запросы на Github
К счастью для нас, Broadway уже был готов к подобной проблеме и реализовал функцию, позволяющую поставить ногу продюсера на тормоз. Настройка rate_limiting
, с ее помощью вы можете установить количество доставок, которые мы можем обработать за определенный промежуток времени.
defmodule Trybe.Consumer do
use Broadway
......
......
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: Trybe.Producer,
transformer: {__MODULE__, :transform, []},
concurrency: 1,
rate_limiting: [
allowed_messages: 10,
interval: 30_000
]
],
processors: [
default: [
concurrency: 1,
max_demand: 10
]
]
)
end
end
Но только этой настройки оказалось недостаточно, она лишь замедляет количество запросов, которые мы делаем к сервису за временной интервал, но проблема одновременных запросов все равно остается. При этом нам также пришлось изменить настройки процессоров, установив поле concurrency: 1
, чтобы только один процесс получал поставки от производителя, избегая одновременных запросов.
Таким образом, мы смогли делать умеренные и последовательные запросы на сервисе Github без ущерба для времени получения обратной связи для студентов.
Заключение
Реализованное решение позволило нам контролировать вторичный лимит скорости, и таким образом мы смогли эффективно обрабатывать обратную связь с учениками, т.е. с помощью нескольких строк кода мы можем снова перенастроить конфигурации Broadway так, чтобы обработка поставок осуществлялась в большом масштабе параллельно.
Если вам интересно узнать, как устроена инфраструктура, позволяющая Trybe исправлять более 20 млн. оценок в неделю, я также рекомендую прочитать пост, опубликованный Biasi.