Russian Qt Forum

Qt => Общие вопросы => Тема начата: Igors от Октябрь 20, 2009, 19:38



Название: Нитки и очередь
Отправлено: Igors от Октябрь 20, 2009, 19:38
Добрый вечер

Есть реальный интерес заказчика использовать все процессоры в одной из задач. Обдумываю как это сделать. Вычислений много но как разделить их на потоки - не вижу, как минимум очень сложно да и с синхронизацией потом будет очень много забот. Вот (очень упрощенно) что делает задача:

- нужно рассчитать (некоторые значения) в какой-то точке "экрана". Сначала "прощупываются" данные в соседних точках и в соседних узловых точках. Если все Ok, точка не рассчитывается а просто интерполируется используя готовые данные.

- однако если точка узловая или (по каким-то причинам) интерполяция не годится - нужно честно/полностью считать такую точку. Для этого производится 200 (минимум, по дефаулту) пробных вычислений и затем результат осредняется. Каждое пробное вычисление совсем не дешево само по себе.

Задумка такая: "главная" нитка пихает все исходные данные (для каждого пробного расчета) в очередь/стек. На старте (один раз) создаются N "рабочих" ниток которые занимаются тем что находится в очереди.  Пседокод:

Код:
// main thread pushes a new job

void PushJob( void * jobData )
{
  pthread_mutex_lock(&theJobSemaphore);
  theJobs.push_back(Job(jobData));
  pthread_mutex_unlock(&theJobSemaphore);
}

Код:
// one of work threads pops next job

void PopJob( void * jobData )
{
  Job theJob;
  pthread_mutex_lock(&theJobSemaphore);     // lock jobs to extract the last one
  bool hasJob = theJobs.size() > 0;
  if (hasJob) {
    theJob = theJobs.last();  
    theJobs.pop_back();
   }
  pthread_mutex_unlock(&theJobSemaphore);  // unlock jobs for other threads
  if (hasJob)  DoProcessJob(&theJob);              // process the job
}

И все бы ничего но как этот процесс закончить? В конце концов у "главной" нитки задачи кончатся. Надо подождать пока все "рабочие" нитки отработают и очередь пуста (эти 2 условия не одно и то же). Обычно этой проблемы не возникает: pthread_join, нитка сама заканчивается, для следующего расчета создают новую. Но здесь мне это делать не резон

Мысли, соображения, идеи (на худой конец эрудиция :)) ??
  


Название: Re: Нитки и очередь
Отправлено: Rcus от Октябрь 20, 2009, 19:59
QThreadPool/QRunnable | QtConcurrent::map | KDE ThreadWeaver | Boost.Future и т.д. . Но давать конкретные советы сложно без знания предметной области.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 20, 2009, 20:05
QThreadPool/QRunnable | QtConcurrent::map | KDE ThreadWeaver | Boost.Future и т.д. . Но давать конкретные советы сложно без знания предметной области.
В этой задаче я не могу использовать никаких сторонних библиотек - все должно быть сделано руками для OSX/Вындоуз - не от меня зависит. Буду рад ответить на все вопросы в предметной области.


Название: Re: Нитки и очередь
Отправлено: lit-uriy от Октябрь 20, 2009, 20:07
а разве ОСька сама не распределяет нагрузку по процам?


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 20, 2009, 20:13
а разве ОСька сама не распределяет нагрузку по процам?
Распределяет (и очень эффективно) но только при наличии бегущих ниток. Вот я и спрашиваю как мне их "закруглить"  :)


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 20, 2009, 20:59
Псевдокод:
Код
C++ (Qt)
viod thread()
{
for(;;)
{
mutexActive.lock(); // [1]
if( !doProcess )
break;
 
if( dataAvailable )
{
// Получили данные из очереди
// Вычисления ...
// Положили результат в очередь результатов
}
 
mutexActive.lock();
}
}
 

Для каждой нити задан свой мьютекс mutexActive. Перед запуском нити этот мьютекс лочится. Т.е. после старта нитка останавливается в точке [1] и ждет когда этот мьютекс разлочиться не кушая процессорного времени.
Когда появляются данные для обработки, они помещаются в очередь и мьютекс для свободной нитки разблокируется. Нитка достает данные из очереди и начинает считать. После расчета она сама себя блокирует и останавливается в точке [1].
Для остановки нитки, нужно установить ее флажок doProcess в false и разблокировать ее мьютекс активности. Нитка выйдет из цикла. В главном потоке ожидаем завершения join.

Наверное лучше для оживления потока использовать pthread_cond_signal/pthread_cond_wait, но идея должна быть понятна.


Название: Re: Нитки и очередь
Отправлено: umts от Октябрь 20, 2009, 21:43
Это весьма сложная задача (распределение нагрузки по процессорам, ядрам и т.д. и т.п.) для решения "руками". Тут можно посоветовать изучить OMP либо (идеальный вариант) Intel Threading Building Blocks, возможности TBB просто великолепны.


Название: Re: Нитки и очередь
Отправлено: lit-uriy от Октябрь 20, 2009, 22:07
>>при наличии бегущих ниток
а это что такое?


Название: Re: Нитки и очередь
Отправлено: Steven_Orko от Октябрь 21, 2009, 08:23
Это весьма сложная задача (распределение нагрузки по процессорам, ядрам и т.д. и т.п.) для решения "руками". Тут можно посоветовать изучить OMP либо (идеальный вариант) Intel Threading Building Blocks, возможности TBB просто великолепны.
Ничего здесь сложно нет. ОС сама распределяет потоки по разным процессорам.


Как раньше написали, тебе надо всего лишь реализовать определенный шаблон проектирования.

