Когда речь идет об обработке заданий, время — это все. Выполнение заданий в фоновом режиме помогает нам снять нагрузку с веб-серверов, обрабатывающих запросы наших клиентов. Однако мы также хотим, чтобы фоновые задания выполнялись в разумные сроки для наших клиентов. Но что делать, если клиент добавил так много фоновых заданий, что они использовали все ресурсы фоновых рабочих?
Задания будут обрабатываться доступными работниками в порядке их поступления, что отлично подходит для клиента, который добавил эти задания. Но, к сожалению, для других учетных записей их сегменты не будут обновляться до тех пор, пока все эти задания не будут завершены. Это может стать проблемой для таких аккаунтов — будет казаться, что система не работает должным образом, поскольку она ждет, пока очередь освободится.
Мы решили эту проблему в Aha!, создав модуль, который:
- собирает отдельные задания вместе
- ограничивает время выполнения каждого отдельного задания
- ограничивает параллельную обработку
Когда задание использует этот новый модуль, оно становится устойчивым к этой проблеме по умолчанию.
Давайте рассмотрим это на примере, чтобы показать, как предотвратить это замедление для всех счетов. Допустим, у нас есть Accounts
с различными Segments
(т.е. «группами»). Каждый раз, когда мы обновляем Сегмент
, мы хотим обновить этот сегмент. Этот процесс может занять немного времени, поэтому мы поместим его в фоновое задание. В итоге мы получим код контроллера, который будет выглядеть следующим образом:
class SegementsController < ApplicationController
def update
segment = current_account.segments.find(params[:id])
segment.update!(segment_params)
SegmentRefresher.perform_later(segment)
head :ok
end
end
Обычно полезно обрабатывать то, что можно сделать немедленно, и откладывать более дорогостоящую работу. Это позволяет нам немедленно отвечать нашим клиентам и при этом выполнять работу, которую необходимо сделать. Однако клиент может поместить тысячи заданий SegmentRefresher
в нашу очередь фоновых заданий и помешать выполнению заданий других клиентов.
Пакетирование отдельных заданий вместе
Обычно, когда фоновое задание ставится в очередь через ActiveJob
, параметры задания передаются в качестве аргументов в perform_later
. Это не совсем то, что нам нужно для пакетной обработки заданий. Вместо этого мы создаем новый метод perform_batch_later
, который помещает аргументы в хранилище данных, например Redis, из которого задание может получить их позже.
Итак, ранее код задания мог выглядеть следующим образом:
class SegmentRefresher < ApplicationJob
def perform(segment)
# Refresh the segment
end
end
Теперь мы имеем что-то похожее на следующее:
module BatchByAccount
extend ActiveSupport::Concern
class_methods do
# Push the data into Redis and then enqueue the job
def perform_batch_later(data)
data.each_slice(100) do |slice|
Redis.current.rpush(data_key, slice.map(&:id))
end
perform_later
end
def self.data_key
"SegmentRefresher:#{Account.current.id}"
end
end
end
class SegmentRefresher < ApplicationJob
include BatchByAccount
def perform
segment_ids = Redis.current.lpop(self.class.data_key, 100)
Segment.where(id: segment_ids).each do |segement|
# Refresh the segment
end
end
end
Теперь контроллер может вызвать SegmentRefresher.perform_batch_later
с одним или несколькими Segments
, которые будут сохранены в Redis. Позже будет запущено задание, которое возьмет 100 таких идентификаторов сегментов за один раз для обработки.
Эта техника может быть действительно мощной. Она позволяет нескольким процессам не знать друг о друге и при этом обрабатывать данные вместе. Кроме того, мы можем использовать различные методы Redis, чтобы получить немного разное поведение. Например:
- Используя
rpush
для добавления записей иlpop
для их удаления, мы получим очередь «первый вошел/первый вышел». Мы можем использовать это, когда порядок заданий важен. - Использование
spush
для добавления записей иspop
для их удаления даст нам неупорядоченный набор. Это означает, что дублирующиеся данные автоматически отфильтровываются, что предотвращает выполнение ненужной работы. Однако это неупорядоченный набор, поэтому задания могут быть обработаны в другом порядке, чем они были записаны в очередь. - Использование временной метки и
zadd
для добавления записей иzrangebyscore
/zrem
для их удаления позволяет нам создать отложенный неупорядоченный набор. Это полезно для действий, которые мы хотим выполнить в будущем. Это может проявиться, если мы хотим выполнить действие через пять минут после того, как клиент перестанет взаимодействовать с объектом.
Ограничение времени выполнения отдельного задания
Теперь, когда мы собираем данные в пакетном режиме, мы хотим ограничить время выполнения одного задания. Чтобы решить эту задачу, мы воспользовались функциональностью камня job-iteration. Этот гем предоставляет интерфейс, в котором мы можем определить перечислитель и то, что нужно делать на каждой итерации. Остальное сделает сам гем.
Используя это, наше задание и модуль теперь будут выглядеть следующим образом:
(Для удобства чтения, часть кода, которая уже была показана, была удалена).
module BatchByAccount
extend ActiveSupport::Concern
class_methods do
def perform_batch_later(data)
# ...
end
def self.data_key
# ...
end
end
included do
include JobIteration::Iteration
end
def build_enumerator(*)
Enumerator.new do |yielder|
# We will pull 100 records out of the queue at a time and yield that to the enumerator
while (segment_ids = Redis.current.lpop(self.class.data_key, 100)).any?
yielder.yield segment_ids, nil
end
end
end
end
class SegmentRefresher < ApplicationJob
include BatchByAccount
def each_iteration(segment_ids)
Segment.where(id: segment_ids).each do |segement|
# Refresh the segment
end
end
end
Обратите внимание, что метод SegmentRefresher
‘perform
был заменен на метод each_iteration
.
Пока в Redis есть данные, это задание будет выполняться до тех пор, пока мы не достигнем порогового значения, определяемого параметром JobIteration.max_job_runtime
. По умолчанию это пять минут. Как только мы достигнем порогового значения, задание будет прервано и повторно поставлено в очередь. Это гарантирует, что даже если на обновление всех сегментов уйдет много времени, оно не будет монополизировать рабочего.
Ограничение параллельной обработки
Теперь, когда данные собираются в пакеты, а отдельные задания обрабатываются пакетно, мы хотим предотвратить возникновение условий гонки при одновременном выполнении нескольких заданий. Мы решили эту проблему с помощью гема activejob-uniqueness.
С помощью этого гема мы можем сделать задания уникальными. Дубликаты заданий для одного аккаунта будут игнорироваться. Поскольку есть только одно задание, мы должны обработать условия гонки, что произойдет, если наше одно задание завершится в тот же момент, когда будут добавлены новые данные.
В итоге задание выглядит следующим образом:
module BatchByAccount
extend ActiveSupport::Concern
class_methods do
def perform_batch_later(data)
# ...
end
def self.data_key
# ...
end
end
included do
include JobIteration::Iteration
unique :until_expired, lock_ttl: 5.minutes
rescue_from(StandardError, with: :handle_error)
on_shutdown do
# Ensure than when we are interrupting the job, that we clear the lock so that it can be re-queued
lock_strategy.unlock(resource: lock_key)
end
on_complete do
if Redis.current.llen(self.class.data_key) > 0
# This is the race condition
# If we are complete, but there is still data in the queue, we need to enqueue a new job to process it
self.class.perform_later
end
end
end
end
def handle_error(exception)
# Ensure we unlock the job on error
lock_strategy.unlock(resource: lock_key)
raise exception
end
# These arguments can be tweaked or overridden to lock on different criteria or allow
# some amount of parallelism
def lock_key_arguments
[Account.current.id]
end
def build_enumerator(*)
# ...
end
end
class SegmentRefresher < ApplicationJob
include BatchByAccount
def each_iteration(segment_ids)
# ...
end
end
Устойчивость по умолчанию
Само задание почти не изменилось, но теперь оно стало более устойчивым. Создав несколько простых для повторного использования шаблонов, инженеры могут больше сосредоточиться на своих функциях, а не беспокоиться о распространенных проблемах устойчивости. Мы можем направить энергию на то, чтобы сделать правильный выбор легким для всех.
Окончательный код будет выглядеть следующим образом:
module BatchByAccount
extend ActiveSupport::Concern
class_methods do
def perform_batch_later(data)
data.each_slice(100) do |slice|
Redis.current.rpush(data_key, slice.map(&:id))
end
# If the job is already enqueued or running, this will be a no-op
perform_later
end
def self.data_key
"SegmentRefresher:#{Account.current.id}"
end
end
included do
include JobIteration::Iteration
unique :until_expired, lock_ttl: 5.minutes
rescue_from(StandardError, with: :handle_error)
on_shutdown do
# Ensure than when we are interrupting the job, that we clear the lock so that it can be re-queued
lock_strategy.unlock(resource: lock_key)
end
on_complete do
if Redis.current.llen(self.class.data_key) > 0
# This is the race condition
# If we are complete, but there is still data in the queue, we need to enqueue a new job to process it
self.class.perform_later
end
end
end
end
def build_enumerator(*)
Enumerator.new do |yielder|
while (segment_ids = Redis.current.lpop(self.class.data_key, 100)).any?
yielder.yield segment_ids, nil
end
end
end
def handle_error(exception)
# Ensure we unlock the job on error
lock_strategy.unlock(resource: lock_key)
raise exception
end
# These arguments can be tweaked or overridden to lock on different criteria or allow
# some amount of parallelism
def lock_key_arguments
[Account.current.id]
end
end
class SegmentRefresher < ApplicationJob
include BatchByAccount
def each_iteration(segment_ids)
Segment.where(id: segment_ids).each do |segement|
# Refresh the segment
end
end
end
Подпишитесь на бесплатную пробную версию Aha! Develop
Aha! Develop — это полностью расширяемый инструмент agile-разработки. Расставляйте приоритеты в бэклоге, оценивайте работу и планируйте спринты. Если вас интересует интегрированный подход к разработке продукта, используйте Aha! Roadmaps и Aha! Develop вместе. Зарегистрируйтесь на бесплатную 30-дневную пробную версию или посетите живую демонстрацию, чтобы узнать, почему более 5 000 компаний доверяют нашему программному обеспечению, чтобы создавать любимые продукты и быть счастливыми при этом.