Russian Qt Forum
Апрель 26, 2024, 20:04 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1] 2 3 ... 13   Вниз
  Печать  
Автор Тема: К вопросу об организации взаимодействия пула производителей и одного потребителя  (Прочитано 60852 раз)
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2094



Просмотр профиля
« : Сентябрь 07, 2019, 00:32 »

Приветствую, камрады)
Хотел проконсультироваться у экспертов по теме многопоточности.
Обрисую проблему:
Нужно распараллелить один алгоритм (псевдокод)
Код
C++ (Qt)
do {
    // эта функция может быть распараллелена
    doHardComputing();
 
   // А это легковесная функция. Она использует результаты, полученные doHardComputing
   doSomething();
 
} while (condition); // Всё это крутится в цикле пока не сработает условие выхода.
 


Наивный вариант распараллеливания видется таким (псевдокод):
Код
C++ (Qt)
do {
   std::vector<std::thread> threads;
   for (size_t i = 0; i < NThreads; ++i)
       threads.emplace_back(worker);
 
   for (auto & th : threads)
       th.join();
 
   doSomething();
 
} while(condition);
 

Однако такой подход не очень: не хорошо так в цикле создавать и убивать потоки..( (Хотя возможно, я ошибаюсь)

По хорошему нужен пул workerов (производителей) и один потребитель, который реализует логику doSomething().
Т.е. логика такая:
1. Запускаются workerы, каждый worker должен выполнить рассчёты, оповестить потребителя и заснуть.
2. Потребитель просыпается только тогда, когда все производители закончили вычисления.
3. Производитель проснулся, выполнил свои вычисления, разбудил всех производителей и заснул.
И так до тех пор, пока не будет выполнено условие condition.

Накидал на коленке тестовый вариант. Прошу покритиковать или советы как и что улучшить и сделать правильно)

Код
C++ (Qt)
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <algorithm>
 
std::mutex mutex;
 
std::condition_variable cv_worker;
std::condition_variable cv_loop;
 
std::atomic_bool is_done(false);
 
static constexpr int NThreads = 8;
 
void worker(std::vector<bool> & flags, unsigned int id)
{
   while (!is_done)
   {
       // Имитация расчётов..
       std::this_thread::sleep_for(std::chrono::seconds(5));
 
       {
           // Каждый worker имеет свой флажёк, который говорит о том, что он выполнил расчёты и готов спать)
           std::unique_lock<std::mutex> lk(mutex);
           flags[id] = false;
       }
 
       cv_loop.notify_one(); // Пытаемся разбудить потребителя (main_loop)
       // И засыпаем
       std::unique_lock<std::mutex> lk(mutex);
       cv_worker.wait(lk, [&]()
       {
           return flags[id] || is_done;
       });
   }
}
 
void main_loop(std::vector<bool> & flags)
{
   int i = 0;
   auto cond = [](int i) { return i > 10; }; // условие выхода, для наглядности
 
   do
   {
       {   // Спим до тех пор, пока все workerы не выполнят вычисления (все флажки = false)
           std::unique_lock<std::mutex> lk(mutex);
           cv_loop.wait(lk, [&]()
           {
               return !std::any_of(flags.begin(), flags.end(), [](bool x){ return x; });
           });
       }
 
       // Имитация вычислений..
       std::this_thread::sleep_for(std::chrono::seconds(1));
 
       {   // После вычислений заряжаем флажки
           std::unique_lock<std::mutex> lk(mutex);
           std::fill(flags.begin(), flags.end(), true);
           if (cond(i++))
               is_done = true;
       }
       // Будим всех воркеров и засыпаем
       cv_worker.notify_all();
 
   } while (!is_done);
}
 
int main()
{
   std::vector<bool> flags(NThreads, false); // флажки
 
   std::thread mainthread(main_loop, std::ref(flags)); // Это наш потребитель
 
   std::vector<std::thread> threads; // Это наши производители workerы
 
   for (unsigned int i = 0; i < NThreads; ++i)
       threads.emplace_back(worker, std::ref(flags), i);
 
   mainthread.join();
 
   for (auto & th : threads)
       th.join();
 
   return 0;
}
 
 
Коректна ли такая реализация? Или можно улучшить, или я вообще велосипед изобретаю?)
Спасибо Улыбающийся
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Old
Джедай : наставник для всех
*******
Online Online

Сообщений: 4349



Просмотр профиля
« Ответ #1 : Сентябрь 07, 2019, 08:00 »

Так может просто OpenMP? Улыбающийся
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #2 : Сентябрь 07, 2019, 08:20 »

