Вопрос по qt, c++, tcp, multithreading – Серверный поток внутри класса qt (нужен мьютекс?)

1

Я создал этот серверный класс, который запускает поток при появлении нового соединения. В некоторых случаях он работает нормально, но он не очень стабилен. Я пытаюсь решить, где это ломается. Мой отладчик говорит мне кое-что о qmutex. Если кто-то может определить проблему. ти

Он соединяется с родителем сигнальными слотами и возвращает данные.

Вот заголовок:

#ifndef FORTUNESERVER_H
#define FORTUNESERVER_H

#include <QStringList>
#include <QTcpServer>
#include <QThread>
#include <QTcpSocket>
#include <string>
using namespace  std;


class FortuneServer : public QTcpServer
{
    Q_OBJECT

 public:
    FortuneServer(QObject *parent = 0);

public slots:


void procesServerString(string serverString);
void getStringToThread(string serverString);

protected:
void incomingConnection(int socketDescriptor);

private:
QStringList fortunes;

signals:

void procesServerStringToParent(string serverString);
void getStringToThreadSignal(string serverString);
};


class FortuneThread : public QObject
 {
Q_OBJECT

public:
FortuneThread(int socketDescriptor, QObject *parent);

public slots:

void getString();
void sendString(string sendoutString);

signals:

void error(QTcpSocket::SocketError socketError);
void fromThreadString(string serverString);
void finished();


private:
int socketDescriptor;
QString text;
QTcpSocket tcpSocket;
};

#endif

и копия:

#include <stdlib.h>
#include <QtNetwork>
#include "MeshServer.hh"
#include <iostream>
#include "TableView.hh"

using namespace  std;

FortuneServer::FortuneServer(QObject *parent)
: QTcpServer(parent)
{

}

void FortuneServer::procesServerString(string serverString){

emit procesServerStringToParent(serverString);

}
void FortuneServer::getStringToThread(string serverString){

emit getStringToThreadSignal(serverString);

}

void FortuneServer::incomingConnection(int socketDescriptor)
{


FortuneThread *serverthread = new FortuneThread(socketDescriptor, this);
//connect(&serverthread, SIGNAL(finished()), &serverthread, SLOT(deleteLater()));


QThread* thread = new QThread;

serverthread->moveToThread(thread);

connect(thread, SIGNAL(started()), serverthread, SLOT(getString()));
connect(serverthread, SIGNAL(fromThreadString(string)), this,        SLOT(procesServerString(string)));
connect(this, SIGNAL(getStringToThreadSignal(string)), serverthread, SLOT(sendString(string)));

connect(serverthread, SIGNAL(finished()), thread, SLOT(quit()));
connect(serverthread, SIGNAL(finished()), serverthread, SLOT(deleteLater()));
connect(serverthread, SIGNAL(finished()), thread, SLOT(deleteLater()));

thread->start();

}



FortuneThread::FortuneThread(int socketDescriptor, QObject *parent)
: QObject(parent), socketDescriptor(socketDescriptor)
{



}

void FortuneThread::getString()
{

if (!tcpSocket.setSocketDescriptor(socketDescriptor)) {
    emit error(tcpSocket.error());
    cout<<"socket error"<<endl;
    return;
}
//in part
if(!tcpSocket.waitForReadyRead(10000)){

    emit finished();
    return;
}
int joj = tcpSocket.bytesAvailable();
char inbuffer[1024];
tcpSocket.read(inbuffer,1024);
string instring;
instring = inbuffer;
instring.resize(joj);

emit fromThreadString(instring);

}   


void FortuneThread::sendString(string sendoutString)
{       


//out part
char buffer[1024];
int buffer_len = 1024;
int bytecount;

memset(buffer, '\0', buffer_len);


string outstring = sendoutString;



int TempNumOne= (int)outstring.size();

for (int a=0;a<TempNumOne;a++)
    {
        buffer[a]=outstring[a];
    }

QByteArray block;
block = buffer;



tcpSocket.write(block);
tcpSocket.disconnectFromHost();
tcpSocket.waitForDisconnected();
emit finished();
}

