Russian Qt Forum
Май 17, 2024, 03:50 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1] 2 3 4   Вниз
  Печать  
Автор Тема: Нитки и очередь  (Прочитано 26326 раз)
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« : Октябрь 20, 2009, 19:38 »

Добрый вечер

Есть реальный интерес заказчика использовать все процессоры в одной из задач. Обдумываю как это сделать. Вычислений много но как разделить их на потоки - не вижу, как минимум очень сложно да и с синхронизацией потом будет очень много забот. Вот (очень упрощенно) что делает задача:

- нужно рассчитать (некоторые значения) в какой-то точке "экрана". Сначала "прощупываются" данные в соседних точках и в соседних узловых точках. Если все Ok, точка не рассчитывается а просто интерполируется используя готовые данные.

- однако если точка узловая или (по каким-то причинам) интерполяция не годится - нужно честно/полностью считать такую точку. Для этого производится 200 (минимум, по дефаулту) пробных вычислений и затем результат осредняется. Каждое пробное вычисление совсем не дешево само по себе.

Задумка такая: "главная" нитка пихает все исходные данные (для каждого пробного расчета) в очередь/стек. На старте (один раз) создаются N "рабочих" ниток которые занимаются тем что находится в очереди.  Пседокод:

Код:
// main thread pushes a new job

void PushJob( void * jobData )
{
  pthread_mutex_lock(&theJobSemaphore);
  theJobs.push_back(Job(jobData));
  pthread_mutex_unlock(&theJobSemaphore);
}

Код:
// one of work threads pops next job

void PopJob( void * jobData )
{
  Job theJob;
  pthread_mutex_lock(&theJobSemaphore);     // lock jobs to extract the last one
  bool hasJob = theJobs.size() > 0;
  if (hasJob) {
    theJob = theJobs.last();  
    theJobs.pop_back();
   }
  pthread_mutex_unlock(&theJobSemaphore);  // unlock jobs for other threads
  if (hasJob)  DoProcessJob(&theJob);              // process the job
}

И все бы ничего но как этот процесс закончить? В конце концов у "главной" нитки задачи кончатся. Надо подождать пока все "рабочие" нитки отработают и очередь пуста (эти 2 условия не одно и то же). Обычно этой проблемы не возникает: pthread_join, нитка сама заканчивается, для следующего расчета создают новую. Но здесь мне это делать не резон

Мысли, соображения, идеи (на худой конец эрудиция Улыбающийся) ??
  
Записан
Rcus
Гость
« Ответ #1 : Октябрь 20, 2009, 19:59 »

QThreadPool/QRunnable | QtConcurrent::map | KDE ThreadWeaver | Boost.Future и т.д. . Но давать конкретные советы сложно без знания предметной области.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #2 : Октябрь 20, 2009, 20:05 »

QThreadPool/QRunnable | QtConcurrent::map | KDE ThreadWeaver | Boost.Future и т.д. . Но давать конкретные советы сложно без знания предметной области.
В этой задаче я не могу использовать никаких сторонних библиотек - все должно быть сделано руками для OSX/Вындоуз - не от меня зависит. Буду рад ответить на все вопросы в предметной области.
Записан
lit-uriy
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 3880


Просмотр профиля WWW
« Ответ #3 : Октябрь 20, 2009, 20:07 »

а разве ОСька сама не распределяет нагрузку по процам?
Записан

Юра.
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #4 : Октябрь 20, 2009, 20:13 »

а разве ОСька сама не распределяет нагрузку по процам?
Распределяет (и очень эффективно) но только при наличии бегущих ниток. Вот я и спрашиваю как мне их "закруглить"  Улыбающийся
Записан
BRE
Гость
« Ответ #5 : Октябрь 20, 2009, 20:59 »

Псевдокод:
Код
C++ (Qt)
viod thread()
{
for(;;)
{
mutexActive.lock(); // [1]
if( !doProcess )
break;
 
if( dataAvailable )
{
// Получили данные из очереди
// Вычисления ...
// Положили результат в очередь результатов
}
 
mutexActive.lock();
}
}
 

Для каждой нити задан свой мьютекс mutexActive. Перед запуском нити этот мьютекс лочится. Т.е. после старта нитка останавливается в точке [1] и ждет когда этот мьютекс разлочиться не кушая процессорного времени.
Когда появляются данные для обработки, они помещаются в очередь и мьютекс для свободной нитки разблокируется. Нитка достает данные из очереди и начинает считать. После расчета она сама себя блокирует и останавливается в точке [1].
Для остановки нитки, нужно установить ее флажок doProcess в false и разблокировать ее мьютекс активности. Нитка выйдет из цикла. В главном потоке ожидаем завершения join.

Наверное лучше для оживления потока использовать pthread_cond_signal/pthread_cond_wait, но идея должна быть понятна.
« Последнее редактирование: Октябрь 20, 2009, 21:34 от BRE » Записан
umts
Гость
« Ответ #6 : Октябрь 20, 2009, 21:43 »