Создаешь очередь заданий. Это не стек, это именно очередь, т.к., что первое положил, то первое и выполняется.
Создаешь пул потоков. Каждый из них ожидает события заполнения очереди. Делается с помощью мютексов и сигналов (не Qt).
Как только в очередь заданий попадает хотя бы одно задание, посылаешь сигнал для разблокирования одного потока. Тот извлекает задание, и начинает его выполнять. И так далее.
Для получения результатов создаешь очередь результатов. Каждый поток, как только заканчивает вычисление, блокирует монопольно (с помощью мютекса) очередь результатов, сохраняет туда данные, и переходит в ожидание очередного задания, если очередь пуста, либо извлекает задание из очереди и выполняет его.
Если же очередь пуста, то пусть каждый поток проверяет некоторый флаг, о том, что заданий больше не будет.
При установке флага просто заверши поток системным вызовом, соответственно, не забыв разблокировать очередь.

Насчет главного потока. Я так понимаю, что результатов будет ровно столько же, сколько результатов. Здесь можно ждать совпадения кол-ва, но проще будет ожидание завершения всех потоков, т.к. если потоки завершились, то значит задания все решены, можно пожинать результаты. Здесь уже можно извлекать данные из очереди результатов и считать все, что хочешь.
Если процессоров достаточно много, то это действие можно выполнять также в отдельном потоке. Но это уже довольно сильное осложнение алгоритма.


 Я так понимаю, что результатов должно быть именно столько же, сколько заданий.



Название: Re: Нитки и очередь
Отправлено: SABROG от Октябрь 21, 2009, 09:18
Псевдокод:
Код
C++ (Qt)
viod thread()
{
forever {
mutexActive.lock();
...
mutexActive.lock();
 

2 lock'a и ни одного unlock'a, это правильно?


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 09:20
2 lock'a и ни одного unlock'a, это правильно?
Для псевдокода, можно сказать, правильно.  :)
unlock делается в "главной" нитке, для оживления рабочей.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 11:04
Псевдокод:
Код
C++ (Qt)
viod thread()
{
for(;;)
{
mutexActive.lock(); // [1]
if( !doProcess )
break;
 
if( dataAvailable )
{
// Получили данные из очереди
// Вычисления ...
// Положили результат в очередь результатов
}
 
mutexActive.lock();
}
}
 

Для каждой нити задан свой мьютекс mutexActive. Перед запуском нити этот мьютекс лочится. Т.е. после старта нитка останавливается в точке [1] и ждет когда этот мьютекс разлочиться не кушая процессорного времени.
Когда появляются данные для обработки, они помещаются в очередь и мьютекс для свободной нитки разблокируется. Нитка достает данные из очереди и начинает считать. После расчета она сама себя блокирует и останавливается в точке [1].
Для остановки нитки, нужно установить ее флажок doProcess в false и разблокировать ее мьютекс активности. Нитка выйдет из цикла. В главном потоке ожидаем завершения join.

Наверное лучше для оживления потока использовать pthread_cond_signal/pthread_cond_wait, но идея должна быть понятна.

Здесь непонятки

1) Мне совсем не хочется для каждой нитки заводить свой мутекс и потом устраивать в главной нитке долгие и мутные разборки типа "а какая же нитка свободна"? Ладно, пусть у меня всего 1 "рабочая" нитка + главная, итого 2. Но и в этом случае:

2) Непонятно зачем нужен второй (нижний) lock. Если for (;; ) то управление опять приходит на верхний. А так получается что главной нитке надо дважды делать unlock - неясно как

3) // Получили данные из очереди
Так просто их не получить. Только 1 нитка (в один момент времени) может класть в очередь или извлекать из нее. Эти операции должны быть внутри lock/unlock. Если мутекс заводится для рабочей нитки - это никак не блокирует главную. Если (предположим) этот мутекс общий - опять нехорошо, расчеты не должны быть locked


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 11:15
>>при наличии бегущих ниток
а это что такое?
Например, есть всего 2 процессора. Логично использовать 2 нитки. Но мне плохо не станет если я запущу, например, 4 - особенного/заметного снижения скорости я не увижу. То есть, да, OS распределяет вычисления эффективно. Но в любом случае это моя забота чтобы нитки нормально бегали и выполняли полезную работу каждая.


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 11:30
1) Мне совсем не хочется для каждой нитки заводить свой мутекс и потом устраивать в главной нитке долгие и мутные разборки типа "а какая же нитка свободна"? Ладно, пусть у меня всего 1 "рабочая" нитка + главная, итого 2.
А какие в этом проблемы:
Код
C++ (Qt)
typedef struct
{
int do_process; // флаг завершения нити
pthread_mutex_t mutexActive; // мутекс активации
// Другие внутренние данные для нитки (если нужно)
} ThreadData;
 
Для каждой нитки своя структура

Код
C++ (Qt)
void *thread_func( void *data )
{
assert( data != NULL );
ThreadData *d = (ThreadData*)data;
 
for(;;)
{
pthread_mutx_lock( d->mutexActive );
...
}
return NULL;
}
 

2) Непонятно зачем нужен второй (нижний) lock. Если for (;;) то управление опять приходит на верхний. А так получается что главной нитке надо дважды делать unlock - неясно как
Это был ПСЕВДОКОД, я хотел показать что после выполнения нитка должна заблокироваться на mutexActive.
Если в реальном коде использовать блокировку с мьютексом, то второй lock не нужен.

3) // Получили данные из очереди
Так просто их не получить. Только 1 нитка (в один момент времени) может класть в очередь или извлекать из нее. Эти операции должны быть внутри lock/unlock. Если мутекс заводится для рабочей нитки - это никак не блокирует главную. Если (предположим) этот мутекс общий - опять нехорошо, расчеты не должны быть locked
Сделай еще один мьютекс для доступа к очереди.
Главная нить lock(), кладет данные в очередь, unlock().
Рабочая нить lock(), выдащили данные из очереди, unlock, начали считать.
Этот мьютекс блокирует только вставку/извлечение данных из очереди.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 11:33
Ничего здесь сложно нет. ОС сама распределяет потоки по разным процессорам.

Как раньше написали, тебе надо всего лишь реализовать определенный шаблон проектирования.

