Russian Qt Forum

Qt => Работа с сетью => Тема начата: demaker от Август 25, 2017, 18:05



Название: Многопотчный сервер
Отправлено: demaker от Август 25, 2017, 18:05
Есть сервер, в котором каждое клиентское подключение обрабатывается в отдельном потоке.
Клиент посылает файл на сервер, где тот обрабатывается и по команде клиента с сервера посылается обратно.

Сделал так.
1.Принимаем данные с клиента и запихиваем их в файл.
Сначала с клиента передается посылка- инфа о файле(ИМЯ,РАЗМЕР),
в посылке типа
Код:
  TYPE_INFO_FILE 
затем идут сами данные с файла - посылки типа
Код:
 TYPE_DATA_FILE 
после удачного приема файла сервер высылает посылку-команду об успешном приеме.
Код:
 CMD_SUCCESS_RECEIVE 

2.Кидаем указатель на файл в буффер(список) принятых файлов.

3.Если  в буффере принятых файлов есть что обрабатывать, то обрабатываем.

4.После обработки файла перекидываем указатель в буффер(список) на передачу, удалив указатель из буффера(список) принятых файлов.

5.По команде
Код:
CMD_GET_READY_FILE
с клиента передаем файл обратно на клиент.
(Все тоже самое что и п. №1, только выполняет сервер)

Обработка файла происходит в отдельном потоке.
Если при приеме команды от клиента в буффере на передачу есть что-то, то отсылаем.

Когда работает один клиент все замечательно.
Когда, подключено несколько клиентов, то при последовательной передаче файлов, т.е. сначала один клиент передал файл, потом другой передал свой файл работает тоже норм.

Но когда параллельно идет передача файлов с нескольких клиентов у меня не проходит проверка, что файл полностью передан.
Код:
if(sfiledata.nFileSize == blockSize) //!!!!!!!!!!!!!не проходит проверка

