Как реализовать отложенные сообщения в RabbitMQ? Примеры кода

Иногда вам нужно реализовать запланированные или повторяющиеся действия в вашем приложении. Например, отправка push-уведомления через 10 минут или очистка временной папки каждый день.
Для этого можно использовать cron-задачи, которые автоматически запускают скрипты на вашем сервере, или пакет node-schedule (библиотека планирования задач для Node.js).
Но при использовании обоих этих решений возникает проблема масштабирования:
Существует несколько серверов, поэтому может быть неясно, на каком из них запускать задачу.
Выбранный сервер может упасть
Узел может быть удален из-за освободившихся ресурсов.

Одним из возможных решений здесь является RabbitMQ, брокер сообщений. Посмотрите общую схему реализации отложенных сообщений в этом примере на GitHub. А вот как это выглядит в деталях, шаг за шагом:

1. Создайте 2 обменника: обычный и отложенный.

export const HELLO_EXCHANGE = Object.freeze({
    name: 'hello',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
    name: 'helloDelayed',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {},
});
Войдите в полноэкранный режим Выйти из полноэкранного режима

2. В каждом из обменников создайте очереди с одинаковым типом привязки, но разными именами.
Для HELLO_EXCHANGE:

queues: {
        WORLD: {
            name: 'hello.world', // subscribe to this queue
            binding: 'hello.world',
            options: {
                durable: true,
        },
    },
},
Войти в полноэкранный режим Выйти из полноэкранного режима

Для HELLO_DELAYED_EXCHANGE:

queues: {
      WORLD: {
            name: 'helloDelayed.world',
            binding: 'hello.world',
            options: {
                durable: true,
                queueMode: 'lazy', // set the message to remain in the hard memory
        },
    }
Войдите в полноэкранный режим Выйти из полноэкранного режима

Для очереди отложенного обмена установите аргумент x-dead-letter-exchange с именем обычной очереди. Этот аргумент указывает брокеру RabbitMQ передать сообщение на этот обменник, если оно не будет обработано.

arguments: {
                'x-dead-letter-exchange': HELLO_EXCHANGE.name, // set the queue to transfer the message to once it's dead
}
Вход в полноэкранный режим Выход из полноэкранного режима

3. Опубликовать сообщение в очередь отложенного обменника с указанием срока действия.

// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
    exchangeName: exchangeNameDelayed,
    queue: WORLD_DELAYED,
    expirationInMs: 30000, //set when the message dies (in 30s)
});
Войти в полноэкранный режим Выйти из полноэкранного режима

Как только срок действия отложенного сообщения истечет, оно перейдет в очередь обычного обменника.
Теперь вам нужно только установить потребителя для очереди обычного обменника:

// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
    createConsumer(
        {
            queueName: HELLO_EXCHANGE.queues.WORLD.name,
            prefetch: 50,
            log: true,
        },
        controller.consumeHelloWorld,
    ),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
    const result = await world({ name: payload.name });
    logger.info(result.message);
    // await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
Войти в полноэкранный режим Выход из полноэкранного режима

Прибыль!
Если вам нужно запускать действие периодически, опубликуйте сообщение в отложенном обменнике снова в конце секции потребителя.

// await publishHelloDelayedWorld({ name: payload.name });
Вход в полноэкранный режим Выход из полноэкранного режима

ПРИМЕЧАНИЕ: RabbitMQ работает по принципу FIFO (первым пришел, первым ушел) — он обрабатывает команды в том же порядке, в котором они были заданы. Поэтому если вы опубликуете в одной очереди отложенное сообщение с истечением 1 дня и сообщение с истечением 1 минуты, она обработает второе сообщение после первого, а целевое действие для второго сообщения произойдет через минуту после первого.
В итоге вы получите вот что:
1. Создайте обменники и очереди

