Russian Qt Forum
Март 28, 2024, 20:02 *
Добро пожаловать, Гость. Пожалуйста, войдите или зарегистрируйтесь.
Вам не пришло письмо с кодом активации?

Войти
 
  Начало   Форум  WIKI (Вики)FAQ Помощь Поиск Войти Регистрация  

Страниц: [1]   Вниз
  Печать  
Автор Тема: Реализация простейшего lock-free кольцевого буф  (Прочитано 6423 раз)
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« : Декабрь 06, 2017, 09:00 »

Задача - реализация lock-free кольцевого буфера для обмена сообщениями.
Предполагалась простая модель - размер буфера заранее определен.
С буфером могут одновременно работать несколько писателей и читателей на конкурентной основе.
Если писать некуда, писатели ожидают. Если читать нечего, читатели ожидают.

Алгоритм записи такой.
Писатель атомарно сдвигает головку записи, "захватывая" позицию ячейки буфера для записи.
С каждой позицией связан атомарный флаг, который показывает есть в ней запись или нет.
Если запись существует, то писатель ожидает. Как только писатель завершает запись данных в буфер, флаг наличия записи выставляется в true.

Читатель атомарно сдвигает головку чтения, "захватывая" позицию ячейки буфера для чтения.
Если запись еще не существует, то писатель ожидает. Как только писатель завершает чтение данных из буфера, флаг наличия записи выставляется в false.

Вроде все логично и просто, но реализация ниже не работает. Операции с данными убрал, оставил только операции с головками и флагами.
Читатель "пропускает" записи.

Код
C++ (Qt)
#include <atomic>
#include <cstring>
#include <iostream>
#include <thread>
 
struct Buffer
{
   enum { MaxCount = 0x0004 };
 
   struct Packet
   {
       std::atomic< bool > m_is_exists;
       Packet () : m_is_exists() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   Packet m_packet[ MaxCount ];
 
   void push ()
   {
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       while( m_packet[ pos ].m_is_exists );
       m_packet[ pos ].m_is_exists = true;
   }
 
   void pop ()
   {
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       while ( !m_packet[ pos ].m_is_exists );
       m_packet[ pos ].m_is_exists = false;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   global_buffer.push();
   ++write_count;
}
 
void popValue ()
{
   global_buffer.pop();
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
 
int main ( int /*argc*/, char */*argv*/[] )
{
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
 
   return 0;
}
 
« Последнее редактирование: Декабрь 06, 2017, 09:07 от ssoft » Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4349



Просмотр профиля
« Ответ #1 : Декабрь 06, 2017, 09:59 »

Код
C++ (Qt)
struct Buffer
{
enum { MaxCount = 0x0004 };
 
struct Packet
{
std::atomic< bool > m_is_exists;
Packet () : m_is_exists() {}
};
 
std::atomic< uint32_t > m_write_head;
std::atomic< uint32_t > m_read_head;
Packet m_packet[ MaxCount ];
 
void push ()
{
uint32_t pos = ( m_write_head++ ) % MaxCount;
bool expected = false;
while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, true ) );
}
 
void pop ()
{
uint32_t pos = ( m_read_head++ ) % MaxCount;
bool expected = true;
while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, false ) );
}
};
 
Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #2 : Декабрь 06, 2017, 10:31 »

Спасибо). Так работает.
Но хотелось бы понять в чем кроется ошибка в изначальном примере, чтобы избежать подобных в дальнейшем.
Казалось бы load() (оператор преобразования к типу) и store() (оператор присвоения) также атомарны.
И порядок их применения не может быть изменён ни компилятором, ни CPU.

Какой сценарий поведения приводит к ошибке?
Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #3 : Декабрь 06, 2017, 11:22 »

И вот еще, когда с добавились данные Data (по сути любой тип, в примере int), то работа с ними получилась не атомарной.

Код
C++ (Qt)
#include <atomic>
#include <cassert>
#include <cstring>
#include <iostream>
#include <thread>
 
typedef int Data;
 
struct Buffer
{
   enum { MaxCount = 0x0004 };
 
   struct Packet
   {
       Data m_value;
       std::atomic< bool > m_is_exists;
       Packet () : m_value(), m_is_exists() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   std::atomic< uint32_t > m_count;
   Packet m_packet[ MaxCount ];
 
