Этот сайт использует файлы cookies. Оставаясь на сайте, Вы соглашаетесь с использованием файлов cookies и принимаете Соглашение об использовании сайта.

очередь сообщений

Как мы «честную» очередь организовывали

431
431

Использование очередей сообщений доказало свою эффективность в решении вопросов надежности и высокой производительности коммуникаций между компонентами системы. Но часто специфика бизнеса заставляет пересматривать готовые подходы, модифицировать их или вовсе искать новые решения. Рассмотрим случай, с которым столкнулась наша команда, и решение, к которому мы пришли.


Проблематика

Итак, дано:

  1. Клиент через Web API оставляет запрос на отправку сообщения адресату или множеству адресатов. Запрос состоит из темы сообщения, тела сообщения и списка адресатов.
  2. Запрос попадает в очередь Message Queue.
  3. Message Sender сервис вытаскивает из очереди запрос, выполняет логику по разделению исходного запроса на количество сообщений, равному количеству адресатов, сохраняет эти изменения в базе данных, и уже каждое отдельное сообщение для каждого адресата отправляется на SMTP сервер для непосредственной доставки. Такое разделение является бизнес-требованием. Тема и тело сообщения может быть шаблоном, в который будут подставлены персональные данные каждого адресата.
запрос на отправку сообщения
Описание задачи.

Стоит заметить, сервис Message Sender состоит из нескольких потребителей или обработчиков, которые могут параллельно обрабатывать запросы из очереди. Количество таких потребителей обычно задается равным количеству ядер процессора. Масштабирование может быть как вертикальным — увеличение производительных мощностей приведет к увеличению количества обработчиков в рамках одного экземпляра сервиса, так и горизонтальным — за счет увеличения количества экземпляров сервиса на разных машинах.

Схема довольно простая, но здесь есть следующие проблемы, с которыми мы столкнулись:

  1. SMTP сервер может быть недоступен по некоторым причинам (проблемы сети, внутренняя ошибка и т. д.), но мы не можем позволить себе потерять сообщение, и должны доставить его, как только сервер будет доступен.
  2. Слишком большое количество адресатов (рекламные кампании, массовые рассылки для десяток тысяч пользователей). Так как исходный запрос будет разделен по количеству адресатов, то и отправлены эти сообщения будут последовательно. Такая ситуация заставит других клиентов ожидать своей очереди, что будет несправедливо.

Решение

Если для решения первой проблемы мы можем просто не подтверждать обработку сообщения и выполнить попытку его доставки снова (на момент написания статьи я нашел отличную библиотеку Polly для обработки ситуаций, когда стоит немного подождать, прежде чем повторять действие снова), то для решения второй проблемы логично было бы предположить, что обработка должна вестись по одному сообщению с каждого запроса клиента в очереди. Визуально это будет выглядеть следующим образом:

механизм обработки сообщений
Варианты решения задачи.

Основная идея в том, что у нас есть очередь запросов клиентов, каждый из которых имеет стэк сообщений. Поочередно извлекаем запрос, вытаскиваем из стека сообщение, обрабатываем его и, если в стеке еще остались сообщения, то добавляем текущий запрос в конец очереди, иначе просто переходим к обработке следующего запроса.

Введем для этого дополнительное звено, еще один сервис, назовем его Message Scheduler и избавим Message Sender от обязанностей разделения адресатов и сохранения изменений в базу данных, оставив только то, чем он действительно должен заниматься, а именно, — доставкой сообщений до SMTP сервера.

Теперь нам надо организовать общение между вновь добавленным сервисом и текущими звеньями имеющейся архитектуры. Добавим дополнительную очередь, объем и порядок сообщений в которой мы будем контролировать, что делает механизм обработки «честным» по отношению к клиентам:

обработка сообщения
Добавление дополнительной очереди.

Прежде чем мы перейдем к описанию нового сервиса, стоит сказать несколько слов об акторной модели, которая была выбрана для реализации Message Scheduler.

Модель акторов

Модель акторов не является новым веянием моды. Карл Хьюитт описал ее еще в начале 70-х. Модель акторов — это подход к распараллеливанию программ, главным достоинством которого является отсутствие проблемы общего разделяемого состояния. Больше никаких lock’ов.

Основная идея в том, что приложение построено из многих легковесных процессов, называемых акторами. Каждый актор отвечает за одну очень маленькую задачу, поэтому легко понять, что он делает.

В основе всего лежит система акторов, которая содержит несколько системных акторов и типы акторов, определенные пользователем. Для большинства языков программирования уже есть реализация данного подхода. Например, для Java существует фреймворк Akka, для .NET — Akka.NET.

Система акторов представляет собой иерархию. У каждого актора есть актор-супервайзер, и каждый актор может быть актором-супервайзером.

