Конвейер (Pipeline) — I

Сложность 200-400

Практически всегда темы для статей происходят из жизни, из реальных проектов и проблем, которые надо решить. Иногда делаешь как проще и быстрее, а потом уже потихоньку перерабатываешь как надо. В этот раз, скорее всего, сразу получилось как надо =)

Преамбула

Задача в общем смысле стояла примерно так:

  • С некоторой периодичностью приходят файлы данных в xml;
  • Файлы надо распарсить;
  • Проверить данные, контрольные суммы, верность указания справочников;
  • Отправить пользователю на проверку суммарные данные;
  • Сохранить данные в базу.

Интуитивно понятно, что при обработки файлов применяется строгая последовательность шагов. Для каждого шага используется свой сервис и, для скорости, каждый шаг может выполнятся в отдельном потоке. Так я и сделал.

Описанная ситуация и решение ее неформально описывают конвейер обработки данных. Чуть позже я наткнулся на статью в MSDN о конвейерах, прочитал, проанализировал насколько верно все получилось в моем решение и что было предложено собственно в MSDN. Результат анализа и экспериментов мне показался интересным для того, чтобы рассказать о них вам. Кроме того, сам подход на мой взгляд хорош для решения проблем, а представление в коде имеет определенную красоту и легкую расширяемость. Но обо всем по порядку.

Теория

Конвейер является шаблоном проектирования при котором используется параллельная обработка задач с заданным порядком следования входных данных. Каждая задача реализует свой этап конвейерной обработки. При этом должны существовать некоторые буферы между этапами, которые будут накапливать и отдавать данные для последующей обработки. Буферы обеспечивают параллельную обработку задач, даже если данные приходят последовательно. Естественно, что данный шаблон имеет прямые отсылки к фабричным конвейерам.

Исходя из такого, более формального определения конвейера, видно, что задача поставленная в преамбуле точно соответствует данному шаблону.

Конвейер используется, когда для обработки данных вовлечено много зависимостей и объектов, и действия невозможно произвести в параллельном цикле. Если лень лезть по ссылке и читать, то в вкратце речь идет об этом:

 

Остальные подробности, про плюсы и минусы, лучше почитать в статье, это не тема сегодняшней статьи.

Есть огромное количество случаев применения конвейера, и достаточное количество способов его реализации. Конвейер, как правило, полезен в случаях, когда количество исходных данных не задано заранее, а зависит от каких-либо событий: значения с биржи, события генерируемые пользователем, данные приходящие по сети, перекодировка данных. В задаче представленной в преамбуле файлы приходили пачками, в неизвестный момент времени. Во всех случаях очень важна последовательная обработка кусков данных, и последовательная их выдача.

В самом общем смысле конвейер является разновидностью шаблона производитель\потребитель. Цепочки потребителей и производителей связаны между собой и зависят друг от друга. Так же настойчивой рекомендацией является сохранение общего порядка следования элементов FIFO (первый вошел – первый вышел).

Необходимые замечания о проектировании

Если вы задумали использовать конвейер, вам необходимо четко продумать сколько шагов потребуется для решения вашей проблемы. Количество шагов реализации должно коррелировать с количеством ядер имеющимся на целевой системе.

В канонической реализации конвейер не является автомасштабируемым шаблоном, в плане захвата мощностей процессора. С увеличением количества ядер, конвейер не будет автоматически масштабироваться на новые ядра. Это одно из ограничений, которое конечно можно постараться обойти некоторыми ухищрениями и они будут не из раздела rocket science.

Большое количество шагов в конвейере – нормально, до тех пор, пока затраты на поддерживание буферов не занимают большую часть времени работы. Это обычно бывает при очень маленьких действиях в одном шаге.

Для достижения наиболее эффективной работы конвейера, требуется постараться сделать так, чтобы каждый шаг конвейера занимал примерно одинаковое время. Если это условие не соблюдено, то пропускная способность будет определятся самым медленным шагом, что может привести к блокировке и переполнению буферов. Решением могут стать балансировщики нагрузки, т.е. какие-то части конвейера могут быть дополнительно распараллелены.

Размер буфера, как вы уже наверно поняли, так же является важной частью всей работы. Маленький буфер может тормозить весь процесс, слишком большой буфер с большими данными может забирать неоправданно много памяти. Требуется искать оптимальные значения экспериментально.

В общем случае буфер должен быть таких размеров, чтобы нивелировать время обработки элемента, не больше. Следует хорошенько посидеть с профайлером, чтобы определить реальный диапазон размера данных и времени, который требуется для каждого шага, для всего процесса. Буферы должны минимально тормозить, а лучше вообще не тормозить, этапы конвейера ожиданием данных.