   bool push ( const Data & value )
   {
       if ( m_count >= MaxCount )
           return false;
 
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       bool expected = false;
       while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, true ) );
       m_packet[ pos ].m_value = value;
       m_packet[ pos ].m_is_exists = true;
       ++m_count;
       return true;
   }
 
   bool pop ( Data & value )
   {
       if ( m_count == 0 )
           return false;
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       bool expected = true;
       while( !m_packet[ pos ].m_is_exists.compare_exchange_weak( expected, false ) );
       value = m_packet[ pos ].m_value;
       m_packet[ pos ].m_value = Data();
       m_packet[ pos ].m_is_exists = false;
       --m_count;
       return true;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   Data value = 1;
   while( !global_buffer.push( value ) );
   ++write_count;
}
 
void popValue ()
{
   Data value;
   while( !global_buffer.pop( value ) );
   assert( value == 1 );
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
 
int main ( int /*argc*/, char */*argv*/[] )
{
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
 
   return 0;
}
 
« Последнее редактирование: Декабрь 06, 2017, 11:24 от ssoft » Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4349



Просмотр профиля
« Ответ #4 : Декабрь 06, 2017, 11:51 »

Какой сценарий поведения приводит к ошибке?

Код
C++ (Qt)
   void push ()
   {
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       while( m_packet[ pos ].m_is_exists );
       // <<<<<< здесь нитка может переключиться, а когда она получит управление обратно, уже не известно что будет в m_is_exists
       m_packet[ pos ].m_is_exists = true;
   }
 
Записан
Old
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 4349



Просмотр профиля
« Ответ #5 : Декабрь 06, 2017, 11:51 »

то работа с ними получилась не атомарной.
Точно. Улыбающийся
Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #6 : Декабрь 06, 2017, 12:07 »

Цитировать
Код
C++ (Qt)
// <<<<<< здесь нитка может переключиться, а когда она получит управление обратно, уже не известно что будет в m_is_exists
Это то понятно). У писателя и читателя конфликта не будет.
Я проморгал, что буфер круговой). Если происходит переключение, то в первоначальном варианте другой писатель или даже несколько может по кругу дойти до этой позиции и записать туда значение.
Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #7 : Декабрь 06, 2017, 13:25 »

Метод compare_exchange_weak/strong не такой уж атомарный(. Значение expected может быть заменено на содержимое во время exchange.[\s]

По сути происходит две операции:[\s]
* проверка, что внутреннее значение равно expected;[\s]
<< здесь может быть переключение контекста[\s]
* выполнение атомарной операции exchange, результат которой помещается в expected.[\s]

Если сравнивать с Qt, то в методы testAndSet* являются атомарными, без возможности переключения контекста внутри.[\s]
Как бы реализовать аналог средствами std?[\s]
« Последнее редактирование: Декабрь 06, 2017, 14:52 от ssoft » Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #8 : Декабрь 06, 2017, 13:59 »

Все-таки нужно документацию лучше читать))).

Compares the contents of the atomic object's contained value with expected:
- if true, it replaces the contained value with val (like store).
- if false, it replaces expected with the contained value .

если метод вернул false, то в переменную expected нужно заново присвоить тестовому значению, иначе при следующем вызове expected будет совсем не тем, чем ожидается.

Сам метод атомарный, мое замечание выше неверное.
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #9 : Декабрь 06, 2017, 15:37 »

Ну одним bool тут не отделаться, давайте int
Код
C++ (Qt)
enum {
flag_busy = 1,  // есть чтение или запись
flag_data = 2,  // есть данные
};
 
void Buffer::push( int data )
{
uint32_t pos = ( m_write_head++ ) % MaxCount;
 
// захватываем cвободную ячейку
 while (true) {
   int expected = 0;
   if (m_packet[pos].m_is_exists.compare_exchange_weak(expected, flag_busy)) break;
   // yield ? abortFlag ?
 }
 
// пишем данные
  m_packet[pos].m_data = data;
 
// освобождаем ячейку ставя ей флаг "есть данные"
  m_packet[pos].m_exists = flag_data;
}
 
