diff --git a/lib/include/laika.h b/lib/include/laika.h index 21d24ed..9c0ee73 100644 --- a/lib/include/laika.h +++ b/lib/include/laika.h @@ -37,5 +37,7 @@ struct sLaika_socket; struct sLaika_pollList; struct sLaika_task; struct sLaika_taskService; +struct sLaika_content; +struct sLaika_contentContext; #endif \ No newline at end of file diff --git a/lib/include/lcontent.h b/lib/include/lcontent.h index 3c76d15..5997707 100644 --- a/lib/include/lcontent.h +++ b/lib/include/lcontent.h @@ -8,19 +8,28 @@ #include enum { - CONTENT_FILE, - CONTENT_PAYLOAD + CONTENT_IN = true, /* being recv'd from peer */ + CONTENT_OUT = false /* being sent to peer */ +}; + +enum { + CONTENT_TYPE_FILE }; enum { CONTENT_ERR_ID_IN_USE, - CONTENT_ERR_INVALID_ID + CONTENT_ERR_INVALID_ID, + CONTENT_ERR_REJECTED }; typedef uint8_t CONTENT_TYPE; typedef uint8_t CONTENT_ERRCODE; typedef uint16_t CONTENT_ID; +typedef void (*contentRecvEvent)(struct sLaika_contentContext *context, struct sLaika_content *content); +typedef bool (*contentNewEvent)(struct sLaika_contentContext *context, struct sLaika_content *content); +typedef void (*contentErrorEvent)(struct sLaika_contentContext *context, struct sLaika_content *content, CONTENT_ERRCODE err); + struct sLaika_content { struct sLaika_content *next; FILE *fd; @@ -28,22 +37,28 @@ struct sLaika_content { uint32_t processed; CONTENT_ID id; CONTENT_TYPE type; + bool direction; }; struct sLaika_contentContext { - struct sLaika_content *headIn; /* receiving from peer */ - struct sLaika_content *headOut; /* sending to peer */ + struct sLaika_content *head; CONTENT_ID nextID; + contentRecvEvent onReceived; + contentNewEvent onNew; + contentErrorEvent onError; }; void laikaF_initContext(struct sLaika_contentContext *context); void laikaF_cleanContext(struct sLaika_contentContext *context); -int laikaF_newOutContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type); -void laikaF_newInContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, CONTENT_TYPE type); +void laikaF_setupEvents(struct sLaika_contentContext *context, contentRecvEvent onRecv, contentNewEvent onNew, contentErrorEvent onError); + +int laikaF_sendContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type); void laikaF_pollContent(struct sLaika_peer *peer); +void laikaF_handleContentNew(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData); +void laikaF_handleContentError(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData); void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData); #endif \ No newline at end of file diff --git a/lib/src/lcontent.c b/lib/src/lcontent.c index c2b59b7..2643856 100644 --- a/lib/src/lcontent.c +++ b/lib/src/lcontent.c @@ -8,6 +8,8 @@ #define CONTENTCHUNK_MAX_BODY (LAIKA_MAX_PKTSIZE-sizeof(CONTENT_ID)) +/* ===========================================[[ Helper Functions ]]============================================= */ + size_t getSize(FILE *fd) { size_t sz; fseek(fd, 0L, SEEK_END); @@ -17,7 +19,7 @@ size_t getSize(FILE *fd) { } struct sLaika_content* getContentByID(struct sLaika_contentContext *context, CONTENT_ID id) { - struct sLaika_content *curr = context->headIn; + struct sLaika_content *curr = context->head; while (curr) { if (curr->id == id) @@ -34,8 +36,8 @@ void freeContent(struct sLaika_content *content) { laikaM_free(content); } -void rmvFromOut(struct sLaika_contentContext *context, struct sLaika_content *content) { - struct sLaika_content *last = NULL, *curr = context->headOut; +void rmvContent(struct sLaika_contentContext *context, struct sLaika_content *content) { + struct sLaika_content *last = NULL, *curr = context->head; while (curr) { /* if found, remove it! */ @@ -43,27 +45,7 @@ void rmvFromOut(struct sLaika_contentContext *context, struct sLaika_content *co if (last) last->next = curr->next; else - context->headOut = curr->next; - - freeContent(curr); - break; - } - - last = curr; - curr = curr->next; - } -} - -void rmvFromIn(struct sLaika_contentContext *context, struct sLaika_content *content) { - struct sLaika_content *last = NULL, *curr = context->headIn; - - while (curr) { - /* if found, remove it! */ - if (curr == content) { - if (last) - last->next = curr->next; - else - context->headIn = curr->next; + context->head = curr->next; freeContent(curr); break; @@ -81,25 +63,22 @@ void sendContentError(struct sLaika_peer *peer, CONTENT_ID id, CONTENT_ERRCODE e laikaS_endOutPacket(peer); } +/* ==============================================[[ Content API ]]=============================================== */ + void laikaF_initContext(struct sLaika_contentContext *context) { - context->headIn = NULL; - context->headOut = NULL; + context->head = NULL; context->nextID = 0; + + context->onReceived = NULL; + context->onNew = NULL; + context->onError = NULL; } void laikaF_cleanContext(struct sLaika_contentContext *context) { struct sLaika_content *tmp, *curr; - /* free IN list */ - curr = context->headIn; - while (curr) { - tmp = curr->next; - freeContent(curr); - curr = tmp; - } - - /* free OUT list */ - curr = context->headOut; + /* free content list */ + curr = context->head; while (curr) { tmp = curr->next; freeContent(curr); @@ -107,20 +86,33 @@ void laikaF_cleanContext(struct sLaika_contentContext *context) { } } -int laikaF_newOutContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type) { +void laikaF_setupEvents(struct sLaika_contentContext *context, contentRecvEvent onRecv, contentNewEvent onNew, contentErrorEvent onError) { + context->onReceived = onRecv; + context->onNew = onNew; + context->onError = onError; +} + +struct sLaika_content* laikaF_newContent(struct sLaika_contentContext *context, FILE *fd, size_t sz, CONTENT_ID id, CONTENT_TYPE type, bool direction) { struct sLaika_content *content = (struct sLaika_content*)laikaM_malloc(sizeof(struct sLaika_content)); - struct sLaika_contentContext *context = &peer->context; /* init content struct */ content->fd = fd; - content->sz = getSize(fd); + content->sz = sz; content->processed = 0; - content->id = context->nextID++; + content->id = id; content->type = type; + content->direction = direction; /* add to list */ - content->next = context->headOut; - context->headOut = content; + content->next = context->head; + context->head = content; + + return content; +} + +int laikaF_sendContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type) { + struct sLaika_contentContext *context = &peer->context; + struct sLaika_content *content = laikaF_newContent(context, fd, getSize(fd), context->nextID++, type, CONTENT_OUT); /* let the peer know we're sending them some content */ laikaS_startOutPacket(peer, LAIKAPKT_CONTENT_NEW); @@ -128,11 +120,12 @@ int laikaF_newOutContent(struct sLaika_peer *peer, FILE *fd, CONTENT_TYPE type) laikaS_writeInt(&peer->sock, &content->sz, sizeof(uint32_t)); laikaS_writeByte(&peer->sock, type); laikaS_endOutPacket(peer); + + return content->id; } /* new content we're recieving from a peer */ -void laikaF_newInContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, CONTENT_TYPE type) { - struct sLaika_content *content = (struct sLaika_content*)laikaM_malloc(sizeof(struct sLaika_content)); +struct sLaika_content* laikaF_recvContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, CONTENT_TYPE type) { struct sLaika_contentContext *context = &peer->context; if (getContentByID(context, id)) { @@ -140,46 +133,78 @@ void laikaF_newInContent(struct sLaika_peer *peer, CONTENT_ID id, uint32_t sz, C LAIKA_ERROR("ID [%d] is in use!\n", id); } - content->fd = tmpfile(); - content->sz = sz; - content->processed = 0; - content->id = id; - content->type = type; - - /* add to list */ - content->next = context->headIn; - context->headIn = content; + return laikaF_newContent(context, tmpfile(), sz, id, type, CONTENT_IN); } void laikaF_pollContent(struct sLaika_peer *peer) { uint8_t buff[CONTENTCHUNK_MAX_BODY]; struct sLaika_contentContext *context = &peer->context; - struct sLaika_content *tmp, *curr = context->headOut; + struct sLaika_content *tmp, *curr = context->head; int rd; /* traverse our out content, sending each chunk */ while (curr) { - /* if we've reached the end of the file stream, remove it! */ - if (rd = fread(buff, sizeof(uint8_t), MIN(curr->sz - curr->processed, CONTENTCHUNK_MAX_BODY), curr->fd) == 0) { - tmp = curr->next; - rmvFromOut(context, curr); - curr = tmp; - continue; + if (curr->direction == CONTENT_OUT) { + /* if we've reached the end of the file stream, remove it! */ + if (rd = fread(buff, sizeof(uint8_t), MIN(curr->sz - curr->processed, CONTENTCHUNK_MAX_BODY), curr->fd) == 0) { + tmp = curr->next; + rmvContent(context, curr); + curr = tmp; + continue; + } + + /* send chunk */ + laikaS_startVarPacket(peer, LAIKAPKT_CONTENT_CHUNK); + laikaS_writeInt(&peer->sock, &curr->id, sizeof(CONTENT_ID)); + laikaS_write(&peer->sock, buff, rd); + laikaS_endVarPacket(peer); + + curr->processed += rd; } - /* send chunk */ - laikaS_startVarPacket(peer, LAIKAPKT_CONTENT_CHUNK); - laikaS_writeInt(&peer->sock, &curr->id, sizeof(CONTENT_ID)); - laikaS_write(&peer->sock, buff, rd); - laikaS_endVarPacket(peer); - - curr->processed += rd; curr = curr->next; } } /* ============================================[[ Packet Handlers ]]============================================= */ +void laikaF_handleContentNew(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) { + struct sLaika_contentContext *context = &peer->context; + struct sLaika_content *content; + uint32_t contentSize; + CONTENT_ID contentID; + CONTENT_TYPE contentType; + + laikaS_readInt(&peer->sock, &contentID, sizeof(CONTENT_ID)); + laikaS_readInt(&peer->sock, &contentSize, sizeof(uint32_t)); + contentType = laikaS_readByte(&peer->sock); + + content = laikaF_recvContent(peer, contentID, contentSize, contentType); + if (context->onNew && !context->onNew(context, content)) { + sendContentError(peer, contentID, CONTENT_ERR_REJECTED); + rmvContent(context, content); + } +} + +void laikaF_handleContentError(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) { + struct sLaika_contentContext *context = &peer->context; + struct sLaika_content *content; + CONTENT_ID contentID; + uint8_t errCode; + + laikaS_readInt(&peer->sock, &contentID, sizeof(CONTENT_ID)); + errCode = laikaS_readByte(&peer->sock); + + if ((content = getContentByID(context, contentID)) == NULL) + LAIKA_ERROR("Received error for non-existant id %d!\n", coitentID); + + LAIKA_DEBUG("We received an errcode for id %d, err: %d\n", contentID, errCode); + if (context->onError) /* check if event exists! */ + context->onError(context, content, errCode); + + rmvContent(context, content); +} + void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void *uData) { uint8_t buff[CONTENTCHUNK_MAX_BODY]; struct sLaika_contentContext *context = &peer->context; @@ -192,15 +217,15 @@ void laikaF_handleContentChunk(struct sLaika_peer *peer, LAIKAPKT_SIZE sz, void /* read and sanity check id */ laikaS_readInt(&peer->sock, &id, sizeof(CONTENT_ID)); - if ((content = getContentByID(context, id)) == NULL) + if ((content = getContentByID(context, id)) == NULL || content->direction != CONTENT_IN) LAIKA_ERROR("chunk recieved with invalid id! [%d]\n", id); /* read data & write to file */ laikaS_read(&peer->sock, buff, bodySz); if (fwrite(buff, sizeof(uint8_t), bodySz, content->fd) != bodySz) { - rmvFromIn(context, content); - } else { - /* TODO: if content->processed == content->sz then handle full file received event */ - content->processed += bodySz; + rmvContent(context, content); + } else if ((content->processed += bodySz) == content->sz) { + if (context->onReceived) /* check if event exists! */ + context->onReceived(context, content); } -} \ No newline at end of file +} diff --git a/lib/src/lpeer.c b/lib/src/lpeer.c index 5f3e705..f7ff867 100644 --- a/lib/src/lpeer.c +++ b/lib/src/lpeer.c @@ -39,6 +39,8 @@ void laikaS_freePeer(struct sLaika_peer *peer) { laikaM_free(peer); } +/* ===========================================[[ Start/End Packets ]]============================================ */ + void laikaS_emptyOutPacket(struct sLaika_peer *peer, LAIKAPKT_ID id) { struct sLaika_socket *sock = &peer->sock; @@ -158,6 +160,8 @@ void laikaS_setSecure(struct sLaika_peer *peer, bool flag) { peer->useSecure = flag; } +/* ===========================================[[ Handle Poll Events ]]=========================================== */ + bool laikaS_handlePeerIn(struct sLaika_socket *sock) { struct sLaika_peer *peer = (struct sLaika_peer*)sock; int recvd; diff --git a/lib/src/lpolllist.c b/lib/src/lpolllist.c index a75bd0d..b7791b0 100644 --- a/lib/src/lpolllist.c +++ b/lib/src/lpolllist.c @@ -2,6 +2,8 @@ #include "lmem.h" #include "lpolllist.h" +/* ===========================================[[ Helper Functions ]]============================================= */ + typedef struct sLaika_hashMapElem { SOCKET fd; struct sLaika_socket *sock; @@ -19,6 +21,8 @@ uint64_t elem_hash(const void *item, uint64_t seed0, uint64_t seed1) { return (uint64_t)(u->fd); } +/* ==============================================[[ PollList API ]]============================================== */ + void laikaP_initPList(struct sLaika_pollList *pList) { laikaS_init();