Основа экспериментов

В оригинальной статье в MSDN показывается реализация конвейера на основе потокобезопасных коллекций и классе Task. Я собираюсь показать реализацию на основе Rx Framework, на мой взгляд итоговый код получится более ясным и компактным. Сравнение до определенной степени может быть проведено с многопотоковым связанным выполнением задач с помощью класса Task.

Для построения экспериментов и для простоты кода возьмем пустой сервис, который ничего не делает, кроме как тормозит поток выполнение на заданное количество секунд. Секунды выбраны, чтобы драматичнее были результаты.

Как вы можете видеть из кода сервиса, будем считать, что наш конвейер состоит из 3 фаз, которые должны быть выполнены последовательно. В дальнейшем мы немного усложним сервис, чтобы быть ближе к жизни.

Так же прошу заметить, что фазы не равнозначны по времени и центральная занимает в два раза больше времени, это нам пригодиться в дальнейшем.

Реализация

Перед тем как мы перейдем к вопросам реализации, я хочу рассказать о том, какие пакеты NuGet будут использоваться для решения задач.

Проверку работоспособности кода будем производить с помощью тестов, которые реализованы на движке NUnit. Т.е. первым пакетом идет NUnit. Далее:

  • Rx-Main – собственно для Reactive Extensions
  • NBuilder – для построения списков объектов.
  • FluentAssertions – для написания условий проверки в удобном формате.

Исходный код с примерами можно получить из SVN репозитория. Решение разделено на несколько проектов, по именам вы сможете догадаться что для чего: линейный массив, Tasks и Rx.

Простейший конвейер

Для простейшей реализации и сравнения результатов возьмем 3 способа написания обработки:

  • «В лоб»
  • С помощью класса Task
  • С помощью Rx

Простейший вариант будет просто демонстрацией возможностей подходов, сколько они занимают кода, и просто как плацдарм для дальнейшего развития.

Реализация «в лоб»

При такой реализации фазы и время будут представлены в таком виде:

Т.е. у нас только один объект и его надо обработать. Понятное дело, что это все будет произведено синхронно и для каждого случая займет 4 секунды.

Все как и ожидалось, не стоит задерживаться подробно на примере. Все идет синхронно с блокировкой всей программы.

Реализация с помощью класса Task

Тут уже чуток интереснее, хотя все тоже в рамках обычных методов. Только разве блокировки UI не произойдет.

Здесь, для реализации конвейера используем методы продолжения вычислений. Для простоты не использую никакие дополнительные опции по созданию потоков и управлению продолжений методов, т.к. в данном рассмотрении это не существенно.

Как и для простейшей реализации, время для прохождения теста составило 4 секунды. Что вполне ожидаемо.

Реализация с помощью Rx

Так как данный пост о конвейере, то я не буду вдаваться в подробности работы Rx более, чем того требуется для общего повествования.

Основное заблуждение по поводу Reactive Extensions в том, что многие люди думают, что фреймворк по умолчанию многопоточный. Это не так. Он безопасен для работы в многопоточном режиме, но все операции и действия выполняются по умолчанию в одном потоке. Все операции созданы для удобной обработки потока информации. Т.е. работая с Rx следует воспринимать все действия как поток данных. В общем случае можно думать что им нет начала и конца, для лучшего представления в голове.

Для легкости и для большей отказоустойчивости я решил использовать ReplaySubject, так как этот объект «помнит» все объекты опубликованные для него и выдает их при отложенной подписке. Т.е. если подписаться на такой объект после публикации энного количества значений, то все предыдущие публикации попадут в подписавшийся метод.

Основные методами для реализации будут OnNext() для публикации значения, и метод Subscribe() для подписки на результат.

В некотором роде, код будет похож на то, как сделана реализация используя классы Task.

Сначала объявляются все объекты для подписки, выстраивается очередь обработки сообщений, и идет первый запуск сообщения с помощью метода start.OnNext().

Так же как и для всех других подходов, этот занял 4 секунды.

Итого, мы имеем 3 варианта, но ни один из них еще даже и близко не является настоящим конвейером во всех смыслах.

 

Продолжение следует.

 

 

 

Один комментарий на “Конвейер (Pipeline) — I

  1. Андрюха, ну как так?
    это нам (что слелает?) пригодится в дальнейшем. Пригодится. Без мягкого знака.
    А так норм :)

Оставить комментарий