это от родителя:

//server start

QHostAddress adr;
adr.setAddress( QString("127.0.0.1") );
adr.toIPv4Address();
quint16 port = 1101;

if (!server.listen( adr, port)) {
  QMessageBox::critical(this, tr("CR_bypasser"),
      tr("Unable to start the server: %1.")
      .arg(server.errorString()));
  close();
  return;
}

QString ipAddress;
ipAddress = server.serverAddress().toString();
statusLabel->setText(tr("The server is running on\n\nIP: %1\nport: %2\n\n"
  "Run the Fortune Client example now.")
  .arg(ipAddress).arg(server.serverPort()));

connect (&server, SIGNAL(procesServerStringToParent(string)), this,  SLOT(procesServerString(string))); 
connect (this, SIGNAL(StringToServer(string)), &server, SLOT(getStringToThread(string))); 

редактировать: что я пытаюсь сделать:

У меня есть клиент (часть игрового движка (Cryengine)), который я создал для отправки строки игровых координат и некоторых других вещей с сокетом, как это было сделано по ссылке, которую я дал ранее. Это работает хорошо. Я получаю данные "127.0.0.1" порт 1101. Теперь мне просто нужно, чтобы эти данные были оценены в моей собственной программе, которая имеет этот класс TableView, внутри которого я могу собирать координаты, которые я получаю из строки, вызывать некоторые данные из координат и затем возвращать эту новую строку обратно через сервер к игровому движку. В игре я нажму на объекты, чтобы получить их coor., Сделаю из них строку (содержащую coor, entityid и т. Д.), Отправлю эту строку на сервер, который возвращает информацию о callculated из TableView. Мне просто нужен этот односторонний поток только один клиент, который отправляет строки. Я не уверен насчет recv (hsock, buffer, buffer_len, 0), я думаю, что узел, отвечающий за отправку строки в игре, будет ожидать возврата строки? Это одна из моих первых программ, я действительно запутался ...

Вам понадобится мьютекс, когда несколько потоков читают одни и те же данные, и один или несколько из них могут изменить эти данные. Пожалуйста, рефакторинг стены кода, который вы разместили, чтобы включить биты, где это может произойти. Также выделите код, в котором отладчик говорит, что проблема в. cmannett85

Ваш Ответ

2   ответа
6

который вы представляете, является примером кодирования грузового культа: вы делаете разные ненужные вещи, по-видимому, в надежде решить проблему.

The Likely Crasher ...

Есть много проблем с кодом, но я думаю, что причина сбоя заключается в следующем:tcpSocket.write(block) не отправляет строку с нулевым символом в конце. Блок заканчивается нулем, но присвоение байтовому массиву не добавляет это нулевое завершение кsize() QByteArray. Следующий код печатает 1, даже если внутри содержимого байтового массива есть нулевой завершающий байт.

QByteArray arr = "a";
qDebug("len=%d", arr.size());

Получающий код ожидает нулевого завершения, но никогда его не получает. Затем вы переходите к назначению буфера с нулевым символом в концеstd::string:

string instring;
instring = inbuffer;
instring.resize(joj);

Последующее изменение размера является культом груза: вы пытаетесь решить проблему послеstd::string & std::string::operator=(const char*) уже прочитал мимо вашего буфера, по всей вероятности.

Делатьnot это означает, что исправление - это правильный путь. Не за что. Правильный способ - удалить написанный вами код и делать это правильно, без множества ненужных заклинаний, которые не помогут.

... and All The Other Problems

Вы попали в ловушку веры в магию, бесконечно увековеченную на различных форумах.

Потоки не являются магическими объектами, которые вы можете просто применить к любой проблеме в надежде, что они помогут. Я не знаю, что заставляет людей думать, что темы магические, но эмпирическое правило таково:If someone tells you "oh, you should try threads", they are most likely wrong. If they tell that in relation to networking, they are pretty much never right, they are unhelpful, and they don't understand your problem at all (neither do you, it seems). Чаще всего темы будутnot помочь, если вы не понимаете свою проблему. Сетевая система Qt является асинхронной: она не блокирует выполнение вашего кода, если вы не используетеwaitxxxx() функции. Кстати, вы не должны их использовать, так что здесь все хорошо. Нет необходимости в миллиарде нитей.