Создаешь очередь заданий. Это не стек, это именно очередь, т.к., что первое положил, то первое и выполняется.
Создаешь пул потоков. Каждый из них ожидает события заполнения очереди. Делается с помощью мютексов и сигналов (не Qt).
Как только в очередь заданий попадает хотя бы одно задание, посылаешь сигнал для разблокирования одного потока. Тот извлекает задание, и начинает его выполнять. И так далее.
Для получения результатов создаешь очередь результатов. Каждый поток, как только заканчивает вычисление, блокирует монопольно (с помощью мютекса) очередь результатов, сохраняет туда данные, и переходит в ожидание очередного задания, если очередь пуста, либо извлекает задание из очереди и выполняет его.
Если же очередь пуста, то пусть каждый поток проверяет некоторый флаг, о том, что заданий больше не будет.
При установке флага просто заверши поток системным вызовом, соответственно, не забыв разблокировать очередь.

Насчет главного потока. Я так понимаю, что результатов будет ровно столько же, сколько результатов. Здесь можно ждать совпадения кол-ва, но проще будет ожидание завершения всех потоков, т.к. если потоки завершились, то значит задания все решены, можно пожинать результаты. Здесь уже можно извлекать данные из очереди результатов и считать все, что хочешь.
Если процессоров достаточно много, то это действие можно выполнять также в отдельном потоке. Но это уже довольно сильное осложнение алгоритма.


 Я так понимаю, что результатов должно быть именно столько же, сколько заданий.
С очередью результатов проблем нет, просто у каждого задания есть указатель куда помещать результат. В этой конкретной задаче неважно очередь или стек, порядок вычислений роли не играет.

Все получается действительно просто (и не раз уже делано) с завершением всех рабочих ниток. Главной нитке надо выставить флаг "заданий больше не будет", сделать unlock и ждать пока все рабочие нитки отстреляются (join). Но по задаче это не очень подходит потому что создание/убиение ниток будет происходить очень часто. Также мне выгодно иметь очередь "всегда под рукой" - разные задания могут быть в очереди.

Поэтому я и хочу: после того как все задания выполнены все рабочие нитки ждут на семафоре (дальше главная нитка скажет что делать).


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 13:04
Код
C++ (Qt)
typedef struct
{
int do_process; // флаг завершения нити
pthread_mutex_t mutexActive; // мутекс активации
// Другие внутренние данные для нитки (если нужно)
} ThreadData;
 
А тогда что делает mutexActive и зачем он нужен? Рабочая нитка все равно ждет на семафоре очереди - все хорошо кроме одного: неясно что должна делать главная нитка после того как она положила в очередь все что хотела. Как отловить момент "очередь пуста и все в ней выполнено"?


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 13:34
А тогда что делает mutexActive и зачем он нужен? Рабочая нитка все равно ждет на семафоре очереди - все хорошо кроме одного: неясно что должна делать главная нитка после того как она положила в очередь все что хотела. Как отловить момент "очередь пуста и все в ней выполнено"?
Если рабочая нитка будет ждать на семафоре очереди, то проснутся она сможет, только при поступлении данных в очередь, а это нам не подходит. Поэтому, я и предложил вариант, что рабочая нитка засыпает на мьютексе, а будить ее извне разблокированием этого мьютекса, когда данные будут появляться или когда наступит момент завершить нить.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 15:03
Если рабочая нитка будет ждать на семафоре очереди, то проснутся она сможет, только при поступлении данных в очередь, а это нам не подходит. Поэтому, я и предложил вариант, что рабочая нитка засыпает на мьютексе, а будить ее извне разблокированием этого мьютекса, когда данные будут появляться или когда наступит момент завершить нить.
Ждать на семафоре очереди все нитки обязаны и ничего не изменится если мы "обернем" это в еще 1 мутекс.

Хммм... я кажется придумал как :) Сделаю/проверю - отпишусь


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 15:09
Ждать на семафоре очереди все нитки обязаны и ничего не изменится если мы "обернем" это в еще 1 мутекс.
Кому они обязаны? Почему они обязаны? Кто эти обязательства устанавливает? :)

Как вариант, можно ввести "специальное" значение данных, которое будет означать, что нитка которая его получает должна завершиться. Главная нить помещает в очередь рабочие данные и в конце специальные (по количеству запущенных нитей). Рабочие нити обработают нужные данные и когда получат "специальные" завершаться.

Для 3-х нитей:
[data] [data] [data] [data] ... [data] [stop] [stop] [stop]


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 15:26
Ждать на семафоре очереди все нитки обязаны и ничего не изменится если мы "обернем" это в еще 1 мутекс.
Кому они обязаны? Почему они обязаны? Кто эти обязательства устанавливает? :)
Жизнь :) Не имею права делать
Код:
job = theJobsQueu.last();
В то время как др. нитка может сделать бяку типа
Код:
theJobQueu.pop_back();
Семафор очереди ставить обязан


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 15:29
Жизнь :) Не имею права делать
Код:
job = theJobsQueu.last();
В то время как др. нитка может сделать бяку типа
Код:
theJobQueu.pop_back();
Здесь хватит одного мьютекса, для чего семафор?


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 15:40
Здесь хватит одного мьютекса, для чего семафор?
Для простоты/удобства изложения давайте считать что семафор и мутекс - одно и то же. Различия между sem_wait и pthread_mutex_lock (sem_post и pthread_mutex_unlock)  нас здесь не интересуют


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 15:57
Для простоты/удобства изложения давайте считать что семафор и мутекс - одно и то же. Различия между sem_wait и pthread_mutex_lock (sem_post и pthread_mutex_unlock)  нас здесь не интересуют
Лучше давайте на ты.

Главная нить положила данные в очередь и "руками" разблокировала (unlock) очередную нить, которая это значение из очереди достанет.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 16:14
Лучше давайте на ты.
Не умею, мы с Вами водку не пили :)