Это весьма сложная задача (распределение нагрузки по процессорам, ядрам и т.д. и т.п.) для решения "руками". Тут можно посоветовать изучить OMP либо (идеальный вариант) Intel Threading Building Blocks, возможности TBB просто великолепны.
Записан
lit-uriy
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 3880


Просмотр профиля WWW
« Ответ #7 : Октябрь 20, 2009, 22:07 »

>>при наличии бегущих ниток
а это что такое?
Записан

Юра.
Steven_Orko
Гость
« Ответ #8 : Октябрь 21, 2009, 08:23 »

Это весьма сложная задача (распределение нагрузки по процессорам, ядрам и т.д. и т.п.) для решения "руками". Тут можно посоветовать изучить OMP либо (идеальный вариант) Intel Threading Building Blocks, возможности TBB просто великолепны.
Ничего здесь сложно нет. ОС сама распределяет потоки по разным процессорам.


Как раньше написали, тебе надо всего лишь реализовать определенный шаблон проектирования.

Создаешь очередь заданий. Это не стек, это именно очередь, т.к., что первое положил, то первое и выполняется.
Создаешь пул потоков. Каждый из них ожидает события заполнения очереди. Делается с помощью мютексов и сигналов (не Qt).
Как только в очередь заданий попадает хотя бы одно задание, посылаешь сигнал для разблокирования одного потока. Тот извлекает задание, и начинает его выполнять. И так далее.
Для получения результатов создаешь очередь результатов. Каждый поток, как только заканчивает вычисление, блокирует монопольно (с помощью мютекса) очередь результатов, сохраняет туда данные, и переходит в ожидание очередного задания, если очередь пуста, либо извлекает задание из очереди и выполняет его.
Если же очередь пуста, то пусть каждый поток проверяет некоторый флаг, о том, что заданий больше не будет.
При установке флага просто заверши поток системным вызовом, соответственно, не забыв разблокировать очередь.

Насчет главного потока. Я так понимаю, что результатов будет ровно столько же, сколько результатов. Здесь можно ждать совпадения кол-ва, но проще будет ожидание завершения всех потоков, т.к. если потоки завершились, то значит задания все решены, можно пожинать результаты. Здесь уже можно извлекать данные из очереди результатов и считать все, что хочешь.
Если процессоров достаточно много, то это действие можно выполнять также в отдельном потоке. Но это уже довольно сильное осложнение алгоритма.


 Я так понимаю, что результатов должно быть именно столько же, сколько заданий.

Записан
SABROG
Гость
« Ответ #9 : Октябрь 21, 2009, 09:18 »

Псевдокод:
Код
C++ (Qt)
viod thread()
{
forever {
mutexActive.lock();
...
mutexActive.lock();
 

2 lock'a и ни одного unlock'a, это правильно?
« Последнее редактирование: Октябрь 21, 2009, 09:20 от SABROG » Записан
BRE
Гость
« Ответ #10 : Октябрь 21, 2009, 09:20 »

2 lock'a и ни одного unlock'a, это правильно?
Для псевдокода, можно сказать, правильно.  Улыбающийся
unlock делается в "главной" нитке, для оживления рабочей.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #11 : Октябрь 21, 2009, 11:04 »

Псевдокод:
Код
C++ (Qt)
viod thread()
{
for(;;)
{
mutexActive.lock(); // [1]
if( !doProcess )
break;
 
if( dataAvailable )
{
// Получили данные из очереди
// Вычисления ...
// Положили результат в очередь результатов
}
 
mutexActive.lock();
}
}
 

Для каждой нити задан свой мьютекс mutexActive. Перед запуском нити этот мьютекс лочится. Т.е. после старта нитка останавливается в точке [1] и ждет когда этот мьютекс разлочиться не кушая процессорного времени.
Когда появляются данные для обработки, они помещаются в очередь и мьютекс для свободной нитки разблокируется. Нитка достает данные из очереди и начинает считать. После расчета она сама себя блокирует и останавливается в точке [1].
Для остановки нитки, нужно установить ее флажок doProcess в false и разблокировать ее мьютекс активности. Нитка выйдет из цикла. В главном потоке ожидаем завершения join.

Наверное лучше для оживления потока использовать pthread_cond_signal/pthread_cond_wait, но идея должна быть понятна.

Здесь непонятки

1) Мне совсем не хочется для каждой нитки заводить свой мутекс и потом устраивать в главной нитке долгие и мутные разборки типа "а какая же нитка свободна"? Ладно, пусть у меня всего 1 "рабочая" нитка + главная, итого 2. Но и в этом случае:

2) Непонятно зачем нужен второй (нижний) lock. Если for (;; ) то управление опять приходит на верхний. А так получается что главной нитке надо дважды делать unlock - неясно как

3) // Получили данные из очереди
Так просто их не получить. Только 1 нитка (в один момент времени) может класть в очередь или извлекать из нее. Эти операции должны быть внутри lock/unlock. Если мутекс заводится для рабочей нитки - это никак не блокирует главную. Если (предположим) этот мутекс общий - опять нехорошо, расчеты не должны быть locked
« Последнее редактирование: Октябрь 21, 2009, 11:08 от Igors » Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #12 : Октябрь 21, 2009, 11:15 »