// services/base-service/src/broker/const/exchanges.ts
export const HELLO_EXCHANGE = Object.freeze({
    name: 'hello',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {
        WORLD: {
            name: 'hello.world', // subscribe to this queue
            binding: 'hello.world',
            options: {
            durable: true,
            },
        },
    },
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
    name: 'helloDelayed',
    type: 'direct',
    options: {
        durable: true,
        queueMode: 'lazy', // specify that the hard memory must store this message
},
queues: {
    WORLD: {
        name: 'helloDelayed.world',
        binding: 'hello.world',
        options: {
            durable: true,
            queueMode: 'lazy', // specify that the hard memory must store this message                arguments: {
                'x-dead-letter-exchange': HELLO_EXCHANGE.name, // specify the queue to which the message must relocate after its death
                },
            },
        },
    },
});
Войдите в полноэкранный режим Выйдите из полноэкранного режима

2. Добавьте издателя, который будет отправлять сообщение в отложенную очередь

// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
    exchangeName: exchangeNameDelayed,
    queue: WORLD_DELAYED, 
    expirationInMs: 30000, // set when the message dies (in 30s)
});
Войти в полноэкранный режим Выйти из полноэкранного режима

3. Добавьте потребителя для очереди обычного обменника

// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
    createConsumer(
        {
            queueName: HELLO_EXCHANGE.queues.WORLD.name,
            prefetch: 50,
            log: true,
        },
        controller.consumeHelloWorld,
    ),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
    const result = await world({ name: payload.name });
    logger.info(result.message);
    // await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
Войти в полноэкранный режим Выйти из полноэкранного режима

4. Прибыль!
Существует также плагин, который делает эту работу за вас и упрощает реализацию. Вы создаете только один обменник, одну очередь, одного издателя и одного потребителя.
При публикации плагин будет обрабатывать отложенное сообщение и, по истечении срока, передавать его в нужную очередь. И все это самостоятельно.
С этим плагином запланированные сообщения обрабатываются в порядке истечения срока действия. То есть, если вы опубликуете сообщение с задержкой в 1 день, а затем сообщение с задержкой в 1 минуту, второе будет обработано раньше первого.

// services/base-service/src/broker/const/exchanges.ts
export const HELLO_PLUGIN_DELAYED_EXCHANGE = Object.freeze({
   name: 'helloPluginDelayed',
   type: 'x-delayed-message', // specify the delayed queue
   options: {
       durable: true,
        arguments: {
          'x-delayed-type': 'direct', // set the recipient      
        },
    },
    queues: {
        WORLD_PLUGIN_DELAYED: {
            name: 'helloPluginDelayed.world', // subscribe to the queue
            binding: 'helloPluginDelayed.world',
            options: {
                durable: true,
            },
        },
    },
});
Войти в полноэкранный режим Выйти из полноэкранного режима

Добавьте издателя, который отправляет сообщения в очередь с задержкой:

export const publishHelloPluginDelayedWorld = createPublisher({
    exchangeName: exchangeNamePluginDelayed,
    queue: WORLD_PLUGIN_DELAYED,
    delayInMs: 60000,  // specify when the message should die (60s)
});
Войти в полноэкранный режим Выйти из полноэкранного режима

Добавить потребителя в очередь:

// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
    createConsumer(
        {
            queueName: HELLO_PLUGIN_DELAYED_EXCHANGE.queues.WORLD_PLUGIN_DELAYED.name,
            prefetch: 50,
            log: true,
        },
        controller.consumeHelloWorld,
    ),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
    const result = await world({ name: payload.name });
    logger.info(result.message);
};
Войти в полноэкранный режим Выйти из полноэкранного режима

И — готово!

Мы регулярно используем RabbitMQ в наших проектах. Например, посмотрите на его использование в портфолио интернет-телевидения Janson Media. Это сервис проката фильмов, но в цифровом формате.

Здесь мы использовали отложенные сообщения RabbitMQ для трех основных функций приложения: отправка электронных писем и SMS-сообщений для уведомления пользователей о том, что, например, срок аренды почти закончился; отправка сообщений о завершенных платежах на сокет и отправка уведомления пользователю; отправка загруженных видео на дальнейшую обработку.

Надеюсь, внедрение отложенных сообщений больше не будет для вас подобно падению в кроличью нору (если это вообще когда-либо было так) 🙂 .

Оцените статью
devanswers.ru
Добавить комментарий