int Buffer::pop( void )
{
uint32_t pos = (m_read_head++) % MaxCount;
 
// захватываем ячейку c данными
 while (true) {
   int expected = flag_data;
   if (m_packet[pos].m_is_exists.compare_exchange_weak(expected, flag_busy)) break;
   // yield ? abortFlag ?
 }
 
// читаем данные
  int data = m_packet[pos].m_data;
 
// освобождаем ячейку ставя ей флаг 0 (свободна)
  m_packet[pos].m_exists = 0;
 
  return data;
}
Ну и в данном случае связываться с lock-free не очень "рентабельно" - атомарный лок и все дела
Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #10 : Декабрь 06, 2017, 15:39 »

Получился такой рабочие варианты, кому интересно.

Значения с атомарными флагами состояний.

Код
C++ (Qt)
struct Buffer
{
   enum { MaxCount = 0x0400 };
   enum class Status : uint8_t  { Empty, Proceed, Complete };
 
   struct Packet
   {
       Data m_value;
       std::atomic< Status > m_status;
       Packet () : m_value(), m_status() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   std::atomic< uint32_t > m_count;
   Packet m_packet[ MaxCount ];
 
   bool push ( const Data & value )
   {
       if ( m_count >= MaxCount )
           return false;
 
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       Status expected = Status::Empty;
       while( !m_packet[ pos ].m_status.compare_exchange_weak( expected, Status::Proceed ) )
           expected = Status::Empty;
       m_packet[ pos ].m_value = value;
       m_packet[ pos ].m_status = Status::Complete;
       ++m_count;
       return true;
   }
 
   bool pop ( Data & value )
   {
       if ( m_count == 0 )
           return false;
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       Status expected = Status::Complete;
       while( !m_packet[ pos ].m_status.compare_exchange_weak( expected, Status::Proceed ) )
           expected = Status::Complete;
       value = m_packet[ pos ].m_value;
       //m_packet[ pos ].m_value = Data();
       m_packet[ pos ].m_status = Status::Empty;
       --m_count;
       return true;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   Data value;
   while( !global_buffer.push( value ) );
   ++write_count;
}
 
void popValue ()
{
   Data value;
   while( !global_buffer.pop( value ) );
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
int main ( int /*argc*/, char */*argv*/[] )
{
   std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::chrono::steady_clock::time_point end= std::chrono::steady_clock::now();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
   std::cout << "Time elapsed: " << std::chrono::duration_cast< std::chrono::milliseconds >(end - begin).count() << "ms" << std::endl;
 
   return 0;
}
 

Через указатель на данные

Код
C++ (Qt)
typedef Data * DataPtr;
 
struct Buffer
{
   enum { MaxCount = 0x0004 };
 
   struct Packet
   {
       std::atomic< DataPtr > m_ptr;
       Packet () : m_ptr() {}
   };
 
   std::atomic< uint32_t > m_write_head;
   std::atomic< uint32_t > m_read_head;
   std::atomic< uint32_t > m_count;
   Packet m_packets[ MaxCount ];
 
   bool push ( DataPtr value )
   {
       if ( m_count >= MaxCount )
           return false;
       uint32_t pos = ( m_write_head++ ) % MaxCount;
       DataPtr expected = nullptr;
       while( !m_packets[ pos ].m_ptr.compare_exchange_weak( expected, value ) )
           expected = nullptr;
       ++m_count;
       return true;
   }
 
   bool pop ( DataPtr & value )
   {
       if ( m_count == 0 )
           return false;
       uint32_t pos = ( m_read_head++ ) % MaxCount;
       while( ( value = m_packets[ pos ].m_ptr.exchange( nullptr ) ) == nullptr );
       --m_count;
       return true;
   }
};
 
 
static Buffer global_buffer;
static std::atomic< uint32_t > write_count;
static std::atomic< uint32_t > read_count;
 
void pushValue ()
{
   DataPtr value = new Data;
   while( !global_buffer.push( value ) );
   ++write_count;
}
 
void popValue ()
{
   DataPtr value = nullptr;
   while( !global_buffer.pop( value ) );
   delete value;
   ++read_count;
}
 
void write ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       pushValue();
}
 
void read ( size_t count )
{
   for ( size_t i = 0; i < count; ++i )
       popValue();
}
 
int main ( int /*argc*/, char */*argv*/[] )
{
   std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
 
   std::thread first_writer( write, 10000000 );
   std::thread second_writer( write, 20000000 );
   std::thread reader( read, 30000000 );
 
   first_writer.join();
   second_writer.join();
   reader.join();
 
   std::chrono::steady_clock::time_point end= std::chrono::steady_clock::now();
 
   std::cout << "Write count: " << write_count.load() << std::endl;
   std::cout << "Read count: " << read_count.load() << std::endl;
   std::cout << "Time elapsed: " << std::chrono::duration_cast< std::chrono::milliseconds >(end - begin).count() << "ms" << std::endl;
 
   return 0;
}
 

Второй вариант медленее чем первый почти в 1.5-а раза из-за new/delete, но не требует наличие флагов состояния.
« Последнее редактирование: Декабрь 06, 2017, 16:00 от ssoft » Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #11 : Декабрь 06, 2017, 17:06 »

Ну и в данном случае связываться с lock-free не очень "рентабельно" - атомарный лок и все дела

Если делать атомарный lock, тогда нельзя будет конкурентно писать и читать данные. Буфер будет работать псевдопоследовательно - кто-то один пишет или читает. Здесь же конкурентное чтение и запись возможна. Другое дело, что здесь постоянный сброс кеша CPU происходит, но это лишь простейшая реализация, так - "пощупать")).
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #12 : Декабрь 07, 2017, 07:44 »

