1
0
mirror of https://github.com/CPunch/Laika.git synced 2024-11-21 20:40:05 +00:00

Lib: Minor refactoring, boilerplate content packet handlers

- content contexts now have events
- minor comments
This commit is contained in:
CPunch 2022-05-18 12:04:19 -05:00
parent 83002faa62
commit 3e60cc3c0f
5 changed files with 130 additions and 80 deletions

View File

@ -37,5 +37,7 @@ struct sLaika_socket;
struct sLaika_pollList; struct sLaika_pollList;
struct sLaika_task; struct sLaika_task;
struct sLaika_taskService; struct sLaika_taskService;
struct sLaika_content;
struct sLaika_contentContext;
#endif #endif

View File

@ -8,19 +8,28 @@
#include <inttypes.h> #include <inttypes.h>
enum { enum {
CONTENT_FILE, CONTENT_IN = true, /* being recv'd from peer */
CONTENT_PAYLOAD CONTENT_OUT = false /* being sent to peer */
};
enum {
CONTENT_TYPE_FILE
}; };
enum { enum {
CONTENT_ERR_ID_IN_USE, CONTENT_ERR_ID_IN_USE,
CONTENT_ERR_INVALID_ID CONTENT_ERR_INVALID_ID,
CONTENT_ERR_REJECTED
}; };
typedef uint8_t CONTENT_TYPE; typedef uint8_t CONTENT_TYPE;
typedef uint8_t CONTENT_ERRCODE; typedef uint8_t CONTENT_ERRCODE;
typedef uint16_t CONTENT_ID; typedef uint16_t CONTENT_ID;
typedef void (*contentRecvEvent)(struct sLaika_contentContext *context, struct sLaika_content *content);
typedef bool (*contentNewEvent)(struct sLaika_contentContext *context, struct sLaika_content *content);
typedef void (*contentErrorEvent)(struct sLaika_contentContext *context, struct sLaika_content *content, CONTENT_ERRCODE err);
struct sLaika_content { struct sLaika_content {
struct sLaika_content *next; struct sLaika_content *next;
FILE *fd; FILE *fd;
@ -28,22 +37,28 @@ struct sLaika_content {
uint32_t processed; uint32_t processed;
CONTENT_ID id; CONTENT_ID id;
CONTENT_TYPE type; CONTENT_TYPE type;
bool direction;
}; };
struct sLaika_contentContext { struct sLaika_contentContext {
struct sLaika_content *headIn; /* receiving from peer */ struct sLaika_content *head;
struct sLaika_content *headOut; /* sending to peer */
CONTENT_ID nextID; CONTENT_ID nextID;
contentRecvEvent onReceived;
contentNewEvent onNew;
contentErrorEvent onError;
}; };
void laikaF_initContext(struct sLaika_contentContext *context); void laikaF_initContext(struct sLaika_contentContext *context);
void laikaF_cleanContext(struct sLaika_contentContext *context); void laikaF_cleanContext(struct sLaika_contentContext *context);
int laikaF_newOutContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type); void laikaF_setupEvents(struct sLaika_contentContext *context, contentRecvEvent onRecv, contentNewEvent onNew, contentErrorEvent onError);
void laikaF_newInContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, CONTENT_TYPE type);
int laikaF_sendContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type);
void laikaF_pollContent(struct sLaika_peer *peer); void laikaF_pollContent(struct sLaika_peer *peer);
void laikaF_handleContentNew(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData);
void laikaF_handleContentError(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData);
void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData); void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData);
#endif #endif

View File