Код
C++ (Qt)
case TYPE_DATA_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size())
           {
               pData = pData.mid(sizeof(head_pack));
 
               if(m_pReceiveFile && m_pReceiveFile->open(QIODevice::ReadWrite | QIODevice::Append)){
                    blockSize += m_pReceiveFile->write(&pData.data()[0],headPack.sizePack);
                    m_pReceiveFile->close();
               }
 
               pData = pData.mid(headPack.sizePack);
 
               {
                   head_pack headpck;
                   headpck.sizePack = sizeof(int);
                   headpck.typePack = TYPE_INFO_PROCESS;
                   headpck.sizePack2 = sizeof(int);
                   QByteArray ba_progress;
                   ba_progress.append((char*)&headpck,sizeof(head_pack));
                   //std::cout<<"blockSize = "<<blockSize;
                   ba_progress.append((char*)(&blockSize),sizeof(int));
                   m_pTcpSocket->write(ba_progress);
                   m_pTcpSocket->waitForBytesWritten(1);
               }
 
               if(sfiledata.nFileSize == blockSize) //!!!!!!!!!!!!!не проходит проверка
               {
                   head_pack headpck;
                   headpck.sizePack = 0;
                   headpck.typePack = CMD_SUCCESS_RECEIVE;
                   headpck.sizePack2 = 0;
                   QByteArray ba_success;
                   ba_success.append((char*)&headpck,sizeof(head_pack));
                   m_pTcpSocket->write(ba_success);
                   m_pTcpSocket->waitForBytesWritten(1);
 
                   blockSize = 0;        
 
                   mutexListRxFiles.lock();
                   m_listRxFiles.append(m_pReceiveFile);
                   mutexListRxFiles.unlock();
                   m_pReceiveFile = NULL;
               }
 

Получается что счетчик принятых байт не равняется размеру передаваемого файла.
Не пойму в чем может быть причина? Как преодалеть это ???

Вот код программы
Код
C++ (Qt)
 
//TcpServer
void TcpServer::incomingConnection(int socketDescriptor)
{
   ThreadServer *thread = new ThreadServer(socketDescriptor,0);
   connect(thread,SIGNAL(fin()),this,SLOT(removeThread()));
   listThreads.append(thread);
   thread->start();
}
 
 
//ThreadServer
ThreadServer::ThreadServer(int socketdescriptor, QObject *parent) :
   QThread(parent),
   descriptor(socketdescriptor),
   m_iPingCounting(0),
   m_iTimerCnt(0),
   m_pReceiveFile(NULL),
   m_pTransmiteFile(NULL),
   m_pThreadProcessData(NULL)
{
   std::cout<<__FUNCTION__;
 
   m_listTxFiles.clear();
   m_listRxFiles.clear();
 
   //moveToThread(this);
 
}
 
ThreadServer::~ThreadServer()
{
   std::cout<<__FUNCTION__;
 
   if(m_pThreadProcessData){
       m_pThreadProcessData->setStopThread(true);
       m_pThreadProcessData->quit();
       m_pThreadProcessData->wait();
       delete m_pThreadProcessData;
   }
 
   while(!m_listRxFiles.isEmpty()){
       delete m_listRxFiles.at(0);
       m_listRxFiles.removeAt(0);
   }
 
   while(!m_listTxFiles.isEmpty()){
       delete m_listTxFiles.at(0);
       m_listTxFiles.removeAt(0);
   }
 
}
 
void ThreadServer::run()
{
   m_pTcpSocket = new QTcpSocket();
 
   if(!m_pTcpSocket->setSocketDescriptor(descriptor)){
       Quit();
       return;
   }
 
   QHostAddress addr = m_pTcpSocket->peerAddress();
   QString str = QString("Ïîòîê ṗ %1 , óñòàíîâëåíî ñîåäèíåíèå ñ êëèåíòîì àäŵåñ %2\n").arg(descriptor).arg(addr.toString());
   QByteArray ba;
   ba.append(str);
   std::cout << ba.data();  
 
   m_pThreadProcessData = new ThreadProcessData(&m_listRxFiles,&mutexListRxFiles,&m_listTxFiles,&mutexListTxFiles);
   m_pThreadProcessData->start();
 
   while(1){
       msleep(5);
 
       if(!TryPing()){
           Quit();
           return;
       }
       ParsDataRes();        
   }
}
 
void ThreadServer::Quit()
{
   delete m_pTcpSocket;
   emit fin();
}
 
bool ThreadServer::TryPing()
{
   if(m_iTimerCnt <= MAX_TIMER_CNT){
       m_iTimerCnt++;
       //std::cout<<"timercnt = "<<m_iTimerCnt<<"\n";
       return true;
   }
   else{
       m_iPingCounting++;
       //std::cout<<"pingcounting = "<<m_iPingCounting<<"\n";
       if(m_iPingCounting <= MAX_PING_COUNTING){
           //CMD_PING
           head_pack headpck;
           headpck.sizePack = 0;
           headpck.typePack = CMD_PING;
           headpck.sizePack2 = 0;
           QByteArray ba_ping;
           ba_ping.append((char*)&headpck,sizeof(head_pack));
           m_pTcpSocket->write(ba_ping);
           m_pTcpSocket->waitForBytesWritten(1);
           return true;
       }
       else{
           std::cout<<"!!!EXIT!!!"<<"\n";
           return false;
       }
   }
}
 
void ThreadServer::ParsDataRes()
{
   static int blockSize = 0;
 
   m_pTcpSocket->waitForReadyRead(1);
   pData.append( m_pTcpSocket->readAll() );
 
   while(pData.size() != 0)
   {
       if(pData.size() < sizeof(head_pack))
           return ;
 
       m_iTimerCnt = m_iPingCounting = 0;
 
       head_pack headPack;
       memcpy((void*)(&headPack), (void *)(&pData.data()[0]), sizeof(head_pack));
 
       if ( headPack.sizePack != headPack.sizePack2 )
       {
           pData = pData.mid( 1 );
           continue;
       }
 
       switch(headPack.typePack)
       {
 
       case TYPE_INFO_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size())
           {
               pData = pData.mid(sizeof(head_pack));
               if( sizeof(sfiledata) <= headPack.sizePack ){
                   memcpy((char *)&sfiledata,&pData.data()[0],headPack.sizePack);
                   pData = pData.mid(headPack.sizePack);
 
                   m_pReceiveFile = new QFile;
                   m_pReceiveFile->setFileName(QString(sfiledata.pFileName));
 
               }
               else{                  
               }                
           }
           else{
               return;
           }
           break;
 
       case TYPE_DATA_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size())
           {
               pData = pData.mid(sizeof(head_pack));
 
               if(m_pReceiveFile && m_pReceiveFile->open(QIODevice::ReadWrite | QIODevice::Append)){
                    blockSize += m_pReceiveFile->write(&pData.data()[0],headPack.sizePack);
                    m_pReceiveFile->close();
               }
 
               pData = pData.mid(headPack.sizePack);
 
               {
                   head_pack headpck;
                   headpck.sizePack = sizeof(int);
                   headpck.typePack = TYPE_INFO_PROCESS;
                   headpck.sizePack2 = sizeof(int);
                   QByteArray ba_progress;
                   ba_progress.append((char*)&headpck,sizeof(head_pack));
                   //std::cout<<"blockSize = "<<blockSize;
                   ba_progress.append((char*)(&blockSize),sizeof(int));
                   m_pTcpSocket->write(ba_progress);
                   m_pTcpSocket->waitForBytesWritten(1);
               }
 
               if(sfiledata.nFileSize == blockSize)
               {
                   head_pack headpck;
                   headpck.sizePack = 0;
                   headpck.typePack = CMD_SUCCESS_RECEIVE;
                   headpck.sizePack2 = 0;
                   QByteArray ba_success;
                   ba_success.append((char*)&headpck,sizeof(head_pack));
                   m_pTcpSocket->write(ba_success);
                   m_pTcpSocket->waitForBytesWritten(1);
 
                   blockSize = 0;        
 
                   mutexListRxFiles.lock();
                   m_listRxFiles.append(m_pReceiveFile);
                   mutexListRxFiles.unlock();
                   m_pReceiveFile = NULL;
               }
           }
           else{
               return;
           }
           break;
 
       case CMD_PING:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
               pData = pData.mid(sizeof(head_pack));              
 
               m_iTimerCnt = m_iPingCounting = 0;
           }
           else{
               return;
           }
           break;
 
       case CMD_GET_READY_FILE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
               pData = pData.mid(sizeof(head_pack));
 
               m_pTransmiteFile = NULL;
               mutexListTxFiles.lock();
               if(!m_listTxFiles.isEmpty()){
                   m_pTransmiteFile = m_listTxFiles.at(0);
                   m_listTxFiles.removeAt(0);
               }
               mutexListTxFiles.unlock();
               TransmiteFile(m_pTransmiteFile);
           }
           else{
               return;
           }
           break;
 
       case CMD_SUCCESS_RECEIVE:
           if(headPack.sizePack + sizeof(head_pack) <= pData.size()){
                 pData = pData.mid(sizeof(head_pack));
 
 
                 if(m_pTransmiteFile){
                     m_pTransmiteFile->remove();
                     delete m_pTransmiteFile;
                     m_pTransmiteFile = NULL;
                 }
           }
           else{
               return;
           }
           break;
       default:
           pData = pData.mid( 1 );
           continue;
       };
   }
   return ;
 
}
 
