Конвейер (Pipeline) — II

Конвейер для небольших массивов

Предоставленные реализации пока что не претендуют на звание реального конвейера, и представлены чтобы показать от чего мы будем отталкиваться, и чтобы проследить путь эволюции. Дальше уже будет интереснее, будет видна реальная разница в применении тех или иных технологий к решению задачи. Будут выявлены плюсы и минусы подходов.

Обработка одного значения совсем не интересно, гораздо интереснее посмотреть временные затраты при небольших массивах. Для начала пусть будет 4 значения в массиве.

Чтобы увидеть, что методы конвейера выполнились в верном порядке, будем использовать модифицированный сервис, который на каждой фазе к значению будет добавлять идентификатор фазы.

Ничего сложного, но зато можно отследить порядок выполнения фаз. При прохождении через сервис, мы должны получить значение с постфиксом «smf».

Один поток

Как вы можете догадаться, для одного массива распределение по потокам и времени будет выглядеть следующим образом:

Тут не надо быть семи пядей во лбу, чтобы посчитать затрачиваемое время и увидеть, как это будет отображаться на модель использования потоков – все делается в основном потоке приложения.

Я несколько сократил запись, вышло компактно, но наверно не очень понятно с первого взгляда. Ну да на этом все возможности такого построения и исчерпываются, если говорить начистоту. Усложнять методы сервиса и вносить туда инфраструктурную логику для обслуживания конвейера или асинхронных прелестей как-то не хочется совершенно, так как это усложнит логику метода, что совсем не нужно в данном случае. К тому же методы не должны затачиваться под использование на конвейере, это конвейер должен быть максимально приспособлен к работе с любыми методами.

Я думаю, понятно, что потенциала у такой реализации нет и далее рассматривать мы его не будем. Так как время потраченное на расчеты растет линейно и не решение не использует должным образом все мощности вычислительной техники.

Проверку полученных значений можно осуществить такими методами:

Тест на корректность значений будет одинаков для всех методов из этого раздела. Более того, его можно дополнительно усилить проверкой на порядок значений.

Использование Task

Использование класса Task выглядит гораздо более перспективным и умозрительно поток выполнения поставленной задачи выглядит как на схеме ниже:

Все значения начинают просчитываться одновременно и, учитывая кол-во ядер, завершаются одновременно в идеале. Расчётное время 4 секунды, посмотрим как оно в реальности будет работать.

По сути ничего не изменилось по сравнению с задачей с одним входным значением. Добавился только цикл по значениям. Так как значений мало, то это приемлемый вариант, и получаем быструю обработку данных. Хотя, кончено, нет никакой гарантии насчет того, в каком порядке мы получим результирующие данные. Если это не критично, то результат вполне устраивает.

Тесты на корректность и время не привожу, так как они просты и пока что интереса не представляют.

Мне кажется, что уже на данном этапе становится понятно, что при таком подходе и достаточно больших объемах данных по количеству, число потоков будет сильно разрастаться и можем получить неприятную ситуацию, хотя менеджер тасков достаточно умный, чтобы не загадить всю систему разом.

Можно посмотреть в Concurrency Analyzer как выглядит реальное положение дел. Закладка «Threads» показывает следующее:

Главный поток «отдыхает» после инициализации всех данных. Наши действия тоже указаны красным, так как в методах сервиса мы тормозим поток на необходимое количество времени, но при нажатии на сегменты диаграммы показывается связанный код, по которому мы можем узнать работающую секцию кода. В данном случае, на скриншоте видно, что я выделил сегмент отображающий работу метода StartPhase(). Тут несложно догадаться, что цифра означает элемент массива, а буква кодирует фазу: s – Start, m – Middle, f – Final.

Дальнейшая инспекция показала, что для 2го элемента фаза 2 и 3 перешли к выполнению в новом потоке. Это сложно предсказать, но основной паттерн выполнения кода точно отражает, то как мы его представляли теоретически.

 

Использование Rx

Как я уже сказал, Rx работает в однопоточном режиме, если не указано другое явным образом. И если не задумываясь применить реализацию обработки одного элемента в цикле, то и получим то же самое, что и для простой обработки «в лоб».

В самом деле, реализация не претерпевает кардинальных изменений, только цикл добавился.

Т.е. даже если у кого-то питались скрытые надежды на умность Rx в плане многопоточности, то следующие скриншоты из Concurrency Analyzer должны окончательно убедить в том, что все выполняется последовательно.

Видим, что первый элемент отсылает к методу StartPhase().

Далее, метод MiddlePhase().

Завершается методом FinalPhase() и все начинается сначала.