@ -8,6 +8,8 @@
#define CONTENTCHUNK_MAX_BODY (LAIKA_MAX_PKTSIZE-sizeof(CONTENT_ID)) #define CONTENTCHUNK_MAX_BODY (LAIKA_MAX_PKTSIZE-sizeof(CONTENT_ID))
/* ===========================================[[ Helper Functions ]]============================================= */
size_t getSize(FILE *fd) { size_t getSize(FILE *fd) {
size_t sz; size_t sz;
fseek(fd, 0L, SEEK_END); fseek(fd, 0L, SEEK_END);
@ -17,7 +19,7 @@ size_t getSize(FILE *fd) {
} }
struct sLaika_content* getContentByID(struct sLaika_contentContext *context, CONTENT_ID id) { struct sLaika_content* getContentByID(struct sLaika_contentContext *context, CONTENT_ID id) {
struct sLaika_content *curr = context->headIn; struct sLaika_content *curr = context->head;
while (curr) { while (curr) {
if (curr->id == id) if (curr->id == id)
@ -34,8 +36,8 @@ void freeContent(struct sLaika_content *content) {
laikaM_free(content); laikaM_free(content);
} }
void rmvFromOut(struct sLaika_contentContext *context, struct sLaika_content *content) { void rmvContent(struct sLaika_contentContext *context, struct sLaika_content *content) {
struct sLaika_content *last = NULL, *curr = context->headOut; struct sLaika_content *last = NULL, *curr = context->head;
while (curr) { while (curr) {
/* if found, remove it! */ /* if found, remove it! */
@ -43,27 +45,7 @@ void rmvFromOut(struct sLaika_contentContext *context, struct sLaika_content *co
if (last) if (last)
last->next = curr->next; last->next = curr->next;
else else
context->headOut = curr->next; context->head = curr->next;
freeContent(curr);
break;
}
last = curr;
curr = curr->next;
}
}
void rmvFromIn(struct sLaika_contentContext *context, struct sLaika_content *content) {
struct sLaika_content *last = NULL, *curr = context->headIn;
while (curr) {
/* if found, remove it! */
if (curr == content) {
if (last)
last->next = curr->next;
else
context->headIn = curr->next;
freeContent(curr); freeContent(curr);
break; break;
@ -81,25 +63,22 @@ void sendContentError(struct sLaika_peer *peer, CONTENT_ID id, CONTENT_ERRCODE e
laikaS_endOutPacket(peer); laikaS_endOutPacket(peer);
} }
/* ==============================================[[ Content API ]]=============================================== */
void laikaF_initContext(struct sLaika_contentContext *context) { void laikaF_initContext(struct sLaika_contentContext *context) {
context->headIn = NULL; context->head = NULL;
context->headOut = NULL;
context->nextID = 0; context->nextID = 0;
context->onReceived = NULL;
context->onNew = NULL;
context->onError = NULL;
} }
void laikaF_cleanContext(struct sLaika_contentContext *context) { void laikaF_cleanContext(struct sLaika_contentContext *context) {
struct sLaika_content *tmp, *curr; struct sLaika_content *tmp, *curr;
/* free IN list */ /* free content list */
curr = context->headIn; curr = context->head;
while (curr) {
tmp = curr->next;
freeContent(curr);
curr = tmp;
}
/* free OUT list */
curr = context->headOut;
while (curr) { while (curr) {
tmp = curr->next; tmp = curr->next;
freeContent(curr); freeContent(curr);
@ -107,20 +86,33 @@ void laikaF_cleanContext(struct sLaika_contentContext *context) {
} }
} }
int laikaF_newOutContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type) { void laikaF_setupEvents(struct sLaika_contentContext *context, contentRecvEvent onRecv, contentNewEvent onNew, contentErrorEvent onError) {
context->onReceived = onRecv;
context->onNew = onNew;
context->onError = onError;
}
struct sLaika_content* laikaF_newContent(struct sLaika_contentContext *context, FILE *fd, size_t sz, CONTENT_ID id, CONTENT_TYPE type, bool direction) {
struct sLaika_content *content = (struct sLaika_content*)laikaM_malloc(sizeof(struct sLaika_content)); struct sLaika_content *content = (struct sLaika_content*)laikaM_malloc(sizeof(struct sLaika_content));
struct sLaika_contentContext *context = &peer->context;
/* init content struct */ /* init content struct */
content->fd = fd; content->fd = fd;
content->sz = getSize(fd); content->sz = sz;
content->processed = 0; content->processed = 0;
content->id = context->nextID++; content->id = id;
content->type = type; content->type = type;
content->direction = direction;
/* add to list */ /* add to list */
content->next = context->headOut; content->next = context->head;
context->headOut = content; context->head = content;
return content;
}
int laikaF_sendContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type) {
struct sLaika_contentContext *context = &peer->context;
struct sLaika_content *content = laikaF_newContent(context, fd, getSize(fd), context->nextID++, type, CONTENT_OUT);
/* let the peer know we're sending them some content */ /* let the peer know we're sending them some content */
laikaS_startOutPacket(peer, LAIKAPKT_CONTENT_NEW); laikaS_startOutPacket(peer, LAIKAPKT_CONTENT_NEW);
@ -128,11 +120,12 @@ int laikaF_newOutContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type)
laikaS_writeInt(&peer->sock, &content->sz, sizeof(uint32_t)); laikaS_writeInt(&peer->sock, &content->sz, sizeof(uint32_t));
laikaS_writeByte(&peer->sock, type); laikaS_writeByte(&peer->sock, type);
laikaS_endOutPacket(peer); laikaS_endOutPacket(peer);
return content->id;
} }
/* new content we're recieving from a peer */ /* new content we're recieving from a peer */
void laikaF_newInContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, CONTENT_TYPE type) { struct sLaika_content* laikaF_recvContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, CONTENT_TYPE type) {
struct sLaika_content *content = (struct sLaika_content*)laikaM_malloc(sizeof(struct sLaika_content));
struct sLaika_contentContext *context = &peer->context; struct sLaika_contentContext *context = &peer->context;
if (getContentByID(context, id)) { if (getContentByID(context, id)) {
@ -140,46 +133,78 @@ void laikaF_newInContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, C
LAIKA_ERROR("ID [%d] is in use!\n", id); LAIKA_ERROR("ID [%d] is in use!\n", id);
} }
content->fd = tmpfile(); return laikaF_newContent(context, tmpfile(), sz, id, type, CONTENT_IN);
content->sz = sz;
content->processed = 0;
content->id = id;
content->type = type;
/* add to list */
content->next = context->headIn;
context->headIn = content;
} }
void laikaF_pollContent(struct sLaika_peer *peer) { void laikaF_pollContent(struct sLaika_peer *peer) {
uint8_t buff[CONTENTCHUNK_MAX_BODY]; uint8_t buff[CONTENTCHUNK_MAX_BODY];
struct sLaika_contentContext *context = &peer->context; struct sLaika_contentContext *context = &peer->context;
struct sLaika_content *tmp, *curr = context->headOut; struct sLaika_content *tmp, *curr = context->head;
int rd; int rd;
/* traverse our out content, sending each chunk */ /* traverse our out content, sending each chunk */
while (curr) { while (curr) {
/* if we've reached the end of the file stream, remove it! */ if (curr->direction == CONTENT_OUT) {
if (rd = fread(buff, sizeof(uint8_t), MIN(curr->sz - curr->processed, CONTENTCHUNK_MAX_BODY), curr->fd) == 0) { /* if we've reached the end of the file stream, remove it! */
tmp = curr->next; if (rd = fread(buff, sizeof(uint8_t), MIN(curr->sz - curr->processed, CONTENTCHUNK_MAX_BODY), curr->fd) == 0) {
rmvFromOut(context, curr); tmp = curr->next;
curr = tmp; rmvContent(context, curr);
continue; curr = tmp;
continue;
}
/* send chunk */
laikaS_startVarPacket(peer, LAIKAPKT_CONTENT_CHUNK);
laikaS_writeInt(&peer->sock, &curr->id, sizeof(CONTENT_ID));
laikaS_write(&peer->sock, buff, rd);
laikaS_endVarPacket(peer);
curr->processed += rd;
} }
/* send chunk */
laikaS_startVarPacket(peer, LAIKAPKT_CONTENT_CHUNK);
laikaS_writeInt(&peer->sock, &curr->id, sizeof(CONTENT_ID));
laikaS_write(&peer->sock, buff, rd);
laikaS_endVarPacket(peer);
curr->processed += rd;
curr = curr->next; curr = curr->next;
} }
} }
/* ============================================[[ Packet Handlers ]]============================================= */ /* ============================================[[ Packet Handlers ]]============================================= */
void laikaF_handleContentNew(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) {
struct sLaika_contentContext *context = &peer->context;
struct sLaika_content *content;
uint32_t contentSize;
CONTENT_ID contentID;
CONTENT_TYPE contentType;
laikaS_readInt(&peer->sock, &contentID, sizeof(CONTENT_ID));
laikaS_readInt(&peer->sock, &contentSize, sizeof(uint32_t));
contentType = laikaS_readByte(&peer->sock);
content = laikaF_recvContent(peer, contentID, contentSize, contentType);
if (context->onNew && !context->onNew(context, content)) {
sendContentError(peer, contentID, CONTENT_ERR_REJECTED);
rmvContent(context, content);
}
}
void laikaF_handleContentError(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) {
struct sLaika_contentContext *context = &peer->context;
struct sLaika_content *content;
CONTENT_ID contentID;
uint8_t errCode;
laikaS_readInt(&peer->sock, &contentID, sizeof(CONTENT_ID));
errCode = laikaS_readByte(&peer->sock);
if ((content = getContentByID(context, contentID)) == NULL)
LAIKA_ERROR("Received error for non-existant id %d!\n", coitentID);
LAIKA_DEBUG("We received an errcode for id %d, err: %d\n", contentID, errCode);
if (context->onError) /* check if event exists! */
context->onError(context, content, errCode);
rmvContent(context, content);
}
void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) { void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) {
uint8_t buff[CONTENTCHUNK_MAX_BODY]; uint8_t buff[CONTENTCHUNK_MAX_BODY];
struct sLaika_contentContext *context = &peer->context; struct sLaika_contentContext *context = &peer->context;
@ -192,15 +217,15 @@ void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void
/* read and sanity check id */ /* read and sanity check id */
laikaS_readInt(&peer->sock, &id, sizeof(CONTENT_ID)); laikaS_readInt(&peer->sock, &id, sizeof(CONTENT_ID));
if ((content = getContentByID(context, id)) == NULL) if ((content = getContentByID(context, id)) == NULL || content->direction != CONTENT_IN)
LAIKA_ERROR("chunk recieved with invalid id! [%d]\n", id); LAIKA_ERROR("chunk recieved with invalid id! [%d]\n", id);
/* read data & write to file */ /* read data & write to file */
laikaS_read(&peer->sock, buff, bodySz); laikaS_read(&peer->sock, buff, bodySz);
if (fwrite(buff, sizeof(uint8_t), bodySz, content->fd) != bodySz) { if (fwrite(buff, sizeof(uint8_t), bodySz, content->fd) != bodySz) {
rmvFromIn(context, content); rmvContent(context, content);
} else { } else if ((content->processed += bodySz) == content->sz) {
/* TODO: if content->processed == content->sz then handle full file received event */ if (context->onReceived) /* check if event exists! */
content->processed += bodySz; context->onReceived(context, content);
} }
} }

View File

@ -39,6 +39,8 @@ void laikaS_freePeer(struct sLaika_peer *peer) {
laikaM_free(peer); laikaM_free(peer);
} }
/* ===========================================[[ Start/End Packets ]]============================================ */
void laikaS_emptyOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id) { void laikaS_emptyOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id) {
struct sLaika_socket *sock = &peer->sock; struct sLaika_socket *sock = &peer->sock;
@ -158,6 +160,8 @@ void laikaS_setSecure(struct sLaika_peer *peer, bool flag) {
peer->useSecure = flag; peer->useSecure = flag;
} }
/* ===========================================[[ Handle Poll Events ]]=========================================== */
bool laikaS_handlePeerIn(struct sLaika_socket *sock) { bool laikaS_handlePeerIn(struct sLaika_socket *sock) {
struct sLaika_peer *peer = (struct sLaika_peer*)sock; struct sLaika_peer *peer = (struct sLaika_peer*)sock;
int recvd; int recvd;

View File

@ -2,6 +2,8 @@
#include "lmem.h" #include "lmem.h"
#include "lpolllist.h" #include "lpolllist.h"
/* ===========================================[[ Helper Functions ]]============================================= */
typedef struct sLaika_hashMapElem { typedef struct sLaika_hashMapElem {
SOCKET fd; SOCKET fd;
struct sLaika_socket *sock; struct sLaika_socket *sock;
@ -19,6 +21,8 @@ uint64_t elem_hash(const void *item, uint64_t seed0, uint64_t seed1) {
return (uint64_t)(u->fd); return (uint64_t)(u->fd);
} }
/* ==============================================[[ PollList API ]]============================================== */
void laikaP_initPList(struct sLaika_pollList *pList) { void laikaP_initPList(struct sLaika_pollList *pList) {
laikaS_init(); laikaS_init();