>>при наличии бегущих ниток
а это что такое?
Например, есть всего 2 процессора. Логично использовать 2 нитки. Но мне плохо не станет если я запущу, например, 4 - особенного/заметного снижения скорости я не увижу. То есть, да, OS распределяет вычисления эффективно. Но в любом случае это моя забота чтобы нитки нормально бегали и выполняли полезную работу каждая.
Записан
BRE
Гость
« Ответ #13 : Октябрь 21, 2009, 11:30 »

1) Мне совсем не хочется для каждой нитки заводить свой мутекс и потом устраивать в главной нитке долгие и мутные разборки типа "а какая же нитка свободна"? Ладно, пусть у меня всего 1 "рабочая" нитка + главная, итого 2.
А какие в этом проблемы:
Код
C++ (Qt)
typedef struct
{
int do_process; // флаг завершения нити
pthread_mutex_t mutexActive; // мутекс активации
// Другие внутренние данные для нитки (если нужно)
} ThreadData;
 
Для каждой нитки своя структура

Код
C++ (Qt)
void *thread_func( void *data )
{
assert( data != NULL );
ThreadData *d = (ThreadData*)data;
 
for(;;)
{
pthread_mutx_lock( d->mutexActive );
...
}
return NULL;
}
 

2) Непонятно зачем нужен второй (нижний) lock. Если for (;Подмигивающий то управление опять приходит на верхний. А так получается что главной нитке надо дважды делать unlock - неясно как
Это был ПСЕВДОКОД, я хотел показать что после выполнения нитка должна заблокироваться на mutexActive.
Если в реальном коде использовать блокировку с мьютексом, то второй lock не нужен.

3) // Получили данные из очереди
Так просто их не получить. Только 1 нитка (в один момент времени) может класть в очередь или извлекать из нее. Эти операции должны быть внутри lock/unlock. Если мутекс заводится для рабочей нитки - это никак не блокирует главную. Если (предположим) этот мутекс общий - опять нехорошо, расчеты не должны быть locked
Сделай еще один мьютекс для доступа к очереди.
Главная нить lock(), кладет данные в очередь, unlock().
Рабочая нить lock(), выдащили данные из очереди, unlock, начали считать.
Этот мьютекс блокирует только вставку/извлечение данных из очереди.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #14 : Октябрь 21, 2009, 11:33 »

Ничего здесь сложно нет. ОС сама распределяет потоки по разным процессорам.

Как раньше написали, тебе надо всего лишь реализовать определенный шаблон проектирования.

Создаешь очередь заданий. Это не стек, это именно очередь, т.к., что первое положил, то первое и выполняется.
Создаешь пул потоков. Каждый из них ожидает события заполнения очереди. Делается с помощью мютексов и сигналов (не Qt).
Как только в очередь заданий попадает хотя бы одно задание, посылаешь сигнал для разблокирования одного потока. Тот извлекает задание, и начинает его выполнять. И так далее.
Для получения результатов создаешь очередь результатов. Каждый поток, как только заканчивает вычисление, блокирует монопольно (с помощью мютекса) очередь результатов, сохраняет туда данные, и переходит в ожидание очередного задания, если очередь пуста, либо извлекает задание из очереди и выполняет его.
Если же очередь пуста, то пусть каждый поток проверяет некоторый флаг, о том, что заданий больше не будет.
При установке флага просто заверши поток системным вызовом, соответственно, не забыв разблокировать очередь.

Насчет главного потока. Я так понимаю, что результатов будет ровно столько же, сколько результатов. Здесь можно ждать совпадения кол-ва, но проще будет ожидание завершения всех потоков, т.к. если потоки завершились, то значит задания все решены, можно пожинать результаты. Здесь уже можно извлекать данные из очереди результатов и считать все, что хочешь.
Если процессоров достаточно много, то это действие можно выполнять также в отдельном потоке. Но это уже довольно сильное осложнение алгоритма.


 Я так понимаю, что результатов должно быть именно столько же, сколько заданий.
С очередью результатов проблем нет, просто у каждого задания есть указатель куда помещать результат. В этой конкретной задаче неважно очередь или стек, порядок вычислений роли не играет.

Все получается действительно просто (и не раз уже делано) с завершением всех рабочих ниток. Главной нитке надо выставить флаг "заданий больше не будет", сделать unlock и ждать пока все рабочие нитки отстреляются (join). Но по задаче это не очень подходит потому что создание/убиение ниток будет происходить очень часто. Также мне выгодно иметь очередь "всегда под рукой" - разные задания могут быть в очереди.

Поэтому я и хочу: после того как все задания выполнены все рабочие нитки ждут на семафоре (дальше главная нитка скажет что делать).
Записан
Страниц: [1] 2 3 4   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.168 секунд. Запросов: 22.