Синхронизация разных потребителей одной и той же темы Kafka

Синхронизация в распределенных системах — сложная задача. Вы, вероятно, стремитесь предотвратить ее настолько, насколько это возможно. Но иногда требования бизнеса приводят к необходимости координации различных сервисов, которые имеют жесткую зависимость от свежести данных.

Для обобщения предположим, что архитектура состоит из службы-A, службы-B и службы-C. Все они потребляют сообщения из одной и той же темы Kafka, но, очевидно, обрабатывают их по-разному в соответствии с их бизнес-логикой, API и SLA. Когда Service-A обрабатывает сообщение, он обращается к API Service-B и Service-C и ожидает, что это сообщение будет туда включено. Поэтому служба-A может обработать сообщение только после того, как службы-B и-C успешно обработают это сообщение. Если одна из них не работает или данные в ней не обновляются, единственное, что может сделать Service-A, это остановиться и ждать. Другими словами, существует жесткая зависимость от Service-B и Service-C как с точки зрения доступности, так и свежести данных.

Проблема архитектуры заключается в том, как синхронизировать службу-A и ее зависимости с помощью наиболее масштабируемого, экономически эффективного и простого подхода. Да, я считаю, что принципом проектирования систем хабера является снижение сложности. (и если вы недостаточно убеждены, посмотрите эту замечательную книгу «Философия проектирования программного обеспечения», которая начинается с высокопарного объяснения опасностей сложности как в программном обеспечении, так и в проектировании систем).

Так почему же мы изначально создали для себя эту проблему?

Теперь вы, наверное, спрашиваете себя, если служба-A так чувствительна к порядку сообщений, то самый простой (и, возможно, единственный!) способ гарантировать порядок — это выполнение обработки, которую выполняют службы-B и-C, в виде последовательных шагов внутри службы-A. Почему вы распределили их повсюду?

Несомненно, архитектура микро (или мини) сервисов имеет свою стоимость, но этот стиль архитектуры повышает скорость работы команды, поскольку каждый отдельный сервис легче разрабатывать, тестировать, развертывать и, что более важно, масштабировать, поддерживать и эксплуатировать. Это, безусловно, не уникально для нашей организации, но лично я уже убедился и оценил эти преимущества, наблюдая за реальностью в Dev org. Представьте, что сервис-A — это процесс обнаружения AML, который потребляет платежи и оценивает их риск. Служба-B обрабатывает отношения между сущностями, извлеченные из той же темы платежей, а служба-C отвечает за агрегирование этих платежей и предоставление эффективных запросов на их агрегирование с учетом времени. Подобно обнаружению AML (служба-A), существует также служба Fraud (назовем ее службой-A1) с совершенно иной бизнес-логикой, но все же требующая запросов к графу связей сущностей и агрегированному профилю. Мы не хотим, чтобы каждая команда инвестировала в выбор технологий Time-Series DB и Graph DB, получала опыт того, как они масштабируются (или нет… ) и работают, вместо того, чтобы сосредоточиться на своем бизнесе — AML или обнаружении мошенничества.

Возвращаясь к нашей первоначальной проблеме…

Если мы продолжим использовать обнаружение AML в качестве конкретного примера службы-A, как она может убедиться, что, обратившись к API отношений (открытому службой-B) или API агрегации (открытому службой-C), она получит актуальные результаты? Что если они временно не работают или замедляются по какой-то причине? Процесс обнаружения НЕ может обработать сообщение, пока не будет доступна вся необходимая информация. Продвижение с несвежей информацией может закончиться пропуском отмывания денег на миллионы долларов!

Архитектурный паттерн

Основная идея здесь заключается в использовании управления смещением в Kafka в качестве единого источника истины для отслеживания прогресса различных служб. Смещение — это простое целое число, которое используется Kafka для поддержания текущей позиции потребителя. Текущее смещение — это указатель на последнюю запись, которую Kafka уже отправила потребителю в последнем опросе. Таким образом, потребитель не получает одну и ту же запись дважды из-за текущего смещения. Поскольку Kafka управляет смещением для каждой темы, группы потребителей и раздела, это фактически означает, что каждая отдельная запись может быть идентифицирована только по . Существуют различные стратегии того, как потребители могут фиксировать смещение в Kafka, здесь мы полагаемся на фиксацию, которая выполняется только после успешной обработки сообщения потребителем.

Когда сервис-A опрашивает сообщение из темы, он должен сначала извлечь раздел и смещение этого сообщения. Затем он вызывает Kafka admin API, чтобы проверить, что это смещение уже обработано группами потребителей, связанными с Service-B и Service-C. Kafka admin API вызывается для каждой группы потребителей для получения карты разделов темы со смещением (посмотрите на функцию listConsumerGroupOffsets Java-клиента Kafka admin). Минимальное смещение на раздел представляет смещение самого медленного сервиса.