void ThreadServer::TransmiteFile(QFile *file)
{
   //std::cout<<"TransmiteFile\n";
   if(file == NULL){
 
       head_pack headpck;
       headpck.sizePack = 0;
       headpck.typePack = CMD_ANS_EMPTY_LIST;
       headpck.sizePack2 = 0;
       QByteArray ba_empty_list;
       ba_empty_list.append((char*)&headpck,sizeof(head_pack));
       m_pTcpSocket->write(ba_empty_list);
       m_pTcpSocket->waitForBytesWritten(1);
       return;
   }
 
   {
       sfiledata.nFileSize = file->size();
       for(int i = 0; i < max_char; i++)
           sfiledata.pFileName[i] = file->fileName().toAscii().data()[i];
 
       QByteArray ba;
       head_pack headPack;
       headPack.sizePack = sizeof(sFileData);
       headPack.typePack = TYPE_INFO_FILE;
       headPack.sizePack2 = sizeof(sFileData);
       ba.append((char *)(&headPack),sizeof(head_pack));
       ba.append((char*)(&sfiledata),sizeof(sFileData));
       m_pTcpSocket->write(ba);
       m_pTcpSocket->waitForBytesWritten(1);
   }
 
   {
   if(file->open(QFile::ReadWrite)){
       file->seek(0);
       while(!file->atEnd())
       {
         const QByteArray buf = file->read(MAX_SIZE_FRAME); //64Êáàéò
         head_pack headPack;
         headPack.sizePack = buf.size();
         headPack.typePack = TYPE_DATA_FILE;
         headPack.sizePack2 = buf.size();
 
         QByteArray ba;
         ba.append((char *)(&headPack),sizeof(headPack));
         ba.append(buf);
         m_pTcpSocket->write(ba);
         m_pTcpSocket->waitForBytesWritten(1);
 
       }
       file->close();
   }
   }
}
 