Главная нить положила данные в очередь и "руками" разблокировала (unlock) очередную нить, которая это значение из очереди достанет.
Да, и все хорошо пока "есть что положить". А вот что делать главной нитке после того как "все положено"? Просто unlock - нельзя, какие-то нитки еще считают. Как узнать что все расчеты закончены?


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 16:22
Да, и все хорошо пока "есть что положить". А вот что делать главной нитке после того как "все положено"? Просто unlock - нельзя, какие-то нитки еще считают. Как узнать что все расчеты закончены?
Когда у всех нитей флаг calculate == 0, то все расчеты завершены.
Код
C++ (Qt)
typedef struct
{
       int     do_process;                             // флаг завершения нити
       pthread_mutex_t mutexActive;    // мутекс активации
int calculate; // 1 - нить производит вычисления; 0 - нить простаивает
       // Другие внутренние данные для нитки (если нужно)
} ThreadData;
 
void *thread_func( void *data )
{
       assert( data != NULL );
       ThreadData *d = (ThreadData*)data;
 
       for(;;)
       {
               pthread_mutx_lock( d->mutexActive );
 
d->calculate = 1;
 
               ...
 
d->calculate = 0;
       }
       return NULL;
}
 


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 19:44
Когда у всех нитей флаг calculate == 0, то все расчеты завершены.
На такие грабли я наступал уже не раз :) В данном конкретном случае: допустим главная нитка бросилась проверять "все ли отстрелялись". Бог с ним что это некрасиво (главная нитка ест процессор с чем-то типа while(true)) но главное - оно и работать будет "не всегда" а это самое мерзкое. Пример:

- очередь пуста. Две рабочих нитки "дорабатывают". Одна еще не успела установить флаг calculate = 1, другая свой флаг уже сбросила. Именно в этот злосчастный момент главная нитка проверила и решила "все сделано". Alas, poor Yorick  :'(


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 19:50
На такие грабли я наступал уже не раз :) В данном конкретном случае: допустим главная нитка бросилась проверять "все ли отстрелялись". Бог с ним что это некрасиво (главная нитка ест процессор с чем-то типа while(true)) но главное - оно и работать будет "не всегда" а это самое мерзкое.
Это цикл по всем ниткам пула. Откуда while( true )?

- очередь пуста. Две рабочих нитки "дорабатывают". Одна еще не успела установить флаг calculate = 1, другая свой флаг уже сбросила. Именно в этот злосчастный момент главная нитка проверила и решила "все сделано". Alas, poor Yorick  :'(
Так в мьютекс это оберни, в мьютекс.
Все к чему могут обращаться/устанавливать несколько ниток, нужно в мьютекс оборачивать.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 20:04
Это цикл по всем ниткам пула. Откуда while( true )?
"Оттуда". Главная нитка пробежалась и нашла - как минимум 1 из рабочих ниток еще пашет. Что главная нитка должна делать кроме как повторять и повторять проверку? Вот и while( true ) - ну можно do {} while, без разницы

Так в мьютекс это оберни, в мьютекс.
Все к чему могут обращаться/устанавливать несколько ниток, нужно в мьютекс оборачивать.
Да прынцыпы понятны, но не всегда оно легко получается:) Мутексом-то управлять надо а не просто "оборачивать"


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 20:16
"Оттуда". Главная нитка пробежалась и нашла - как минимум 1 из рабочих ниток еще пашет. Что главная нитка должна делать кроме как повторять и повторять проверку? Вот и while( true ) - ну можно do {} while, без разницы
Есть такая штука, называется семафор. Вот если его инициализировать количеством рабочих нитей и при начале расчета нитка будет его уменьшать на единицу, а при завершении увеличивать, то у тебя будет точное количество рабочих нитей без всяких циклов.

Да прынцыпы понятны, но не всегда оно легко получается:) Мутексом-то управлять надо а не просто "оборачивать"
Ну конечно, то не легко, это не просто.
У меня складывается впечатление, что ты даже не подумал о чем я говорю, а сразу кинулся в спор. И вместо того что бы искать решение придумываешь себе сложности.
Ты думаешь мне интересно с тобой спорить?


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 21, 2009, 20:34
Есть такая штука, называется семафор. Вот если его инициализировать количеством рабочих нитей и при начале расчета нитка будет его уменьшать на единицу, а при завершении увеличивать, то у тебя будет точное количество рабочих нитей без всяких циклов.
Будет, а толку? Если значение ноль - что это значит? Все нитки работают или все уже кончилось?

Ну конечно, то не легко, это не просто.
У меня складывается впечатление, что ты даже не подумал о чем я говорю, а сразу кинулся в спор. И вместо того что бы искать решение придумываешь себе сложности.
Ты думаешь мне интересно с тобой спорить?
Да никуда я не кидался :) Понятно что все операции с нитками должны быть тщательно синхронизированы - но как это сделать в данном случае? Что в кого обернуть? Ну помещу я это "внутрь" еще одного мутекса - и что? А решение я кажется придумал. Сделаю/проверю - отпишусь. И злиться на меня нечего :)


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 21, 2009, 22:19
Набросал немного кода:
Код
C++ (Qt)
#include <boost/noncopyable.hpp>      
#include <boost/thread/thread.hpp>    
#include <boost/thread/mutex.hpp>    
#include <boost/bind.hpp>            
#include <boost/shared_ptr.hpp>      
#include <iostream>                  
#include <queue>                      
 
boost::mutex            mutexCout;              // Блокировка std::cout
 
boost::mutex            mutexDatas;
boost::condition        condDatas;
 
std::queue<int>         datas;
 
void thread_func()
{                
       for(;;)  
       {        
               boost::mutex::scoped_lock locker( mutexDatas );
               while( datas.empty() )                        
                       condDatas.wait( locker );              
 
               int val = datas.front();
               if( val == -1 )        
                       break;          
 
               datas.pop();
               locker.unlock();
 
               {
                       boost::mutex::scoped_lock locker( mutexCout );
                       std::cout << "Process thread " << val << std::endl;
               }                                                          
               sleep( 3 );                                                
               {                                                          
                       boost::mutex::scoped_lock locker( mutexCout );    
                       std::cout << "Stop thread " << val << std::endl;
               }
       }
}
 