Естественно, что это не то, что мы хотим и чего добиваемся от Rx. Нам нужна параллельная обработка фаз, а не 16 секунд работы.

Использование Rx с оптимизацией Async

Анализируя работу Rx, хочется сделать, так чтобы после первой фазы, вторая шла в отдельном потоке, управление передавалось обратно в цикл и обрабатывался следующий элемент из массива данных. Если писать код как приходит на ум, то можно использовать класс Task и модные фишки async\await. Кстати, помним о том, что методы сервиса менять мы не хотим кардинально менять и усложнять.

При такой реализации у нас скорее всего получится что-то в духе:

Итак, посмотрим, как можно поизвращаться и нафантазировать реализацию с помощью Rx, async и Task.

Что-то как-то совсем не красиво и мало понятно. В методах Subscribe() можно очень легко запутаться, будь логика еще хоть сколько-нибудь сложнее. Использование Await позволяет вернуть поток управления обратно при выполнении асинхронных задач в методе, а не терять времени в ожидании результата.

В каждом методе создается новый поток и, в зависимости от настроек создания, можно получить совершенно разные картины создания и распределения потоков. В нашем случае каждая задача занимает секунду или две, что достаточно много по сравнению со временем создания возможных потоков. В случае большого количества фаз, обрабатываемых элементов, времени обработки на каждой фазе, такой подход может родить очень много проблем.

Конечно, менеджер тасков умный и он не создаст 12 потоков для данного кода, но все же, все же. Посмотрим как работала программа реально.

В основном потоке были обработаны\получены первые значения, которые затем передавались на обработку в другие потоки не тормозя основной. Хотя в реальной жизни вполне может возникнуть ситуация, когда задача в Task будет выполнятся синхронно. Однако распределение некоторых элементов, например f2, трудно предугадать, и мы не можем управлять количеством создаваемых потоков. Это может быть проблемой в реальных приложениях, так что не думаю, что этим способом стоит пользоваться. Это плохой пример, хотя для малого количества элементов работает очень быстро =)

Однако такой паттерн может в конце концов натолкнуть на то, как мы хотим видеть работу конвейера на основе Rx.

 

Использование Rx с оптимизацией

Честно сказать, такое видение работы конвейера, наверно, должно приходить раньше всего, но прийти к нему можно не сразу.

Я хочу, чтобы у меня было фиксированное и контролируемое количество потоков, которые будут выполнять строго свои задачи, без смешивания ответственностей и прочих казусов выполнения кода, т.е. чтобы было как на картинке ниже:

Ну не красота ли?

На самом деле добиться этого весьма просто, сейчас вы увидите насколько красиво все может быть:

Все четко и красиво. Добиться этого помог метод ObserveOn(), который указывает режим в котором следить за результатами производящего потока. В нашем случае мы используем стандартный менеджер NewThreadScheduler.Default, который говорит о том, что метод указанный в Subscribe() будет выполнятся в отдельном постоянном потоке. Теперь, так же у нас не блокируется цикл обработки начальной фазы, и обработка значений идет последовательно. Количество потоков строго ограничено обрабатывающими структурами.

Посмотрим, реально ли так будет:

 

На скриншоте вы можете разглядеть, что в основном потоке есть четыре кубика по 1 секунде, в потоке 13756 уже 4 кубика по 2 секунды и в последнем потоке что-то много кубиков. Сейчас подробнее расскажу обо всем.

Итак, тут вы видите, что выполняется начальная фаза. Я пометил код жирной вертикальной синей линией.

Далее видим, что в другом потоке идет выполнение средней фазы. Причем основной поток нагенерировал данных раньше, но все равно все они в свое время попали в поток обработки средней фазы. Начало средней фазы идет четко с окончанием генерирования первого значения для обработки.

 

В последнем потоке обрабатывается заключительная фаза сервиса. Так же можно видеть, что начало работы потока совпало с передачей первого значения от средней фазы сервиса. До этого поток просто не существовал.

 

После обработки заключительной фазы сервиса, можно увидеть, что поток приостановил свою работу с помощью SemaphoreSlim до поступления новых значений в поток. Т.е. поток не пересоздается, не умирает.

Что ж, значительно лучше! Все именно так как мы и хотели, и все согласуется с теорией конвейеров. Время обработки так же значительно лучше, чем при линейной обработке.

В целом я могу сказать, что мы добились своего результата, но это только начало, так как в текущем виде непонятно как это прикрутить к реальным вещам. К тому же стоит проанализировать работу наших реализаций на чуть больших массивах данных

 

Продолжение следует.

 

 

Оставить комментарий