Таким образом, совершенно необязательно запускать новый поток для каждого входящего соединения. Это снизит производительность вашего сервера - особенно если сервер выполняет простую обработку, потому что вы добавляете накладные расходы на переключение контекста и создание / демонтаж потока для каждого соединения. Вам нужно менее 2 потоков на каждое ядро в вашей системе, поэтому используйтеQThread::idealThreadCount() для количества потоков в пуле будет хорошей отправной точкой.

Вы также лишаете себя возможности многопоточности, поскольку вы используете сетевой поток только для получения данных, а затем отправляетеfromThreadString(string) сигнал. Я предполагаю, что сигнал отправляется в основной поток вашего приложения. Теперь это просто глупо, потому что получение множества байтов из сетевого сокета совершенно тривиально. Ваши темы не выполняют никакой работы, вся работа, которую они выполняют, тратится на их создание и удаление.

ниже является простым примером того, как можноcorrectly используйте API Qt для реализации системы клиент-сервер, которая распределяет работу по физическим ядрам в циклическом порядке. Это должно работать довольно хорошо. Пример клиента Fortune, включенный в Qt:very unfortunate indeedпотому что это совершенно неправильный путь.

Что можно заметить, так это:

It's not entirely trivial. Qt could be more helpful, but isn't.

Both the clients and the senders are moved into threads from a thread pool.

Disconnected clients are not deleted, but merely returned to a list of clients kept by the tread pool. They are reused when a client is called for.

QThread is not derived from. QTcpServer is only derived to access the socket handle.

No functions whose name begins with wait() are used. Everything is handled asynchronously.

The ThreadPool keeps a looked-up QMetaMethod for the newConnection(int) slot of the Client. This is faster than using QMetaObject::invokeMethod() as it has to look things up every time.

A timer running in the main thread sets off a signal-slot chain by deleting the first sender. Each senders' deletion triggers the deletion of the next one. Eventually, the last sender sets off the quit() slot in the thread pool. The latter emits the finished() signal when all threads are indeed finished.

#include <QtCore/QCoreApplication>
#include <QtNetwork/QTcpServer>
#include <QtNetwork/QTcpSocket>
#include <QtCore/QQueue>
#include <QtCore/QThread>
#include <QtCore/QTimer>
#include <QtCore/QMetaMethod>

// Processes data on a socket connection
class Client : public QObject
{
    Q_OBJECT
public:
    Client(QObject* parent = 0) : QObject(parent), socket(new QTcpSocket(this))
    {
        connect(socket, SIGNAL(readyRead()), SLOT(newData()));
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                SLOT(newState(QAbstractSocket::SocketState)));
        qDebug("Client()");
    }
    ~Client() { qDebug("~Client()"); }
signals:
    void done();
public slots:
    void newConnection(int descriptor) {
        socket->setSocketDescriptor(descriptor);
    }
private slots:
    void newData() {
        QByteArray data = socket->readAll();
        if (0) qDebug("got %d bytes", data.size());
        if (0) qDebug("got a string %s", data.constData());
        // here we can process the data
    }
    void newState(QAbstractSocket::SocketState state) {
        qDebug("client new state %d", state);
        if (state == QAbstractSocket::UnconnectedState) { emit done(); }
    }
protected:
    QTcpSocket* socket;
    int descriptor;
};

// Connects to a client and sends data to it
class Sender : public QObject
{
    Q_OBJECT
public:
    Sender(const QString & address, quint16 port, QObject * parent = 0) :
        QObject(parent), socket(new QTcpSocket(this)),
        bytesInFlight(0), maxBytesInFlight(65536*8)
    {
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                SLOT(newState(QAbstractSocket::SocketState)));
        connect(socket, SIGNAL(bytesWritten(qint64)), SLOT(sentData(qint64)));
        socket->connectToHost(address, port);
        qDebug("Sender()");
    }
    ~Sender() { qDebug("~Sender()"); }