// Помещение данных в очередь
// -1 специальное сообщение обозначающее, что данных больше не будет
void push_data( int v )
{
       boost::mutex::scoped_lock locker( mutexDatas );
       datas.push( v );
       condDatas.notify_one();
}
 
int main( int, char ** )
{
       boost::thread th1( &thread_func );
       boost::thread th2( &thread_func );
       boost::thread th3( &thread_func );
 
       for( int i = 0; i < 20; ++i )
               push_data( i * 10 );
 
       sleep( 5 );
 
       for( int i = 0; i < 20; ++i )
               push_data( i * 10 );
 
       push_data( -1 );
 
       th1.join();
       th2.join();
       th3.join();
 
       return 0;
}
 


Название: Re: Нитки и очередь
Отправлено: spectre71 от Октябрь 22, 2009, 12:41
Добрый вечер
...
...

Наверное так:

proc_thread.h
Код
C++ (Qt)
//---------------------------------------------------------------------------
 
#ifndef proc_threadH
#define proc_threadH
 
//---------------------------------------------------------------------------
 
#include <QObject>
#include <QThread>
#include <QWaitCondition>
#include <QMutex>
 
/*-----------------------------------------------------------------------------*/
 
class TThreadData;
class TCommonThreadData;
class TProcThread;
 
/*-----------------------------------------------------------------------------*/
 
class TThreadData {
//
//
//
};
 
class TCommonThreadData : public QObject {
   Q_OBJECT
 protected:
   int RunThreads;
   int ThreadsCount;
   QAtomicInt Stopped;
   QMutex Mutex;
   QWaitCondition Wait;
   QList<TThreadData*> NewData;
 public:
   TCommonThreadData(int ThreadsCount, QObject* parent=NULL);
   virtual ~TCommonThreadData();
 public:
   inline bool stopped(void) const {return Stopped;}
   void wait   (void);
   TThreadData* nextThreadData(void);
   void completeThreadData(TThreadData* ReadyData);
 protected:
   void stopAll(void);
   void addThreadData(TThreadData* ThreadData);
 protected slots:
   void doCompleteThreadData(TThreadData* ReadyData);
 signals:
   void signalCompleteThreadData(TThreadData*);
};  
 
 
class TProcThread : public QThread {
   Q_OBJECT
 protected:
   TCommonThreadData* CommonData;
 public:
   TProcThread(TCommonThreadData* CommonData, QObject* parent=NULL);
   virtual ~TProcThread();
 protected:
   virtual void run (void);
 protected:
   inline void wait   (void)       {CommonData->wait();}
   inline bool stopped(void) const {return CommonData->stopped();}
 protected:
   void calculate(TThreadData* ThreadData);
};
 
/*-----------------------------------------------------------------------------*/
 
#endif
 

proc_thread.cpp
Код
C++ (Qt)
#include "proc_thread.h"
#include <QMutexLocker>
 
 
/***************************
*     class TCommonThreadData
***************************/

 
 
TCommonThreadData::TCommonThreadData(int ThreadsCount, QObject* parent) : QObject(parent) {
 this->ThreadsCount = ThreadsCount;
 RunThreads = 0;
 Stopped    = 0;
 connect(this , SIGNAL(signalCompleteThreadData(TThreadData*)), this , SLOT(doCompleteThreadData(TThreadData*)), Qt::QueuedConnection);
 for(int i=0; i<ThreadsCount; i++) {new TProcThread(this);}
}
 
TCommonThreadData::~TCommonThreadData() {
}
 
TThreadData* TCommonThreadData::nextThreadData(void) {
 QMutexLocker MutexLocker(&Mutex);
 if(NewData.isEmpty()) {return NULL;}
 RunThreads++;
 return NewData.takeFirst();
}
 
void TCommonThreadData::completeThreadData(TThreadData* ReadyData) {
 emit signalCompleteThreadData(ReadyData);
}                          
 
void TCommonThreadData::doCompleteThreadData(TThreadData* ReadyData) {
 //
 //  Work with ReadyData
 //                    
 //  delete ReadyData;
 //                    
 
 QMutexLocker MutexLocker(&Mutex);
 RunThreads--;
 if(!Stopped) {
   int Count = (NewData.count() <= ThreadsCount-RunThreads)?NewData.count():ThreadsCount-RunThreads;
   for(int i=0; i<Count; i++) {Wait.wakeOne();}
   return;
 }
 if(!RunThreads) {
   //
   // All Threads Completed
   //
 }
}
 
void TCommonThreadData::addThreadData(TThreadData* ThreadData) {
 QMutexLocker MutexLocker(&Mutex);
 NewData.append(ThreadData);                
 if(Stopped) {return;}
 Wait.wakeOne();
}
 
void TCommonThreadData::wait (void) {
 Mutex.lock();
 Wait.wait(&Mutex);
 Mutex.unlock();
}
 
void TCommonThreadData::stopAll(void) {
 Stopped.ref();
 Mutex.lock();
 Wait.wakeAll();
 Mutex.unlock();
}
 
 
 
/***************************
*     class TProcThread
***************************/

 
TProcThread::TProcThread(TCommonThreadData* CommonData, QObject* parent) : QThread(parent) {
 this->CommonData = CommonData;
 connect(this , SIGNAL(finished()), this , SLOT(deleteLater()));
 start();
}
 
TProcThread::~TProcThread() {
}
 
void TProcThread::run (void) {
 TThreadData* ThreadData;
 for(;;) {
   if(stopped()) {return;}
   wait();
   if(stopped()) {return;}
   ThreadData = CommonData->nextThreadData();
   if(!ThreadData) {continue;}
   calculate(ThreadData);
   CommonData->completeThreadData(ThreadData);
 }
}            
 
void TProcThread::calculate(TThreadData* ThreadData) {
 //
 //  Calculate ThreadData
 //                  
}
 


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 22, 2009, 13:43
А вот что получилось у меня (очевидyые/несущественные детали опущены).
Для OSX (и, насколько мне известно Linux)

