diff options
author | Raymaekers Luca <raymaekers.luca@gmail.com> | 2024-10-26 16:26:34 +0200 |
---|---|---|
committer | Raymaekers Luca <raymaekers.luca@gmail.com> | 2024-10-26 16:28:46 +0200 |
commit | dc5c9fd10aa6ba4d1e5581cc1d5d2f0414b6efcd (patch) | |
tree | 4339cc9280b5706e6a64c078720ade8d94c7775b | |
parent | cafda3e8a60043629cd9a51490496b8cfd3cd921 (diff) |
Use dynamic limits for buffers
Use arenas where possible to have growable buffers
- Use of bufArena in server and client for receiving&sending messages
- Use of inputArena in client
Also organized code
-rw-r--r-- | README.md | 5 | ||||
-rw-r--r-- | chatty.c | 162 | ||||
-rw-r--r-- | common.h | 11 | ||||
-rw-r--r-- | send.c | 46 | ||||
-rw-r--r-- | server.c | 38 |
5 files changed, 154 insertions, 108 deletions
@@ -14,6 +14,8 @@ The idea is the following: - [x] bug: do not allow sending empty message - [x] wrapping messages - [x] bug: when sending message after diconnect (serverfd?) +- [x] Handle disconnection thiin a thread, the best way would be +- [ ] ctrl+z to suspend ## server - [ ] log messages to file (save history) @@ -21,7 +23,7 @@ The idea is the following: - [ ] timeout on recv? ## common -- [ ] handle messages that are too large +- [x] handle messages that are too large - [ ] connect/disconnections messages - [ ] use IP address / domain - [ ] chat history @@ -57,7 +59,6 @@ Notice, that this depends on knowing the text's length before allocating the mem - `Ctrl+U`: Erase input line - `Ctrl+W`: Erase word behind cursor - ## Resources I used for building this - source code I looked at: - https://github.com/git-bruh/matrix-tui @@ -14,6 +14,11 @@ #define TIMEOUT_POLL 60 * 1000 // time to reconnect in seconds #define TIMEOUT_RECONNECT 1 +// The input buffer is tied to an arena, INPUT_LEN specifies the intial number of wide characters +// allocated, and the INPUT_GROW specifies by how much the input should grow when it exceeds the +// buffer. +#define INPUT_LEN (256 * sizeof(wchar_t)) +#define INPUT_GROW (64 * sizeof(wchar_t)) // must be of AUTHOR_LEN -1 static u8 username[AUTHOR_LEN] = "(null)"; @@ -43,8 +48,33 @@ int main(int argc, char **argv) username[arg_len] = '\0'; } - s32 err, serverfd, ttyfd, resizefd, nsend; - setlocale(LC_ALL, ""); /* Fix unicode handling */ + s32 err = 0; // error code for functions + Arena *msgsArena = ArenaAlloc(); // Messages received & sent + Arena *msgTextArena = ArenaAlloc(); // Text from received & sent messages + Arena *bufArena = ArenaAlloc(); // data in buf + u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // buffer used for receiving and sending messages + Message *mbuf = (Message *)buf; // index for buf as a message + u32 nrecv = 0; // number of bytes received + u32 recv_len = 0; // total length of the received stream + u32 nsend = 0; // number of bytes sent + Message *recv_msg = NULL; // message received pushed on the msgsArena + + Arena *inputArena = ArenaAlloc(); // data in input + wchar_t *input = ArenaPush(inputArena, INPUT_LEN); // input buffer + u32 input_len = 0; // length of the input + + struct tb_event ev; // event fork keypress & resize + u8 quit = 0; // boolean to indicate if we want to quit the main loop + u8 *quitmsg = NULL; // this string will be printed before returning from main + + pthread_t thr_rec; // thread for reconnecting to server when disconnected + + // poopoo C cannot infer type + fds = (struct pollfd[FDS_MAX]){ + {-1, POLLIN, 0}, // FDS_SERVER + {-1, POLLIN, 0}, // FDS_TTY + {-1, POLLIN, 0}, // FDS_RESIZE + }; const struct sockaddr_in address = { AF_INET, @@ -52,43 +82,31 @@ int main(int argc, char **argv) {0}, }; - serverfd = socket(AF_INET, SOCK_STREAM, 0); - assert(serverfd > 2); // greater than STDERR + // Connecting to server + { + s32 serverfd; + serverfd = socket(AF_INET, SOCK_STREAM, 0); + assert(serverfd > 2); // greater than STDERR - err = connect(serverfd, (struct sockaddr *)&address, sizeof(address)); - if (err != 0) { - perror("Server"); - return 1; + err = connect(serverfd, (struct sockaddr *)&address, sizeof(address)); + if (err != 0) { + perror("Server"); + return 1; + } + fds[FDS_SERVER].fd = serverfd; } - tb_init(); - tb_get_fds(&ttyfd, &resizefd); - - // poopoo C cannot infer type - fds = (struct pollfd[FDS_MAX]){ - {serverfd, POLLIN, 0}, - { ttyfd, POLLIN, 0}, - {resizefd, POLLIN, 0}, - }; - - Arena *msgsArena = ArenaAlloc(); - // Message *messages = msgsArena->memory; // helper pointer, for indexing memory - Arena *msgTextArena = ArenaAlloc(); - u32 nrecv = 0; - // buffer used for receiving and sending messages - u8 buf[STREAM_BUF] = {0}; - Message *mbuf = (Message *)buf; + // for wide character printing + assert(setlocale(LC_ALL, "") != NULL); - wchar_t input[256] = {0}; - u32 input_len = 0; - struct tb_event ev; - char *errmsg = NULL; + // init + tb_init(); + tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd); - // Display loop screen_home(msgsArena, input); tb_present(); - u8 quit = 0; + // main loop while (!quit) { err = poll(fds, FDS_MAX, TIMEOUT_POLL); // ignore resize events and use them to redraw the screen @@ -98,35 +116,42 @@ int main(int argc, char **argv) if (fds[FDS_SERVER].revents & POLLIN) { // got data from server - u8 timestamp[TIMESTAMP_LEN]; - message_timestamp(timestamp); - nrecv = recv(fds[FDS_SERVER].fd, buf, STREAM_LIMIT, 0); assert(nrecv != -1); - // TODO: Handle this in a thread, the best way would be - // -> server disconnect info (somewhere, for now popup) - // -> user can still view messages, exit & type but not send - // -> try to reconnect in background + // Server disconnects if (nrecv == 0) { // close diconnected server's socket err = close(fds[FDS_SERVER].fd); assert(err == 0); fds[FDS_SERVER].fd = -1; // ignore // start trying to reconnect in a thread - pthread_t t; - err = pthread_create(&t, NULL, &thread_reconnect, (void*)&address); + err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void *)&address); assert(err == 0); } else { - Message *buf_msg = (Message *)buf; // helper for indexing memory - Message *recvmsg = ArenaPush(msgsArena, sizeof(Message)); + recv_msg = ArenaPush(msgsArena, sizeof(*mbuf)); // copy everything but the text - memcpy(recvmsg, buf, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(buf_msg->text_len)); + memcpy(recv_msg, buf, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(mbuf->text_len)); // allocate memeory for text - recvmsg->text = ArenaPush(msgTextArena, recvmsg->text_len * sizeof(wchar_t)); + recv_msg->text = ArenaPush(msgTextArena, mbuf->text_len * sizeof(*mbuf->text)); + + // If we did not receive the entire message receive the remaining part + recv_len = sizeof(*recv_msg) - sizeof(recv_msg->text) + recv_msg->text_len * sizeof(*recv_msg->text); + if (recv_len > nrecv) { + // allocate needed space for buf + if (recv_len > bufArena->pos) + ArenaPush(bufArena, recv_len - bufArena->pos); + + // receive remaining bytes + u32 nr = recv(fds[FDS_SERVER].fd, buf + nrecv, recv_len - nrecv, 0); + assert(nr != -1); + nrecv += nr; + assert(nrecv == recv_len); + } + // copy the text to the allocated space - memcpy(recvmsg->text, buf + TIMESTAMP_LEN + AUTHOR_LEN + sizeof(recvmsg->text_len), recvmsg->text_len * sizeof(wchar_t)); + memcpy(recv_msg->text, buf + TIMESTAMP_LEN + AUTHOR_LEN + sizeof(recv_msg->text_len), recv_msg->text_len * sizeof(*mbuf->text)); } } @@ -184,7 +209,7 @@ int main(int argc, char **argv) // copy everything but the text memcpy(buf, sendmsg, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(wchar_t)); memcpy(&mbuf->text, input, input_len * sizeof(wchar_t)); - nsend = send(fds[FDS_SERVER].fd, buf, AUTHOR_LEN + TIMESTAMP_LEN + input_len * sizeof(wchar_t), 0); + nsend = send(fds[FDS_SERVER].fd, buf, MESSAGELENP(mbuf), 0); assert(nsend > 0); case TB_KEY_CTRL_U: // clear input @@ -192,15 +217,14 @@ int main(int argc, char **argv) input_len = 0; break; default: - assert(ev.ch >= 0); if (ev.ch == 0) break; + // append key to input buffer - // TODO: check size does not exceed buffer input[input_len] = ev.ch; input_len++; - - break; + if (input_len * sizeof(*input) == inputArena->pos) + ArenaPush(inputArena, INPUT_GROW); } if (quit) break; @@ -219,11 +243,13 @@ int main(int argc, char **argv) tb_shutdown(); - if (errmsg != NULL) - printf("%s\n", errmsg); + if (quitmsg != NULL) + printf("%s\n", quitmsg); ArenaRelease(msgTextArena); ArenaRelease(msgsArena); + ArenaRelease(bufArena); + ArenaRelease(inputArena); return 0; } @@ -290,16 +316,6 @@ u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32 { assert(limit > 0); - /// Algorithm - // 1. Advance by limit - // 2. Look backwards for whitespace - // 3. split the string at the whitespace - // 4. print the string - // 5. restore the string (optional) - // 6. set the offset - // 7. repeat step 1. until i > len - // 8. print remaining part of the string - // lines y, incremented after each wrap s32 ly = y; // character the text is split on @@ -319,16 +335,32 @@ u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32 if (pfx != NULL) { tb_printf(x, ly, fg_pfx, bg_pfx, "%s", pfx); + // If the text fits on one line print the text and return + // Otherwise print the text on the next line s32 pfx_len = strlen(pfx); if (limit > pfx_len + text_len) { - // everything fits on one line - tb_printf(pfx_len, y, fg, bg, "%ls", text); + tb_printf(x + pfx_len, y, fg, bg, "%ls", text); return 1; } else { ly++; } } + /// Algorithm + // 1. Start at limit + // 2. Look backwards for whitespace + // 3. Whitespace found? + // n) failed++ + // i = limit + limit*failed + // step 2. + // y) step 4. + // 4. failed = 0 + // 5. terminate text at i found + // 6. print text + // 7. restore text[i] + // 8. step 2. until i >= text_len + // 9. print remaining part of the string + while (i < text_len) { // search backwards for whitespace while (i > offset && text[i] != L' ') @@ -367,7 +399,6 @@ void screen_home(Arena *msgsArena, wchar_t input[]) { // config options const u32 box_max_len = 80; - const u32 box_min_len = 3; const u32 box_x = 0, box_y = global.height - 3, box_pad_x = 1, box_mar_x = 1, box_bwith = 1, box_height = 3; u32 input_len = 0; while (input[input_len] != 0) @@ -487,4 +518,3 @@ void screen_home(Arena *msgsArena, wchar_t input[]) } } } - @@ -13,12 +13,11 @@ #define TIMESTAMP_LEN 9 // port to listen on #define PORT 9983 -// buffer size for holding data received from recv() -// TODO: choose a good size -#define STREAM_BUF 1024 -// max data received in one recv() call on serverfd -// TODO: choose a good size -#define STREAM_LIMIT 1024 +// How much bytes can be sent at once over the stream +// This is also Intial size for buffer used to send and receive data, +// "initial" because the buffer is tied to an arena and can grow. +// Note: must be greater than sizeof(Message) - sizeof(Message.text) +#define STREAM_LIMIT 64 typedef uint8_t u8; typedef uint16_t u16; @@ -3,10 +3,11 @@ #include <arpa/inet.h> #include <assert.h> #include <stdarg.h> -#include <string.h> #include <stdlib.h> +#include <string.h> #include <unistd.h> +#include "arena.h" #include "common.h" int main(int argc, char **argv) @@ -29,33 +30,34 @@ int main(int argc, char **argv) err = connect(serverfd, (struct sockaddr *)&address, sizeof(address)); assert(err == 0); - { - u32 author_len = strlen(argv[1]) + 1; // add 1 for null terminator - assert(author_len <= AUTHOR_LEN); + u32 author_len = strlen(argv[1]); // add 1 for null terminator + assert(author_len + 1 <= AUTHOR_LEN); - // convert text to wide string - u32 text_len = strlen(argv[2]) + 1; - wchar_t text_wide[text_len]; - u32 size = mbstowcs(text_wide, argv[2], text_len - 1); - assert(size == text_len - 1); - // null terminate - text_wide[text_len - 1] = 0; + // convert text to wide string + u32 text_len = strlen(argv[2]) + 1; + wchar_t text_wide[text_len]; + u32 size = mbstowcs(text_wide, argv[2], text_len - 1); + assert(size == text_len - 1); + // null terminate + text_wide[text_len - 1] = 0; - u8 buf[STREAM_BUF] = {0}; - Message *m = (Message *)buf; + Arena *bufArena = ArenaAlloc(); + u8 *buf = ArenaPush(bufArena, (text_len - 1) * sizeof(*text_wide)); + Message *mbuf = (Message *)buf; - memcpy(m->author, argv[1], author_len - 1); - message_timestamp(m->timestamp); - m->text_len = text_len; - memcpy(&m->text, text_wide, m->text_len * sizeof(wchar_t)); + memcpy(mbuf->author, argv[1], author_len); + message_timestamp(mbuf->timestamp); + mbuf->text_len = text_len; + memcpy(&mbuf->text, text_wide, mbuf->text_len * sizeof(wchar_t)); - nsend = send(serverfd, buf, MESSAGELENP(m), 0); + nsend = send(serverfd, buf, MESSAGELENP(mbuf), 0); - assert(nsend >= 0); + assert(nsend >= 0); - printf("text_len: %d\n", text_len); - fprintf(stdout, "Sent %d bytes.\n", nsend); - } + printf("text_len: %d\n", text_len); + fprintf(stdout, "Sent %d bytes.\n", nsend); + + ArenaRelease(bufArena); return 0; } @@ -48,15 +48,16 @@ int main(void) assert(err == 0); } - Arena *msgTextArena = ArenaAlloc(); // allocating text in messages that have a dynamic sized - Message mrecv = {0}; // message used for receiving messages from clients - u32 nrecv = 0; // number of bytes received - u32 nsend = 0; // number of bytes sent - u8 buf[STREAM_BUF] = {0}; // temporary buffer for received data, NOTE: this buffer - // is also use for retransmitting received messages to other - // clients. - - Arena *fdsArena = ArenaAlloc(); + Arena *msgTextArena = ArenaAlloc(); // allocating text in messages that have a dynamic sized + Message mrecv = {0}; // message used for receiving messages from clients + u32 nrecv = 0; // number of bytes received + u32 recv_len; // Number of bytes of the message received over stream + u32 nsend = 0; // number of bytes sent + Arena *bufArena = ArenaAlloc(); // data in buf + u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // temporary buffer for receiving and sending data + Message *mbuf = (Message *)buf; // pointer for indexing buf as a message + + Arena *fdsArena = ArenaAlloc(); // arena for fds to accomodate multiple clients struct pollfd *fds = fdsArena->memory; // helper for indexing memory struct pollfd c = {0, POLLIN, 0}; // helper client structure fore reusing struct pollfd *fdsAddr; // used for copying clients @@ -112,8 +113,8 @@ int main(void) if (fds[i].fd == -1) continue; - nrecv = recv(fds[i].fd, buf, STREAM_LIMIT, 0); - assert(nrecv >= 0); + nrecv = recv(fds[i].fd, buf, bufArena->pos, 0); + assert(nrecv != -1); if (nrecv == 0) { fprintf(stdout, "disconnected(%d). \n", fds[i].fd - serverfd); @@ -123,8 +124,21 @@ int main(void) continue; } + recv_len = sizeof(*mbuf) - sizeof(mbuf->text) + mbuf->text_len * sizeof(*mbuf->text); + if (recv_len > nrecv) { + // allocate needed space for buf + if (recv_len > bufArena->pos) + ArenaPush(bufArena, recv_len - bufArena->pos); + + // receive remaining bytes + u32 nr = recv(fds[i].fd, buf + nrecv, recv_len - nrecv, 0); + assert(nr != -1); + nrecv += nr; + assert(nrecv == recv_len); + } + // TODO: Do not print the message in the logs - fprintf(stdout, "message(%d): %d bytes.\n", fds[i].fd - serverfd, nrecv); + fprintf(stdout, "message(%d): %d bytes.\n", fds[i].fd - serverfd, nrecv); for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) { if (j == i) |