protected:
    // sends enough data to keep a maximum number of bytes in flight
    void sendData() {
        qint64 n = maxBytesInFlight - bytesInFlight;
        if (n <= 0) return;
        bytesInFlight += n;
        socket->write(QByteArray(n, 44)); // 44 is the answer, after all
    }
protected slots:
    void sentData(qint64 n) {
        bytesInFlight -= n;
        Q_ASSERT(bytesInFlight >= 0);
        sendData();
    }
    void newState(QAbstractSocket::SocketState state) {
        qDebug("sender new state %d", state);
        if (state == QAbstractSocket::ConnectedState) sendData();
    }
protected:
    QTcpSocket* socket;
    qint64 bytesInFlight;
    qint64 maxBytesInFlight;
};

// Keeps track of threads and client objects
class ThreadPool : public QTcpServer
{
    Q_OBJECT
public:
    ThreadPool(QObject* parent = 0) : QTcpServer(parent), nextThread(0) {
        for (int i=0; i < QThread::idealThreadCount(); ++i) {
            QThread * thread = new QThread(this);
            connect(thread, SIGNAL(finished()), SLOT(threadDone()));
            thread->start();
            threads << thread;
        }
        const QMetaObject & mo = Client::staticMetaObject;
        int idx = mo.indexOfMethod("newConnection(int)");
        Q_ASSERT(idx>=0);
        method = mo.method(idx);
    }
    void poolObject(QObject* obj) const {
        if (nextThread >= threads.count()) nextThread = 0;
        QThread* thread = threads.at(nextThread);
        obj->moveToThread(thread);
    }
protected:
    void incomingConnection(int descriptor) {
        Client * client;
        if (threads.isEmpty()) return;
        if (! clients.isEmpty()) {
            client = clients.dequeue();
        } else {
            client = new Client();
            connect(client, SIGNAL(done()), SLOT(clientDone()));
        }
        poolObject(client);
        method.invoke(client, Q_ARG(int, descriptor));
    }
signals:
    void finished();
public slots:
    void quit() {
        foreach (QThread * thread, threads) { thread->quit(); }
    }
private slots:
    void clientDone() {
        clients.removeAll(qobject_cast<Client*>(sender()));
    }
    void threadDone() {
        QThread * thread = qobject_cast<QThread*>(sender());
        if (threads.removeAll(thread)) delete thread;
        if (threads.isEmpty()) emit finished();
    }
private:
    QList<QThread*> threads;
    QQueue<Client*> clients;
    QMetaMethod method;
    mutable int nextThread;
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    ThreadPool server;
    if (!server.listen(QHostAddress::Any, 1101)) qCritical("cannot establish a listening server");
    const int senderCount = 10;
    Sender *prevSender = 0, *firstSender = 0;
    for (int i = 0; i < senderCount; ++ i) {
        Sender * sender = new Sender("localhost", server.serverPort());
        server.poolObject(sender);
        if (!firstSender) firstSender = sender;
        if (prevSender) sender->connect(prevSender, SIGNAL(destroyed()), SLOT(deleteLater()));
        prevSender = sender;
    }
    QTimer::singleShot(3000, firstSender, SLOT(deleteLater())); // run for 3s
    server.connect(prevSender, SIGNAL(destroyed()), SLOT(quit()));
    qApp->connect(&server, SIGNAL(finished()), SLOT(quit()));
    // Deletion chain: timeout deletes first sender, then subsequent senders are deleted,
    // finally the last sender tells the thread pool to quit. Finally, the thread pool
    // quits the application.
    return a.exec();
}

#include "main.moc"

Учитывая ваше объяснение, игровой движок запускается и создает соединение с каким-либо портом на localhost. Предполагается, что ваша программа Qt принимает это соединение через порт 1101, принимает некоторые строки, обрабатывает их, а затем отправляет обратно.

изменен, чтобы принять соединение с фиксированным номером порта. Вся обработка данных, включая отправку ответа, должна быть выполнена изnewData() слот. Вы также можете передать эти данные в другой поток, если ваши вычисления очень сложны. Под комплексом я подразумеваю десятки тысяч операций, таких как сложения и умножения, или тысячи триггерных операций.