CSemaphore::Wait  - sem_wait
CSemaphore::Signal  - sem_post

Код:
void CThreadControl::PutJob( void * data )
{
if (!data) return;
CSemaphore::Wait(mClientJobSemaphore);
mJobList.PutJob((CRTJob *) data);
CSemaphore::Signal(mClientJobSemaphore);
}

bool CThreadControl::GetJob( void )
{
CSemaphore::Wait(mClientJobSemaphore);
CRTJob * job = mJobList.GetJob();

// do nothing (but return true) if no jobs
if (!job) return true;

switch (job->mID) {
// main thread is waiting for all jobs ready
case MPJobEnd:
// if all done then open semaphore for main thread
if (++mNumReady >= mThread.Count())
CSemaphore::Signal(mHostEndSemaphore);

// unlock queue semaphore
CSemaphore::Signal(mClientJobSemaphore);

// wait on mClientEndSemaphore
CSemaphore::Wait(mClientEndSemaphore);
if (mNumReady) {
if (--mNumReady > 0)
CSemaphore::Signal(mClientEndSemaphore);
else
CSemaphore::Signal(mHostBegSemaphore);
}
break;

// terminate all jobs
case MPJobQuit:
// unlock queue semaphore and return false to terminate the thread
CSemaphore::Signal(mClientJobSemaphore);
return false;

// it is a job to calculate
default:
// unlock queue semaphore and perform the job
CSemaphore::Signal(mClientJobSemaphore);
job->Eval();
break;
}
return true;
}

void CThreadControl::WaitJobs( void )
{
int i, limit = mThread.Count();
for (i = 0; i < limit; ++i)
PutJob(mJobList.Alloc(MPJobEnd));

CSemaphore::Wait(mHostEndSemaphore);
CSemaphore::Signal(mClientEndSemaphore);
}

void * theThreadFunc( void * )
{
while (theThreadControl.GetJob()) {}
return 0;
}
Пошел отлаживаться - мне там еще много глобальных переменных убрать надо. Всем спасибо за помощь/участие 


Название: Re: Нитки и очередь
Отправлено: spectre71 от Октябрь 22, 2009, 14:15
Вот подправил немного:

TCommonThreadData::addThreadData - добавляем данные для расчета (ThreadData)
TCommonThreadData::noMoreNewData - вызываем когда данных(ThreadData) больше не будет
TCommonThreadData::onCalculatedData - вызовется когда расчитаны конкретные данные (ThreadData)
TCommonThreadData::onCompletedAllData - вызовется когда все данные расчитаны
TProcThread::calculate - расчет конкретных данных (ThreadData)

proc_thread.h
Код
C++ (Qt)
//---------------------------------------------------------------------------
 
#ifndef proc_threadH
#define proc_threadH
 
//---------------------------------------------------------------------------
 
#include <QObject>
#include <QThread>
#include <QWaitCondition>
#include <QMutex>
 
/*-----------------------------------------------------------------------------*/
 
class TThreadData;
class TCommonThreadData;
class TProcThread;
 
/*-----------------------------------------------------------------------------*/
 
class TThreadData {
//
//
//
};
 
class TCommonThreadData : public QObject {
   Q_OBJECT
 protected:
   int RunThreads;
   int ThreadsCount;
   bool NoMoreNewData;
   QAtomicInt Stopped;
   QMutex Mutex;
   QWaitCondition Wait;
   QList<TThreadData*> NewData;
 public:
   TCommonThreadData(int ThreadsCount, QObject* parent=NULL);
   virtual ~TCommonThreadData();
 public:
   inline bool stopped(void) const {return Stopped;}
   void wait   (void);
   TThreadData* nextThreadData(void);
   void completeThreadData(TThreadData* ReadyData);
 protected:
   void noMoreNewData(void);
   void addThreadData(TThreadData* ThreadData);
   void onCalculatedData(TThreadData* ReadyData);
   void onCompletedAllData(void);
 protected slots:
   void doCompleteThreadData(TThreadData* ReadyData);
 signals:
   void signalCompleteThreadData(TThreadData*);
};  
 
 
class TProcThread : public QThread {
   Q_OBJECT
 protected:
   TCommonThreadData* CommonData;
 public:
   TProcThread(TCommonThreadData* CommonData, QObject* parent=NULL);
   virtual ~TProcThread();
 protected:
   virtual void run (void);
 protected:
   inline void wait   (void)       {CommonData->wait();}
   inline bool stopped(void) const {return CommonData->stopped();}
 protected:
   void calculate(TThreadData* ThreadData);
};
 
/*-----------------------------------------------------------------------------*/
 
#endif

proc_thread.cpp
Код
C++ (Qt)
#include "proc_thread.h"
#include <QMutexLocker>
 
/***************************
*     class TCommonThreadData
***************************/

 
TCommonThreadData::TCommonThreadData(int ThreadsCount, QObject* parent) : QObject(parent) {
 this->ThreadsCount = ThreadsCount;
 RunThreads = 0;
 Stopped    = 0;
 NoMoreNewData  = false;
 connect(this , SIGNAL(signalCompleteThreadData(TThreadData*)), this , SLOT(doCompleteThreadData(TThreadData*)), Qt::QueuedConnection);
 for(int i=0; i<ThreadsCount; i++) {new TProcThread(this);}
}
 
TCommonThreadData::~TCommonThreadData() {
}
 
TThreadData* TCommonThreadData::nextThreadData(void) {
 QMutexLocker MutexLocker(&Mutex);
 if(NewData.isEmpty()) {return NULL;}
 RunThreads++;
 return NewData.takeFirst();
}
 
void TCommonThreadData::completeThreadData(TThreadData* ReadyData) {
 emit signalCompleteThreadData(ReadyData);
}                          
 