Ну, блин, наворотили Улыбающийся Это хрестоматийный случай, выдумывать ничего не надо, посмотреть пример (напр std::consition_variable) и подогнать под свои нужды, напр
Код
C++ (Qt)
std::thread consumer([&]() {
       while (!abortFlag) {
          std::unique_lock<std::mutex> lock(m);
 
/* каждый из worker'ов, закончив работу, уменьшает атомарный счетчик
workerInProcess и пытается нас (производителя) будить -
но мы игнорируем побудку если хотя бы один worker активен.
Собсно идея в этом */

           while (workerInProcess) {
               if (abortFlag) return;
               cond_var.wait(lock);
            }
 
/*  В этот момент все worker'ы отстрелялись и висят на (захваченном нами) мутексе, перезаряжаемся */
           if (CheckDone()) break;
           SetupWorkersData();   // заряжаем новые данные worker'ов
           workerInProcess = NUM_WORKERS;
           cond_var.notify_all();   // стартуем worker'ов, сами засыпаем
       }
 
/* даем worker'ам выйти */
       abortFlag = true;
       cond_var.notify_all();
 
   });
Теперь worker

Код:
    std::thread worker([&]() {
        while (true) {
 
// Пресекаем "ложные побудки"
          {
            std::unique_lock<std::mutex> lock(m);
            while (!HasData()) {
               if (abortFlag) return;
               cond_var.wait(lock);    // освобождаем мутекс и спим
             }
          }

// считаем
           DoWorkerJob();
        
// уменьшаем счетчик и будим всех
           --workerInProcess;
           cond_var.notify_all();
    });
Обычная ошибка - считать/полагать что wake_one/all "кого-то будит". В действительности это эффект лишь для ожидающих. И также побудка "строго одного" не гарантируется (на уровне ОС). Поэтому каждая нитка должна проверяться на ложную побудку.
« Последнее редактирование: Сентябрь 07, 2019, 09:34 от Igors » Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2094



Просмотр профиля
« Ответ #3 : Сентябрь 07, 2019, 11:07 »

Так может просто OpenMP? Улыбающийся
Возможно.. Нужно глянуть на него по внимательней Улыбающийся
А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?

Цитировать
Это хрестоматийный случай, выдумывать ничего не надо, посмотреть пример (напр std::consition_variable) и подогнать под свои нужды, напр
Этот хрестоматийный случай будет работать не корректно (Поправте, если я ошибаюсь):
Просыпаются воркеры: пусть первый из них оказался самым шустрым, выполнил DoWorkerJob(), декрементировал  счётчик --workerInProcess и ушёл на второй круг (HasData() = false)
Затем остальные воркеры доходят до --workerInProcess, будят консумера, тот видит  workerInProcess == 0 и просыпается  Плачущий
А тем временем у нас ещё считает тот первый (уже не шустрый) воркер  Плачущий
 
Цитировать
Обычная ошибка - считать/полагать что wake_one/all "кого-то будит". В действительности это эффект лишь для ожидающих. И также побудка "строго одного" не гарантируется (на уровне ОС). Поэтому каждая нитка должна проверяться на ложную побудку.
Да, поэтому я и передаю предикат в метод wait, который и спасает от ложных пробудок Улыбающийся
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Авварон
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 3258


Просмотр профиля
« Ответ #4 : Сентябрь 07, 2019, 11:32 »

Ну это же типичный шаг map из map&reduce, в QtConcurrent реализован как раз на тред пуле
Записан
Old
Джедай : наставник для всех
*******
Online Online

Сообщений: 4349



Просмотр профиля
« Ответ #5 : Сентябрь 07, 2019, 11:44 »

А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?
Нет, на старте будет создан пул потоков, который и будут все разгребать на горячих потоках.
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2094



Просмотр профиля
« Ответ #6 : Сентябрь 07, 2019, 12:20 »

Ну это же типичный шаг map из map&reduce, в QtConcurrent реализован как раз на тред пуле
Не знал про QtConcurrentMap, хотя я Qt особо не пользуюсь..
Но спасибо, покурю в сторону их подхода) 

Цитировать
Нет, на старте будет создан пул потоков, который и будут все разгребать на горячих потоках.
Понятненько) Надо будет тогда его тоже поизучать)
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2094



Просмотр профиля
« Ответ #7 : Сентябрь 07, 2019, 13:59 »

А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?
Нет, на старте будет создан пул потоков, который и будут все разгребать на горячих потоках.

