Конвейер (Pipeline) — III

Конвейер для больших массивов

Редко кода требуется создать конвейер для небольшого количества данных. Даже не так. Конвейер по сути своей предназначен для неопределенного количества данных, когда неизвестно наперед сколько их будет, но что будет много – известно.

До этого я специально взял количество данных в наборе равное четырем. Так как это еще разумное количество потоков для многопроцессорных машин. Для экспериментов приближенным к реальности можно взять количество элементов равным 60.

По сути код останется тем же, только надо увеличить количество элементов.

Линейную реализацию с перебором всех значений не будем рассматривать уже как совсем не интересную.

При работе с Task, замеры времени показывают среднее значение обработки в 51 секунду. Это означает, что все 60 потоков не создаются одновременно, а выстаиваются в какую-то очередь и все это безобразие занимает 51 секунду (разброс от 50 до 53 секунд), если использовать такой код:

Что совсем не оптимально и мы получаем огромное количество потоков. На скриншоте показана только часть потоков для первой фазы, ровно только же идет для второй и третьей. В общем ужас-ужас.

Сократить их количество и время выполнения можно последовав совету по проектированию в начале статьи, а именно дать указание задачам, что это долгоиграющий процесс, т.е. использовать опцию создания TaskCreationOptions.LongRunning. Тогда время выполнения сократиться примерно на 10 секунд для нашего варианта, что будет заметной прибавкой скорости. Вообще по результатам использования бесконтрольного создания тасков, предсказать время окончания работы нельзя.

Используя Rx подход, можно точно предсказать результат. В нашем случае это будет сумма:

  • Время выполнения первого элемента
  • Количество элементов * время самой долгой операции
  • Время выполнения последнего элемента

Сложив все вместе (1 + 60*2 + 1) получаем верную оценку, которая совпадает с практикой. Поток выполнения точно такой же как для 4х элементов.

Время работы конвейера на Rx можно сократить, если сбалансировать работу потоков, чтобы ни один поток не простаивал. Сейчас же у нас последний поток простаивает половину времени.

Конвейер с балансировкой нагрузки

Да, балансировка работы потоков была бы очень полезна, так как вторая фаза занимает в 2 раза больше времени, чем первая или вторая. Было бы хорошо ввести еще один поток для параллельной обработки данных во второй фазе сервиса. Тогда у нас время работы конвейера может радикально сократиться.

В представлении разделения по потокам и времени диаграмма может быть следующей:

 

Пользуясь знанием времени необходимым для работы каждой фазы сервиса, мы можем посчитать, что нам необходимо 2 параллельных потока для выполнения второй фазы, чтобы расчеты велись по шаблону с диаграммы выше. Ок, таким образом нам надо будет создать два экземпляра ReplayObject для обработки центральной фазы.

Создать дополнительный объект легко, надо еще придумать каким образом делать балансировку. Я думаю, что для балансировки обработки данных в общем случае есть много специальных очень умных алгоритмов, но так как я не в теме высоконагруженных систем, то я даже толком погуглить не могу. Предложу самое простое в визуальной реализации решение.

Сначала я подумал о том, чтобы сложить объекты обрабатывающие вторую фазу в список, хранить индекс записи и как-то итеративно по нему бегать. Но это столько геморроя, надо проверять, в конце ли списка мы и обновлять счетчик… как-то грустно мне стало от этого и я решил полежать на диване и подумать, читая всякую фигню в твиттере. Секретная техника сработала и пришло решение с очередью, когда вынимая элемент из очереди мы назначаем ему задание и отправляем в конец очереди. Не надо никаких проверок вообще: вынул-всунул, вынул-всунул обработчик данных.

Из кода видно, что после работы первой фазы, полученные данные мы передаем балансировщику, который уже распределяет их по потокам. В нашем случае балансировщик лишен обратной связи и какого-либо интеллекта, он просто равномерно выдает задачи потокам, что в целом тоже хорошо на начальных стадиях.

Так же я добавил немного кода, чтобы убедиться, что данные распределяются равномерно.

Но сначала посмотрим, что нам скажет Concurrency Analyzer по поводу балансировки.

Сначала идет обработка первого значения из массива данных первой фазой.

Затем подключается средняя фаза

И по окончании обработки второго набора данных (s2) сразу запускается средняя фаза во втором потоке, т.е. балансировщик сработал как и ожидалось. Все как в теории.

Так как мне лень было писать специальный тест, а показать работу хочется, то прикладываю скриншот итоговых данных. Можно заметить, что фазы чередуются. Таким образом мы сильно сокращаем время работы, и оно может быть оценено как сумма:

  • Время выполнения первого элемента
  • (Количество элементов * время самой долгой операции) / кол-во обработчиков
  • Время выполнения последнего элемента

Что дает нам сокращение времени выполнения в два раза, при больших значениях количества данных.

Возвращаясь к коду, можно сказать, что не совсем понятно, как же его применять в реальной жизни. Это я собираюсь рассказать в следующем разделе

 

Продолжение следует.

 

 

Оставить комментарий