void TCommonThreadData::doCompleteThreadData(TThreadData* ReadyData) {
 onCalculatedData(ReadyData);
 QMutexLocker MutexLocker(&Mutex);
 RunThreads--;
 if(!NewData.isEmpty()) {
   Wait.wakeOne();
 } else if (NoMoreNewData) {
   if(!Stopped) {
     Stopped.ref();
     Wait.wakeAll();
   }
   if(!RunThreads) {onCompletedAllData();}
 }
}
 
void TCommonThreadData::addThreadData(TThreadData* ThreadData) {
 if(NoMoreNewData) {
   // throw ERROR usage addThreadData
 }
 QMutexLocker MutexLocker(&Mutex);
 NewData.append(ThreadData);                
 Wait.wakeOne();
}
 
void TCommonThreadData::wait (void) {
 Mutex.lock();
 Wait.wait(&Mutex);
 Mutex.unlock();
}
 
void TCommonThreadData::noMoreNewData(void) {
 if(NoMoreNewData) {return;}
 NoMoreNewData = true;
 QMutexLocker MutexLocker(&Mutex);
 if(NewData.isEmpty()) {
   Stopped.ref();
   Wait.wakeAll();
   if(!RunThreads) {onCompletedAllData();}
 }
}
 
void TCommonThreadData::onCalculatedData(TThreadData* ReadyData) {
 //
 //  Work with ReadyData
 //                    
 //  delete ReadyData;
 //                    
}
 
void TCommonThreadData::onCompletedAllData(void) {
 //
 // All Data Calculated
 //
}
 
/***************************
*     class TProcThread
***************************/

 
TProcThread::TProcThread(TCommonThreadData* CommonData, QObject* parent) : QThread(parent) {
 this->CommonData = CommonData;
 connect(this , SIGNAL(finished()), this , SLOT(deleteLater()));
 start();
}
 
TProcThread::~TProcThread() {
}
 
void TProcThread::run (void) {
 TThreadData* ThreadData;
 for(;;) {
   if(stopped()) {return;}
   wait();
   if(stopped()) {return;}
   ThreadData = CommonData->nextThreadData();
   if(!ThreadData) {
     // throw ERROR Synchronization
   }
   calculate(ThreadData);
   CommonData->completeThreadData(ThreadData);
 }
}            
 
void TProcThread::calculate(TThreadData* ThreadData) {
 //
 //  Calculate ThreadData
 //                  
}


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 22, 2009, 15:56
Обвернул свой вариант в класс:  :)
Код
C++ (Qt)
#include <boost/noncopyable.hpp>                        
#include <boost/thread/thread.hpp>                      
#include <boost/thread/mutex.hpp>                      
#include <boost/thread/condition.hpp>                  
#include <boost/bind.hpp>                              
#include <boost/shared_ptr.hpp>                        
#include <iostream>                                    
#include <queue>                                        
#include <list>                                        
 
struct JobData
{            
       JobData( int i ) : x( i ), y( i ) {}
       int     x;                          
       int     y;                          
};                                          
 
class JobManager
{              
public:        
       bool begin( int numThread = 10 );
       bool end();                      
 
       void addJob( const JobData &data );
 
private:
       void threadFunc();
 
private:
       boost::mutex                 m_mutex;
       boost::condition              m_cond;
       std::queue<JobData*>    m_datas;        
 
       typedef boost::shared_ptr<boost::thread>        ThreadPtr;
       std::list<ThreadPtr>        m_threadPool;                    
};                                                                
 
bool JobManager::begin( int numThread )
{                                      
       if( m_threadPool.size() )      
       {                              
               std::cout << "Begin already running." << std::endl;
               return false;                                      
       }                                                          
 
       for( int i = 0; i < numThread; ++i )
       {                                  
               ThreadPtr thread( new boost::thread( boost::bind( &JobManager::threadFunc, this ) ) );
               m_threadPool.push_back( thread );                                                    
       }                                                                                            
 
       return true;
}                  
 
bool JobManager::end()
{                    
       if( !m_threadPool.size() )
       {                        
               std::cout << "End already running." << std::endl;
               return false;                                    
       }                                                        
 
       {
               boost::mutex::scoped_lock locker( m_mutex );
               m_datas.push( 0 );                          
               m_cond.notify_all();                        
       }                                                  
 
       for_each( m_threadPool.begin(), m_threadPool.end(), boost::bind( &boost::thread::join, _1 ) );
m_threadPool.clear();
 
       m_datas.pop();
 
       return true;
}                  
 
void JobManager::addJob( const JobData &data )
{                                            
       boost::mutex::scoped_lock locker( m_mutex );
       m_datas.push( new JobData( data ) );        
       m_cond.notify_one();                        
}                                                  
 
void JobManager::threadFunc()
{                            
       for(;;)              
       {                    
               boost::mutex::scoped_lock locker( m_mutex );
               while( m_datas.empty() )                    
                       m_cond.wait( locker );              
 
               JobData *val = m_datas.front();
               if( !val )                    
                       break;                
 
               JobData data( *val );
 
               m_datas.pop();
delete val;
               locker.unlock();
 
// Выполняем расчет используя data
// ...
       }
}
 

Использование:
Код
C++ (Qt)
{
       JobManager mng;
 
// Запускаем 5 ниток
       mng.begin( 5 );
for( ... )
mng.addJob( JobData(...) );
mng.stop()
 
// Запускаем 20 ниток
mng.begin( 20 )
for( ... )
mng.addJob( JobData(...) );
mng.stop()
 
       return 0;
}
 


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 22, 2009, 16:36
А сравнить у кого быстрее? :)  Ну понятно, не сию минуту а найти время на выходные.  Если есть интерес, что возьмем для расчетов?


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 22, 2009, 19:21
А сравнить у кого быстрее? :)  Ну понятно, не сию минуту а найти время на выходные.  Если есть интерес, что возьмем для расчетов?
Так все есть, подставь расчет и проверь.  :)


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 22, 2009, 20:23
Так все есть, подставь расчет и проверь.  :)
Никому не хочется возиться с чужим кодом, искать нужные хедеры, либы и.т.п. А вот сделать тестовый пример со своей реализацией - несложно и интересно. Если есть идеи такого примера (что считать) - я поддержу


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 23, 2009, 08:27
Никому не хочется возиться с чужим кодом, искать нужные хедеры, либы и.т.п. А вот сделать тестовый пример со своей реализацией - несложно и интересно. Если есть идеи такого примера (что считать) - я поддержу
Все равно тесты нужно гонять на одной машине.
Для варианта Spectre нужен Qt, для моего нужен boost.
Можно скопировать код с форума, если нужно могу выложить в отдельном архиве.