Я правильно понимаю, что с OpenMP это будет выглядеть примерно так?:
Код
C++ (Qt)
#pragma omp parallel
{
   do {
         #pragma omp task
         {
              worker();
         }
         #pragma omp taskwait
 
         doSomething();
 
   } while (condition);
}
 
Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #8 : Сентябрь 07, 2019, 14:00 »

Этот хрестоматийный случай будет работать не корректно (Поправте, если я ошибаюсь):
Просыпаются воркеры: пусть первый из них оказался самым шустрым, выполнил DoWorkerJob(), декрементировал  счётчик --workerInProcess и ушёл на второй круг (HasData() = false)
HasData - это тот самый флажок. Все равно Вы будете подавать в нитку какие-то данные для расчетов, ну и флажок там же, выделять их в отдельный контейнер незачем. Конечно после выполнения порции расчетов worker должен свой флажок зачистить.

Код:
void worker(std::vector<bool> & flags, unsigned int id)
{
    while (!is_done)
    {
        // Имитация расчётов..
        std::this_thread::sleep_for(std::chrono::seconds(5));
Не ошибка, но лучше все-таки не полагаться на то что, мол, "данные есть" а проверить (это у Вас в конце цикла).
Код
C++ (Qt)
       {
           // Каждый worker имеет свой флажёк, который говорит о том, что он выполнил расчёты и готов спать)
           std::unique_lock<std::mutex> lk(mutex);
           flags[id] = false;
       }
 
Не вижу зачем тут брать мутекс

Не знал про QtConcurrentMap, хотя я Qt особо не пользуюсь..
Но спасибо, покурю в сторону их подхода)  
Если цель = минимум усилий, то прощe всего QThreadPool, там только пристроить run(), потом накидал и waitForDone();

А не будет ли вызов OpenMP эквивыалентен первому "наивному" варианту реализации на std::thread?
Ну создавать и убивать нитки - это вообще дичь, пул создается один раз или по запросу. Даже усыплять нитки - дорого, поэтому в OpenMP после окончания порции расчетов нитки остаются активными в течение некоторого "времени прокрутки" которое значительно (насколько помню 0.3  СЕКУНДЫ)

А вообще OpenMP - прекрасная вещь (особенно Intel реализация), единственное возражение - увы, это еще одна зависимость
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #9 : Сентябрь 07, 2019, 14:04 »

Я правильно понимаю, что с OpenMP это будет выглядеть примерно так?:
Зачем? Вам же известно кол-во задач и исходные данные для каждой из них. Просто так
Код
C++ (Qt)
#pragma omp parallel for
for (int i = 0; i < data.size(); ++i)
DoCalc(data[i]);
Записан
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2094



Просмотр профиля
« Ответ #10 : Сентябрь 07, 2019, 14:19 »

Цитировать
HasData - это тот самый флажок. Все равно Вы будете подавать в нитку какие-то данные для расчетов, ну и флажок там же, выделять их в отдельный контейнер незачем. Конечно после выполнения порции расчетов worker должен свой флажок зачистить.
Именно вот поэтому у каждого воркера должен быть свой индивидуальный флажёк.
Или я не понимаю чего то?

Цитировать
Не вижу зачем тут брать мутекс
Потребитель (main_loop) может случайно проснуться, вызвать предикат и тогда появится возможность одновременного доступа к масссиву flags из предиката и из воркера, который в этот момент устанавливает флажёк.

Я правильно понимаю, что с OpenMP это будет выглядеть примерно так?:
Зачем? Вам же известно кол-во задач и исходные данные для каждой из них. Просто так
Код
C++ (Qt)
#pragma omp parallel for
for (int i = 0; i < data.size(); ++i)
DoCalc(data[i]);
 
Так мне этот цикл нужно повторять до возникновения условия завершения.
Здесь получается, что они постоянно будут создаваться и уничтожаться?  
« Последнее редактирование: Сентябрь 07, 2019, 14:23 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
ViTech
Гипер активный житель
*****
Offline Offline

Сообщений: 858



Просмотр профиля
« Ответ #11 : Сентябрь 07, 2019, 14:39 »

Код
C++ (Qt)
void worker(std::vector<bool> & flags, unsigned int id)
{...}
 

Воркеру обязательно знать про массив флажков, свой id, глобальный mutex? Может лучше ему передать функтор, который сформирует вызывающая сторона, а воркер его просто вызовет, когда закончит свою работу?
Записан

Пока сам не сделаешь...
m_ax
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 2094



Просмотр профиля
« Ответ #12 : Сентябрь 07, 2019, 14:48 »

Цитировать
Воркеру обязательно знать про массив флажков, свой id, глобальный mutex?
Ну это просто пример на коленке, который илюстрирует логику/концепт Улыбающийся
Воркеру можно передавать ссылку на его уникальный флажёк, например..
Код
C++ (Qt)
void worker(bool & flag)
 

бес всякого id..

А мьютекс должен быть общим для всех воркеров и для потребителя, разумеется..

Цитировать
Может лучше ему передать функтор, который сформирует вызывающая сторона, а воркер его просто вызовет, когда закончит свою работу?

Не понял, можно по подробней?

Т.е. в конечном счёте я хочу некий класс, назовём его concurrent_loop, который используется примерно так:
Код
C++ (Qt)
bool loop_function() { ... }
 
void do_work() { ... }
 
 
concurrent_loop cloop(num_threads);
 
cloop.start(loop_function, do_work);
 
cloop.join();
 
 
 
Что, фактически, эквивалентно
Код
C++ (Qt)
do {
   std::vector<std::thread> threads;
   for (unsigned int i = 0; i < num_threads; ++i)
       threads.emplace_back(do_work);
 
   for (auto & th : threads)
       th.join();
 
} while (loop_function());
 
за исключением того, что не будет на каждый чих создаваться и уничтожаться пул потоков.
« Последнее редактирование: Сентябрь 07, 2019, 15:11 от m_ax » Записан

Над водой луна двурога. Сяду выпью за Ван Гога. Хорошо, что кот не пьет, Он и так меня поймет..

Arch Linux Plasma 5
ViTech
Гипер активный житель
*****
Offline Offline

Сообщений: 858



Просмотр профиля
« Ответ #13 : Сентябрь 07, 2019, 16:12 »

Т.е. в конечном счёте я хочу некий класс, назовём его concurrent_loop, который используется примерно так:
...

Как уже говорили, лучше сначала существующие решения и подходы рассмотреть: map-reduce, OpenMP, в C++17 вроде есть параллелизация в алгоритмах, std::packaged_task может на что сгодится, и т.д.

Цитировать
Может лучше ему передать функтор, который сформирует вызывающая сторона, а воркер его просто вызовет, когда закончит свою работу?

Не понял, можно по подробней?

Схематично:
Код
C++ (Qt)
template <class Done>
void worker(Done done)
{
   // hard computing...
   done();
}
 
void caller()
{
   const std::size_t N = 4;
   std::vector<bool> flags(N, false);
 
   for (int i = 0; i < N; ++i)
       worker([&flags, i]() { flags[i] = true; });
}
 

Т.е. в функтор перенести флажки, мьютексы и уведомление о завершении работы. У вызывающей стороны всё это есть и она лучше знает, как это организовать. Тогда интерфейс воркера будет чище и без ненужных  зависимостей.
Записан

Пока сам не сделаешь...
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #14 : Сентябрь 07, 2019, 17:14 »

Потребитель (main_loop) может случайно проснуться, вызвать предикат и тогда появится возможность одновременного доступа к масссиву flags из предиката и из воркера, который в этот момент устанавливает флажёк.
На здоровье, если запись/чтение флажка атомарны, то все норм.

Так мне этот цикл нужно повторять до возникновения условия завершения.
Здесь получается, что они постоянно будут создаваться и уничтожаться?  
Да, цикл придется повторять, это нормально если, как Вы сказали в стартовом посте, все нитки должны закончить все расчеты а потом главная решает. При использовании OpenMP нитки пере-создаваться не будут, они не будут даже засыпать если циклов несколько

Именно вот поэтому у каждого воркера должен быть свой индивидуальный флажёк.
Или я не понимаю чего то?
Грубо говоря, к чему сводится multi-threading: дать каждой нитке "свои" данные что не пересекаются с другими по чтению и записи. Если это удается обеспечить (а часто удается), то все сводится к одной директиве OpenMP. Как только от тестового sleep Вы перейдете к реальной задаче - такие "индивидуальные данные нитки" у Вас неизбежно появятся, каждая должна знать исходные данные (что считать) и куда сливать результаты

Неясно сколько ниток Вы собираетесь создавать. Тупо "по числу задач" так-сяк работает до определенного предела. Напр если ядер 4, а задач (data.size()) 100, то overhead будет не слабый. Также есть такая противная "гранулярность", т.е. ну вот попался один длинный расчет, все давно отстрелялись, а одна нитка все с ним мучается, и приходится ее ждать. При использовании низкоуровневых примитивов синхронизации (как у Вас) эти проблемы не решаются автоматычно/легко
Записан
Страниц: [1] 2 3 ... 13   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.309 секунд. Запросов: 22.