модель акторов
Модель акторов.

В иерархической структуре акторов есть несколько значительных преимуществ:

  1. Актор-супервайзер порождает потомков через контекст и следит за их состоянием. Это дает контроль над ошибками и предоставляет возможность решить, что с этими ошибками делать (перезапустить, уничтожить потомка и т. д.).
  2. Актор-супервайзер инкапсулирует логику работы с потомками и предоставляет абстракцию более высокого уровня для других акторов, с которыми будет происходить коммуникация.

Взаимодействие между акторами происходит через механизм обмена сообщениями. Но как система узнает от кого и кому необходимо доставить сообщение? Актор имеет уникальный адрес с учетом местоположения в иерархии.

Когда актор получает сообщение, это сообщение попадает в его внутренний Mailbox. Если актор тратит много времени на каждое входящее сообщение, то новые сообщения начинают скапливаться в Mailbox’е. Новое сообщение может быть извлечено только после полной обработки предыдущего сообщения. Если обработка сообщения — это метод класса, то критерием окончания обработки будет выход из этого метода. Поэтому имеет смысл долгоиграющие операции, такие как работа с БД, делать асинхронными.

Те, кто хочет познакомиться с моделью акторов поближе, рекомендую посмотреть два доклада (первый, второй) и пройти Akka.net bootcamp.

Реализация Message Scheduler сервиса

Теперь вернемся непосредственно к реализации самого сервиса. Для решения поставленной задачи было создано три актора: Message Scheduler, Message Splitter, Dispatcher.

Так как мы собираемся обрабатывать по одному сообщению с каждого запроса за раз, нам нужно знать какие сообщения от клиента уже были доставлены и какие еще предстоит отправить. Это приводит к тому, что актор Message Scheduler будет иметь внутреннее состояние, чтобы следить за этим (как работать с состоянием актора мы поговорим чуть позже, сейчас главное знать, что за нас уже все придумали и, например, фреймворк Akka содержит модуль Persistence для таких случаев).

реализация Message Scheduler сервиса
Реализация Message Scheduler сервиса.

Итак, путь запроса от общей очереди Scheduler Queue до нашей «справедливой» очереди Sender Queue:

  1. Message Scheduler получает запрос клиента из очереди сообщений Scheduler Queue, добавляет его во внутреннюю коллекцию запросов и просит актора Message Splitter разбить запрос на сообщения.
  2. Message Splitter разделяет исходный запрос согласно количеству адресатов, сохраняет изменения в БД и возвращает коллекцию сообщений.
  3. Message Scheduler добавляет к исходному запросу вновь созданные сообщения в виде стека и помещает запрос во внутреннюю очередь обработки.
  4. Отправляем по одному сообщению из каждого запроса к актору Dispatcher, который просто кладет сообщение в очередь Sender Queue и отвечает флагом Ok, что все прошло успешно. Если в стеке сообщений текущего запроса остались ещё сообщения, то добавляем этот запрос в конец внутренний очереди актора Message Scheduler.

Но как мы контролируем количество сообщений в очереди Sender Queue и не нагружаем Message Sender Service больше необходимого? Дело в том, что коммуникации между акторами могут быть не только в пределах одной системы (сервис/проекта).

Еще один большой плюс акторной системы в том, что внешне процесс взаимодействия с локальным или удаленным актором никак не отличается. Помните мы говорили про уникальный адрес актора? Так вот он включает в себя также название системы акторов и может иметь в названии протокол, по которому будет происходить взаимодействие.

уникальный адрес актора
Уникальный адрес актора.

Благодаря этому мы с легкостью реализовали актор MessageSchedulerClient на стороне Message Sender Service, который запрашивает такое количество сообщений, которое ему комфортно обработать.

Что еще полезно знать об акторах?

Чтобы уберечься от потери сообщений при взаимодействии акторов, во фреймворке Akka есть модуль Akka.Persistence. Этот модуль позволяет сохраняет сообщения актора, восстанавливать их при перезагрузке актора и снимать snapshot’ы для оптимизации (чтобы не загружать каждое сообщение по отдельности, а загрузить их все за одну операцию чтения). Существуют реализации данного модуля для работы с файлами, с различными база данными и т. д.

Akka позволяет не только создавать акторы, которые обмениваются между собой сообщениями, но и строить кластеры, состоящие из нескольких физических машин. При этом акторы, запущенные на разных машинах, все еще могут взаимодействовать друг с другом.

Вывод

Вполне возможно данное решение является лишь одним из множества решений подобных задач и вы уже сталкивались с подобным. Поэтому если у вас есть вопросы или предложения, добро пожаловать в комментарии. Спасибо:)