Второй вариант медленее чем первый почти в 1.5-а раза из-за new/delete, но не требует наличие флагов состояния.
По указателю - это классика lock-free  Улыбающийся

Да, и вот
Код
C++ (Qt)
       if ( m_count >= MaxCount )
           return false;
 
В multi-threading такие проверки - мертвому припарка. Нужны TryPush/TryPop которые возвращают управление немедленно если захват не удался. Вообще основная работа - организовать эффективное ожидание, остальное - приятное баловство  Улыбающийся

Если делать атомарный lock, тогда нельзя будет конкурентно писать и читать данные. Буфер будет работать псевдопоследовательно - кто-то один пишет или читает. Здесь же конкурентное чтение и запись возможна.
Ну это скорее "моральное удовлетворение", эффективность с простецким локом очень приличная, а код резко упрощается, можно использовать обычный стек или вектор.

Другое дело, что здесь постоянный сброс кеша CPU происходит, но это лишь простейшая реализация, так - "пощупать")).
Я впервые вижу compare_exchange_weak (пользуюсь tbb и Qt), но вроде бы справочник намекает что как раз синхронизации кешей не ждем (поэтому и weak). Или я не так понял?
Записан
ssoft
Программист
*****
Offline Offline

Сообщений: 574


Просмотр профиля
« Ответ #13 : Декабрь 07, 2017, 08:32 »

Я впервые вижу compare_exchange_weak (пользуюсь tbb и Qt), но вроде бы справочник намекает что как раз синхронизации кешей не ждем (поэтому и weak). Или я не так понял?

Конечно нужно подробнее поизучать этот вопрос, но если пофантазировать))), то ...
В случае weak могут быть ложные срабатывания такого рода - даже если реальная память никем не заблокирована и имеет верное значение, операция все-равно может вернуть false.
Возможно, на некоторых архитектурах в таких случаях первоначальная легковесная проверка значения происходит без блокирования (или другие какие принципы).
Но вот для выполнения операции exchange блокирование обязательно и как следствие после успешной операцией обязателен сброс кеша у других CPU.
В случае же strong проверка сразу осуществляется с блокированием.

Но это просто размышления, не более того)).
Записан
Igors
Джедай : наставник для всех
*******
Offline Offline

Сообщений: 11445


Просмотр профиля
« Ответ #14 : Декабрь 07, 2017, 08:48 »

Но вот для выполнения операции exchange блокирование обязательно и как следствие после успешной операцией обязателен сброс кеша у других CPU.
Чего это обязательно? Разве нельзя вернуть false не ожидая синхронизации? Впрочем мои познания в кешах также не отличаются глубиной  Улыбающийся

А вообще lock-free - увлекательнейший "вынос мозга", хотя часто, увы, непрактичный.

Edit: а, понял о чем Вы. Да, если exchange случился, то новое значение должно быть "для всех". Ну это нормально (или неизбежно), атомic тоже чего-то стоит
« Последнее редактирование: Декабрь 07, 2017, 09:07 от Igors » Записан
Страниц: [1]   Вверх
  Печать  
 
Перейти в:  


Страница сгенерирована за 0.364 секунд. Запросов: 23.