Конвейер (Pipeline) — IV

Ближе к жизни

Предыдущий пример стоит достаточно далеко от жизни, так как часто не требуется выполнять последовательно задачи из одного сервиса. Чаще всего бывает несколько сервисов, которые работают над своими задачами. Требуется их как-то красиво связать в одном месте, чтобы это было понятно и прозрачно. Собственно такой пример я и собираюсь показать. Запомните сколько строчек кода понадобилось для описания предыдущего примера и сколько это займет в конце.

Итак, обычно на каждую фазу бывает по сервису. В таком случае, все же будет разумно, чтобы сервис имел поток публикуемых результатов.

Вообще ничего сложного. Каждый класс делает свою работу заданное количество времени и по истечению времени публикует значение дальше.

При таком минимальном вмешательства в реализацию класса, код конвейера сокращается очень сильно, и становится гораздо более читабельным и ясным. Более четко видна связь и идея.

Сначала идет инициализация всех необходимых сервисов, затем идет настройка балансировщика. В данном случае я сделал специальный класс, который принимает делегат обработки. Затем настраивается сам конвейер. Согласитесь, что настройка конвейера в три оператора очень хорошо выглядит. Остальное должно быть знакомо уже и не вызывать вопросов.

Надо только еще показать, как реализован балансировщик:

Я думаю, что все останавливаться на нем не стоит подробно. Общая идея уже была высказана, просто реализация выделена в отдельный класс, для более удобного использования.

Надеюсь, что с таким примером, будет уже точно понятно как это все применить в жизни и не будет восклицаний в духе: «Ага, в статье показываем как сложить 2+2, а на дом задаем расчет двойного определенного интерала! Что за [censored] люди!? Вы [censored] там [censored] [censored] [censored] [censored] совсем [censored] [censored] страх потеряли и не [censored] программировали [censored] давно и не учились видимо ничего по [censored] таким [censored] статьям!!!!!11111»

Конечно, разные фишки Rx остались за бортом, так как статья не о возможностях Rх, хотя их очень много и можно сильно облегчить жизнь себе.

Так же за бортом пока что осталась обработка исключительных ситуаций.

Еще хочу сказать, что после легкости использования Rx, совершенно не хочется показывать и разрабатывать версию конвейера с балансировкой на Task. Совершенно не хочется возиться с потокобезопасными коллекциями, это должна быть большая инфраструктурная обвязка.

Все же приведу пример из оригинальной статьи про конвейеры:

 

Выводы

Собственно какие можно сделать выводы из всего написанного выше:

  • Конвейер можно реализовать самыми разными способами, какие-то понятнее, какие-то нет, но основными конкурентами могут все равно считаться реализация на основе Task и на Rx. По крайней мере для меня.
  • Конвейер используется, когда надо провести последовательную обработку данных, но количество и время поступления данных заранее не известны.

Если сравнивать между собой подходы с использованием класса Task и Rx, то можно составить следующую таблицу Pros & Cons за каждый подход:

Реализация с помощью Task Реализация с помощью RX
+ Многопоточность по умолчанию — Выполнение в один поток по умолчанию. Возможно настроить.
+ Легкий и четкий контроль количества потоков
+ Конструирование в run-time по условиям. Можно использовать конструкцию ContinueWith(). +  Конструирование в run-time по условиям. Можно использовать конструкцию Subscibe
+ Возможность восстановления с любой фазы конвейера.
+ Возможность указывать пути работы при ошибке. Указывается при создании ContinueWith. + Возможность указывать пути работы при ошибке. Используется конструкция OnError().
+ Возможность остановки конвейера +
— Сложность распараллеливания отдельной фазы конвейера. + Легко распараллелить фазу.
— Ограниченность синхронизационных действий. Wait(), WaitAll(), WaitAny(). + Легкость синхронизации и фигурного объединения потоков. Rx создан для этого.

 

В рабочем проекте я использовал Rx и ничуть не пожалел, так как он очень компактен и позволяет очень ярко и точно выразить свои желания относительно того, как данные должны быть скомпонованы и обработаны.

Ошибки проектирования

Есть несколько моментов, о которых стоит помнить при реализации конвейера удобным вам способом.

Простаивающие потоки

Конвейер в целом требует, чтобы все его задачи выполнялись параллельно. Если потоков для выполнения задач недостаточно, то буферы могут заполнятся и блокироваться бесконечно. Если вы используете класс Task для задач конвейера, то не забудьте указать ключ создания LongRunning, как я показывал в примерах выше. Так вы гарантируете каждой задаче по потоку. Если вы его забудете, то в целом тоже ок, но скорость упадет. Т.е. менеджер потоков увидит, что где-то в начале задачи иссякли, появились ресурсы и для задач без потоков выделит необходимые ресурсы. Но это время, что я так же показал в тестах и вы можете сами проверить данные.

Исключительные ситуации

В зависимости от реализации, необходимо помнить о том, как исключительные ситуации повлияют на конвейер. В большинстве случаев такая оказия остановит конвейер. Хотя конвейер должен продолжать работу, а сбойный элемент можно исключить из дальнейшей обработки. Либо необходимо корректно останавливать конвейер для решения проблем.

 

 

Итого

Надеюсь вы осилили данный опус и вынесли для себя много полезной информации, которую можно применить в реальной жизни.

В дальнейшем может быть я еще рассмотрю особенности обработки исключений и отмены работы конвейера для реализации с помощью Rx, но пока что это должно быть реализовано в общем случае в сервисах.

Исходный код SVN

 

 

 

Hard’n’Heavy!

 

 

 

Оставить комментарий