Sender класс там просто в качестве примера. Разумеется, ваш игровой движок выполняет отправку, поэтому вам не нужен класс Sender.

Спасибо, я посмотрю на это! Может быть, я должен также добавить код для клиента. Его взяли отсюда: [ссылка]tidytutorials.com/2009/12/… pazduha
Я немного запутался, можете ли вы привести пример, как получить это нулевое завершение в QByteArray? ти pazduha
Почему вы хотите пойти по такому обходному пути? Помните, что, по крайней мере, в этом примере сетевые потоки ужасно недоиспользуются - лучше было бы дать им еще немного работы. Что именно вы пытаетесь сделать? Это может дать мне больше понимания для правильной реализации. Укажите свою проблему, а не предполагаемое решение.
Я постараюсь реализовать ваш пример в моем коде. Мне было интересно, если вы можете показать, как лучше всего передать строку от клиента к основному и от основного обратно в отправителя. Ty pazduha
user1220769: он уже существует, но вы не можете использовать QByteArray напрямую. Вы должны сделать что-то вроде:socket.write(byteArray.constData(), byteArray.size()+1).
0

. Я думаю, что эта часть была там, где была ошибка:

//removed
tcpSocket.disconnectFromHost();
tcpSocket.waitForDisconnected();
emit finished();

...

#include <stdlib.h>
#include <QtNetwork>
#include "MeshServer.hh"
#include <iostream>
#include "TableView.hh"

using namespace  std;

FortuneServer::FortuneServer(QObject *parent)
: QTcpServer(parent)
{

}

void FortuneServer::procesServerString(string serverString){

emit procesServerStringToParent(serverString);

}
void FortuneServer::getStringToThread(string serverString){

emit getStringToThreadSignal(serverString);

}

void FortuneServer::incomingConnection(int socketDescriptor)
{


FortuneThread *serverthread = new FortuneThread(socketDescriptor, this);
//connect(&serverthread, SIGNAL(finished()), &serverthread, SLOT(deleteLater()));


QThread* thread = new QThread;

serverthread->moveToThread(thread);




connect(serverthread, SIGNAL(fromThreadString(string)), this, SLOT(procesServerString(string)));
connect(this, SIGNAL(getStringToThreadSignal(string)), serverthread, SLOT(sendString(string)));
connect(serverthread, SIGNAL(finished()), thread, SLOT(quit()));
connect(serverthread, SIGNAL(finished()), serverthread, SLOT(deleteLater()));
connect(serverthread, SIGNAL(finished()), thread, SLOT(deleteLater()));

thread->start();

}



FortuneThread::FortuneThread(int socketDescriptor, QObject *parent): QObject(parent),   socketDescriptor(socketDescriptor)

{
if (!tcpSocket.setSocketDescriptor(socketDescriptor)) {
    emit error(tcpSocket.error());
    cout<<"socket error"<<endl;
    emit finished();
    return;
}




connect(&tcpSocket, SIGNAL(readyRead()), this, SLOT(getString()));
//connect(&tcpSocket, SIGNAL(disconnected()), this, SLOT(ondisconnected()));

}

void FortuneThread::getString()
{

int joj = tcpSocket.bytesAvailable();
if(joj==0){
    tcpSocket.disconnectFromHost();
    emit finished();
    return;
}
char inbuffer[1024];
int buffer_len = 1024;
memset(inbuffer, '\0', buffer_len);
tcpSocket.read(i,nbuffer,1024);
string instring;
instring = inbuffer;
instring.resize(joj);

emit fromThreadString(instring);

}   


void FortuneThread::sendString(string sendoutString)
{   
char buffer2[1024];
int buffer_len = 1024;
memset(buffer2, '\0', buffer_len);
strcat(buffer2,sendoutString.c_str());

tcpSocket.write(buffer2,buffer_len);

 }

void FortuneThread::ondisconnected()
{


emit finished();


}

Похожие вопросы