Обратите внимание, что сервис может потенциально потреблять из различных тем, например, Service-C потребляет различные типы событий для агрегации, поэтому Service-A должен четко определить свою зависимость как комбинацию, которая технически транслируется в одну группу потребителей.
Пока минимальное смещение между Service-B и Service-C меньше, чем смещение сообщения, которое потребляет Service-A, он просто ждет.

Алгоритм более подробно…
Предположим ситуацию, когда смещение темы T1 является следующим:

Группа потребителей CG-A (Сервис-A):

  • раздел p1 (C1): 0
  • раздел p2 (C1): 0
  • раздел p3 (C2): 0

Группа потребителей CG-B (услуга-B):

  • раздел p1 (C1): 30
  • раздел p2 (C2): 30
  • раздел p3 (C3): 10

Группа потребителей CG-C (Service-C):

  • раздел p1 (C1): 23
  • раздел p2 (C2): 52
  • раздел p3 (C3): 15

Как видите, тема t1 разбита на 3 раздела. Группы CG-A и CG-B состоят из 3 потребителей каждая, поэтому каждый потребитель обрабатывает только один раздел. Но в CG-A, оба раздела p1 и p2 обрабатываются потребителем C1.

Каждый потребитель в CG-A должен хранить обновленную карту разделов. Эта карта может обновляться периодически:

Во время текущего опроса сообщений:

for every consumed message /// (sorted from early to latest)
{
set message.offset = extract the offset of the current message
set message.partition = extract the partition of the current message
while (message.offset > partitionMinOffset[message.partition])
wait
process /// only now service-A can process the message
manual commit offset /// commit offset after processing successfully
}

Поскольку лучшей практикой является предотвращение статического назначения раздела потребителю, потребитель не может предполагать список обрабатываемых разделов, но должен хранить все смещения тематических разделов, поскольку потенциально он может обрабатывать любой из этих разделов в любой момент времени.

Предположения
Этот подход в значительной степени основан на смещении Kafka для отражения фактического прогресса обслуживания. На самом деле, существуют различные стратегии того, как фиксировать смещение в Kafka. Мы внутри используем flink-connector-kafka, который фиксирует смещение только после завершения контрольной точки (когда OffsetCommitMode установлен в ON_CHECKPOINTS). В любом случае, независимо от конкретной технологии, предполагается, что потребители отключают автокоммит по умолчанию, а вручную фиксируют смещение только после успешной обработки сообщения, даже в случае асинхронных операций.

Предлагаемая альтернатива
Другой подход, который был рассмотрен, заключается в том, что службы-B и-C будут опускать ID обработанных сообщений в специальную выходную тему, а служба-A будет соединять выходные темы и исходную тему для получения полной полезной нагрузки. Только после того, как идентификатор сообщения будет получен из всех трех тем, Service-A обработает сообщение.

Этот подход был отвергнут из-за следующих недостатков:

  • Сложность со стороны производителя: тот факт, что служба-B (и служба-C) должна писать в свою собственную внутреннюю БД, а также в выходную тему (и фиксировать смещение Kafka, конечно), увеличивает вероятность того, что в некоторых случаях эти два источника, БД и выходная тема, будут рассинхронизированы.

  • Сложность со стороны потребителя: Service-A должен соединить исходную тему T1 и две выходные темы и тщательно обработать ситуацию, когда одна (или несколько) из них сильно отстает. Он может НЕ упасть (из-за OOM), а просто прекратить потребление и подождать. Правда, механизмы обработки потоков, такие как Flink, могут справиться с обратным давлением, управляя блокирующей очередью для каждого источника, но это значительно увеличивает сложность.

  • Массовый объем сообщений: этот подход просто значительно увеличивает количество сообщений, поскольку каждое сообщение будет практически дублироваться (или утраиваться), что может повлиять на производительность и стоимость шины событий.

Подведем итоги
Если вы являетесь экспертом по Kafka, то, возможно, для вас это очевидно, но, как ни странно, убедить людей избавиться от дополнительных тем вывода оказалось не так уж тривиально, поскольку они в основном уже захвачены самим смещением Kafka.

Эта схема отлично работает для нескольких потребителей одной и той же темы. На самом деле, его можно расширить даже для случаев, когда они потребляют разные темы, при условии, что все они инициированы из одной общей темы. Но этот паттерн гораздо сложнее и требует отдельной статьи…

Оцените статью
devanswers.ru
Добавить комментарий