Название: Re: Нитки и очередь
Отправлено: umts от Октябрь 24, 2009, 19:24
Это весьма сложная задача (распределение нагрузки по процессорам, ядрам и т.д. и т.п.) для решения "руками". Тут можно посоветовать изучить OMP либо (идеальный вариант) Intel Threading Building Blocks, возможности TBB просто великолепны.
Ничего здесь сложно нет. ОС сама распределяет потоки по разным процессорам.


Как раньше написали, тебе надо всего лишь реализовать определенный шаблон проектирования.

Создаешь очередь заданий. Это не стек, это именно очередь, т.к., что первое положил, то первое и выполняется.
Создаешь пул потоков. Каждый из них ожидает события заполнения очереди. Делается с помощью мютексов и сигналов (не Qt).
Как только в очередь заданий попадает хотя бы одно задание, посылаешь сигнал для разблокирования одного потока. Тот извлекает задание, и начинает его выполнять. И так далее.
Для получения результатов создаешь очередь результатов. Каждый поток, как только заканчивает вычисление, блокирует монопольно (с помощью мютекса) очередь результатов, сохраняет туда данные, и переходит в ожидание очередного задания, если очередь пуста, либо извлекает задание из очереди и выполняет его.
Если же очередь пуста, то пусть каждый поток проверяет некоторый флаг, о том, что заданий больше не будет.
При установке флага просто заверши поток системным вызовом, соответственно, не забыв разблокировать очередь.

Насчет главного потока. Я так понимаю, что результатов будет ровно столько же, сколько результатов. Здесь можно ждать совпадения кол-ва, но проще будет ожидание завершения всех потоков, т.к. если потоки завершились, то значит задания все решены, можно пожинать результаты. Здесь уже можно извлекать данные из очереди результатов и считать все, что хочешь.
Если процессоров достаточно много, то это действие можно выполнять также в отдельном потоке. Но это уже довольно сильное осложнение алгоритма.


 Я так понимаю, что результатов должно быть именно столько же, сколько заданий.



Точно, а Intel TBB - это так, для лохов... Вы сначала почитайте про эту технологию, тогда поймете, в чем разница между тем, что делает ОС, и тем, что делает ПРАВИЛЬНАЯ технология многопоточного вычисления. Во-первых, автоматически определяется оптимальное число нитей, во-вторых, нет необходимости заботиться о запуске/останове потоков, в-третьих, существует понятие "воровство задач" - когда одна задача (нить) закончила выполнение, а другая еще нет - часть незаконченной задачи передается на выполнение отработавшей нити, в-четвертых, производится балансировка нагрузки. Перечислять можно еще долго, рекомендую ознакомиться, хотя бы для общего развития. ОС же просто запускает (если такая возможность существует в лоб) поток на другом ядре/процессоре и не более того.


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 25, 2009, 19:30
Отрихтовал и убрал пару багов, потестировал на машине с 2 процессорами по 2 ядра (итого 4).  В качестве рабочей функции для thread использовал генерацию случайного числа и вычисление его косинуса 1024 раз (Test1) и 4096 раз (Test2). Результаты интересные

          no threads         1 thread       4 threads        8 threads
----------------------------------------------------------------------
Test 1   1:57 (100%)   2:35 (132%)   1:05 (55%)   1:27(74%)
Test 2   7:40 (100%)   8:15 (107%)   2:13 (29%)   2:56(38%)

В таблице время (в мин:сек и в процентах) для разного числа threads. Как видно, при небольших задачах/вычислениях расходы на переключение/синхронизацию становятся значительными. Видимо для таких случаев надо объединять задачи в блоки/кучки


Название: Re: Нитки и очередь
Отправлено: SABROG от Октябрь 26, 2009, 19:53
А это какой вариант Qt или boost?


Название: Re: Нитки и очередь
Отправлено: BRE от Октябрь 26, 2009, 20:13
А это какой вариант Qt или boost?
Проверял на boost 1.34.1 и 1.37, думаю пойдет на boost с версии 1.25.0 (в которой появился модуль thread).

Sorry, не правильно прочел вопрос.  ;D


Название: Re: Нитки и очередь
Отправлено: Igors от Октябрь 26, 2009, 20:16
А это какой вариант Qt или boost?
Нет, я проверял свою реализацию в которой нет ни Qt, ни boost  :)


Название: Re: Нитки и очередь
Отправлено: niXman от Ноябрь 24, 2009, 04:10
Товарищи, если Вам не сложно, выложите каждый Ваш вариант кода(хотя бы минимально, я остальное приведу в порядок).
Хочу создать топик именно по этому делу. Так как много подобных вопросов случается на форумах.

Ну и мой личный интерес есть.


Название: Re: Нитки и очередь
Отправлено: k06a от Февраль 07, 2010, 19:43
А QtConcurrent не пробовали? Он вроде автоматом подбирает оптимальное число тредов . . .


Название: Re: Нитки и очередь
Отправлено: Igors от Февраль 07, 2010, 20:41
А QtConcurrent не пробовали? Он вроде автоматом подбирает оптимальное число тредов . . .
С числом ниток проблем нет. По поводу QtConcurrent: насколько я понимаю, он обеспечивает различные средства блокировки/защиты, но не ф-ции "диспетчера" задач. Сейчас я на OpenMP (Intel компилятор) и вполне доволен результатами. Хотя работы очень много (никакая библиотека "распараллеливать" за меня не будет)