Clean up polling logic

* Extracted PollFD manipulation and nonblocking socket configuration
into helper functions
* Replaced the connections list with an unordered_map
* Dynamically grow the number of PollFD structures with realloc()

With these changes done, the server's CPU usage is completely diminished
from its old average of ~47% to ~0.07%, with occasional spikes up to ~14%.
This commit is contained in:
dongresource 2020-12-05 04:58:49 +01:00
parent 269315ca09
commit ec7cba644c
2 changed files with 105 additions and 88 deletions

View File

@ -214,6 +214,27 @@ void CNSocket::step() {
} }
} }
static bool setSockNonblocking(SOCKET listener, SOCKET newSock) {
#ifdef _WIN32
unsigned long mode = 1;
if (ioctlsocket(newSock, FIONBIO, &mode) != 0) {
#else
if (fcntl(newSock, F_SETFL, (fcntl(listener, F_GETFL, 0) | O_NONBLOCK)) != 0) {
#endif
std::cerr << "[WARN] OpenFusion: fcntl failed on new connection" << std::endl;
#ifdef _WIN32
shutdown(newSock, SD_BOTH);
closesocket(newSock);
#else
shutdown(newSock, SHUT_RDWR);
close(newSock);
#endif
return false;
}
return true;
}
// ========================================================[[ CNServer ]]======================================================== // ========================================================[[ CNServer ]]========================================================
void CNServer::init() { void CNServer::init() {
@ -261,29 +282,65 @@ void CNServer::init() {
std::cerr << "[FATAL] OpenFusion: fcntl failed" << std::endl; std::cerr << "[FATAL] OpenFusion: fcntl failed" << std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// poll() configuration
fdsSize = STARTFDSCOUNT * sizeof(PollFD); // won't overflow
fds = (PollFD*)xmalloc(fdsSize);
nfds = 1;
fds[0].fd = sock;
fds[0].events = POLLIN;
} }
CNServer::CNServer() {}; CNServer::CNServer() {};
CNServer::CNServer(uint16_t p): port(p) {} CNServer::CNServer(uint16_t p): port(p) {}
void CNServer::addPollFD(SOCKET s) {
// if the array is full, double its size
if (nfds == fdsSize / sizeof(PollFD)) {
size_t oldsize = fdsSize;
// check for multiplication overflow
assert(fdsSize < SIZE_MAX / 2);
fdsSize *= 2;
fds = (PollFD*)realloc(fds, fdsSize);
if (fds == NULL) {
std::cerr << "[FATAL] OpenFusion: out of memory!" << std::endl;
exit(EXIT_FAILURE);
}
// initialize newly allocated area
memset(((uint8_t*)fds) + oldsize, 0, oldsize);
}
fds[nfds].fd = s;
fds[nfds].events = POLLIN;
nfds++;
}
void CNServer::removePollFD(int i) {
nfds--;
assert(nfds > 0);
if (i != nfds) {
// move the last entry to the new empty spot
fds[i].fd = fds[nfds].fd;
fds[i].events = fds[nfds].events; // redundant; events are always the same
}
// delete the entry
fds[nfds].fd = 0;
fds[nfds].events = 0;
}
void CNServer::start() { void CNServer::start() {
int nfds = 1, oldnfds; int oldnfds;
PollFD fds[30]; // TODO: dynamically grow
memset(&fds, 0, sizeof(fds));
// listener socket
fds[0].fd = sock;
fds[0].events = POLLIN;
std::cout << "Starting server at *:" << port << std::endl; std::cout << "Starting server at *:" << port << std::endl;
// listen to new connections, add to connection list
while (active) { while (active) {
// the timeout is to ensure shard timers are ticking // the timeout is to ensure shard timers are ticking
//std::cout << "pre-poll\n"; int n = poll(fds, nfds, 200);
int n = poll((PollFD*)&fds, nfds, 200);
//if (n > 0)
// std::cout << "poll returned " << n << std::endl;
if (SOCKETERROR(n)) { if (SOCKETERROR(n)) {
std::cout << "[FATAL] poll() returned error" << std::endl; std::cout << "[FATAL] poll() returned error" << std::endl;
terminate(0); terminate(0);
@ -297,6 +354,8 @@ void CNServer::start() {
if (fds[i].revents == 0) if (fds[i].revents == 0)
continue; // nothing in this one; don't decrement n continue; // nothing in this one; don't decrement n
n--;
// is it the listener? // is it the listener?
if (fds[i].fd == sock) { if (fds[i].fd == sock) {
// any sort of error on the listener // any sort of error on the listener
@ -307,93 +366,46 @@ void CNServer::start() {
} }
SOCKET newConnectionSocket = accept(sock, (struct sockaddr *)&address, (socklen_t*)&addressSize); SOCKET newConnectionSocket = accept(sock, (struct sockaddr *)&address, (socklen_t*)&addressSize);
if (SOCKETINVALID(newConnectionSocket)) { if (SOCKETINVALID(newConnectionSocket))
n--;
continue; continue;
}
// set it to non-blocking mode if (!setSockNonblocking(sock, newConnectionSocket))
#ifdef _WIN32
unsigned long mode = 1;
if (ioctlsocket(newConnectionSocket, FIONBIO, &mode) != 0) {
#else
if (fcntl(newConnectionSocket, F_SETFL, (fcntl(sock, F_GETFL, 0) | O_NONBLOCK)) != 0) {
#endif
std::cerr << "[WARN] OpenFusion: fcntl failed on new connection" << std::endl;
#ifdef _WIN32
shutdown(newConnectionSocket, SD_BOTH);
closesocket(newConnectionSocket);
#else
shutdown(newConnectionSocket, SHUT_RDWR);
close(newConnectionSocket);
#endif
n--;
continue; continue;
}
std::cout << "New connection! " << inet_ntoa(address.sin_addr) << std::endl; std::cout << "New connection! " << inet_ntoa(address.sin_addr) << std::endl;
// add to pollfds addPollFD(newConnectionSocket);
assert(nfds < 30); // XXX
fds[nfds].fd = newConnectionSocket;
fds[nfds].events = POLLIN;
nfds++;
std::cout << "in thread " << (void*) this << " nfds is now " << nfds << std::endl;
// add connection to list! // add connection to list!
CNSocket* tmp = new CNSocket(newConnectionSocket, pHandler); CNSocket* tmp = new CNSocket(newConnectionSocket, pHandler);
connections.push_back(tmp); connections[newConnectionSocket] = tmp;
newConnection(tmp); newConnection(tmp);
} else { } else {
// player sockets // player sockets
std::list<CNSocket*>::iterator it = connections.begin(); if (connections.find(fds[i].fd) == connections.end()) {
while (it != connections.end()) { std::cout << "[WARN] Event on non-existant socket?" << std::endl;
CNSocket* cSock = *it; continue; // just to be safe
if (fds[i].fd != cSock->sock) {
// not the socket we're looking for; check the next one
it++;
continue;
} }
// kill the socket on error CNSocket* cSock = connections[fds[i].fd];
if (fds[i].revents & ~POLLIN) {
std::cout << "Killing socket at fds[" << i << "]\n"; // kill the socket on hangup/error
if (fds[i].revents & ~POLLIN)
cSock->kill(); cSock->kill();
}
if (cSock->isAlive()) { if (cSock->isAlive()) {
cSock->step(); cSock->step();
++it; // go to the next element
} else { } else {
killConnection(cSock); killConnection(cSock);
connections.erase(it++); connections.erase(fds[i].fd);
delete cSock; delete cSock;
nfds--; removePollFD(i);
assert(nfds > 0);
std::cout << "in thread " << (void*) this << " nfds is now " << nfds << std::endl;
if (i != nfds) {
// move the last entry to the new empty spot
fds[i].fd = fds[nfds].fd;
fds[i].events = fds[nfds].events; // redundant; events are always the same
}
// delete the entry
fds[nfds].fd = 0;
fds[nfds].events = 0;
break;
} }
} }
} }
n--;
}
onStep(); onStep();
activeCrit.unlock(); activeCrit.unlock();
} }
@ -404,15 +416,11 @@ void CNServer::kill() {
active = false; active = false;
// kill all connections // kill all connections
std::list<CNSocket*>::iterator i = connections.begin(); for (auto& pair : connections) {
while (i != connections.end()) { CNSocket *cSock = pair.second;
CNSocket* cSock = *i; if (cSock->isAlive())
if (cSock->isAlive()) {
cSock->kill(); cSock->kill();
}
++i; // go to the next element
delete cSock; delete cSock;
} }

View File

@ -47,6 +47,7 @@
#include <csignal> #include <csignal>
#include <list> #include <list>
#include <queue> #include <queue>
#include <unordered_map>
#include "Defines.hpp" #include "Defines.hpp"
#include "settings.hpp" #include "settings.hpp"
@ -71,7 +72,7 @@ inline void* xmalloc(size_t sz) {
void* res = calloc(1, sz); void* res = calloc(1, sz);
if (res == NULL) { if (res == NULL) {
std::cerr << "[FATAL] OpenFusion: calloc failed to allocate memory!" << std::endl; std::cerr << "[FATAL] OpenFusion: out of memory!" << std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -192,9 +193,14 @@ struct TimerEvent {
// in charge of accepting new connections and making sure each connection is kept alive // in charge of accepting new connections and making sure each connection is kept alive
class CNServer { class CNServer {
protected: protected:
std::list<CNSocket*> connections; std::unordered_map<SOCKET, CNSocket*> connections;
std::mutex activeCrit; std::mutex activeCrit;
const size_t STARTFDSCOUNT = 8; // number of initial PollFD slots
size_t fdsSize; // size of PollFD array in bytes
int nfds; // number of populated PollFD slots
PollFD *fds;
SOCKET sock; SOCKET sock;
uint16_t port; uint16_t port;
socklen_t addressSize; socklen_t addressSize;
@ -209,6 +215,9 @@ public:
CNServer(); CNServer();
CNServer(uint16_t p); CNServer(uint16_t p);
void addPollFD(SOCKET s);
void removePollFD(int i);
void start(); void start();
void kill(); void kill();
static void printPacket(CNPacketData *data, int type); static void printPacket(CNPacketData *data, int type);