//ThreadProcessData
ThreadProcessData::ThreadProcessData(QList<QFile *> *inFiles, QMutex *mutexInFiles, QList<QFile *> *outFiles, QMutex *mutexOutFiles, QObject *parent) :
   QThread(parent),
   p_listInFiles(inFiles),
   p_mutexInFiles(mutexInFiles),
   p_listOutFiles(outFiles),
   p_mutexOutFiles(mutexOutFiles),
   m_bStopThread(false)
{    
   //moveToThread(this);
}
 
ThreadProcessData::~ThreadProcessData()
{
}
 
void ThreadProcessData::setStopThread(bool flag)
{
   mutexStopThread.lock();
   m_bStopThread = flag;
   mutexStopThread.unlock();
}
 
bool ThreadProcessData::stopThread()
{
   bool flag;
   mutexStopThread.lock();
   flag = m_bStopThread ;
   mutexStopThread.unlock();
   return flag;
}
 
 
void ThreadProcessData::run()
{
   while(!stopThread()){        
       msleep(5);
       if(!isEmptyListInFiles()){
           QFile *processFile;
           p_mutexInFiles->lock();
           processFile = p_listInFiles->at(0);
           p_mutexInFiles->unlock();
 
           if(processFile->open(QIODevice::ReadWrite))
           {
               int lAB = 0;
               int valB = 0;
               int valG = 0;
               int valR = 0;
 
               int correction = 100;//koef correction
               int L = 256;
               int b[L];
 
               QImage image(processFile->fileName());
 
               for(int i = 0; i < image.size().height(); i++){
                   for(int j = 0; j < image.size().width(); j++){
                       QRgb rgb = image.pixel(QPoint(j,i));
                       QColor color(rgb);
                       valB = color.blue();
                       valG = color.green();
                       valR = color.red();
                       lAB += (int)(valR * 0.299 + valG * 0.587 + valB * 0.114);
                   }
               }                
 
               lAB /= image.size().height() * image.size().width();
 
 
               double k = 1.0 + correction / 100.0;
 
               for (int i = 0; i < L; i++)
               {
                   int delta = (int)i - lAB;
                   int temp  = (int)(lAB + k *delta);
 
                   if (temp < 0)   temp = 0;
                   if (temp >= 255)    temp = 255;
 
                   b[i] = (unsigned char)temp;
               }
 
               //
               for (int j = 0; j < image.byteCount(); j++)
               {
                   unsigned char value = image.bits()[j];
                   image.bits()[j] = (unsigned char)b[value];
               }
 
               QImageWriter writer(processFile->fileName());
               writer.write(image);
 
           }
           processFile->close();
 
           p_mutexInFiles->lock();
           p_listInFiles->removeAt(0);
           p_mutexInFiles->unlock();
 
           p_mutexOutFiles->lock();
           p_listOutFiles->append(processFile);
           p_mutexOutFiles->unlock();
       }      
   }
}
 
bool ThreadProcessData::isEmptyListInFiles()
{
   bool isEmpty;
   p_mutexInFiles->lock();
   isEmpty = p_listInFiles->isEmpty();
   p_mutexInFiles->unlock();
   return isEmpty;
}
 


Название: Re: Многопотчный сервер
Отправлено: demaker от Август 31, 2017, 14:38
Выяснил в чем ошибка.
Скажите, если есть класс потока.
В нем есть метод, в котором объявленна статическая переменная.
То для каждого экземпляра класса данная переменная будет своя или одна для всех(экземпляров класса)?
Код
C++ (Qt)
class Thread: public QThread
{
...
protected:
      void run();
 
private:
     void foo();
 
}
 
void Thread::foo()
{
 
  static int value = 0;
  ....
 
  value++;
 
}
 
void Thread::run()
{
   forever(){
      msleep(5);
      foo();
  }
 
}
 

В моем случае получилось что одна для всех экземпляров класса. Почему  ??? не знаю.
Поэтому проверка и не проходила.
Хотя я думал что для каждого экземпляра класса будет своя переменная.



Название: Re: Многопотчный сервер
Отправлено: sergek от Август 31, 2017, 16:24
Так статическая же. Создается одна при компиляции.
А потоки используют одно адресное пространство.


Название: Re: Многопотчный сервер
Отправлено: demaker от Август 31, 2017, 18:31
Так статическая же. Создается одна при компиляции.
А потоки используют одно адресное пространство.
Да :-[ балбес я!