diff options
| -rw-r--r-- | README.md | 11 | ||||
| -rw-r--r-- | arena.h | 109 | ||||
| -rw-r--r-- | chatty.c | 591 | ||||
| -rw-r--r-- | chatty.h | 239 | ||||
| -rw-r--r-- | common.h | 51 | ||||
| -rw-r--r-- | compile_flags.txt | 5 | ||||
| -rw-r--r-- | send.c | 50 | ||||
| -rw-r--r-- | server.c | 259 | 
8 files changed, 766 insertions, 549 deletions
@@ -18,20 +18,23 @@ The idea is the following:  - [ ] ctrl+z to suspend  ## server -- [ ] log messages to file (save history)  - [ ] check if when sending and the client is offline (due to connection loss) what happens  - [ ] timeout on recv? +- [ ] use threads to handle clients/ timeout when receiving because a client could theoretically +  stall the entire server.  ## common  - [x] handle messages that are too large +- [ ] log messages to file (save history)  - [ ] connect/disconnections messages  - [ ] use IP address / domain  - [ ] chat history +- [ ] asserting, logging if fail / halt execution  ## Protocol -For now the protocol consists of sending Message type over the network, but in the future something -more flexible might be required.  Because it will make it easier to do things like: -- request chat logs up to a certain point +- see `protocol.h` for more info +- [ ] make sections per message +- request chat logs from a certain point up to now (history)  - connect to a specific room  - connect/disconnect messages diff --git a/arena.h b/arena.h deleted file mode 100644 index 9327a5e..0000000 --- a/arena.h +++ /dev/null @@ -1,109 +0,0 @@ -#ifndef ARENA_IMPL -#define ARENA_IMPL - -#include "common.h" - -#include <fcntl.h> -#include <stdint.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <strings.h> -#include <sys/mman.h> -#include <unistd.h> - -#define PAGESIZE 4096 - -#ifndef ARENA_MEMORY -#define ARENA_MEMORY PAGESIZE -#endif - -struct Arena { -    void *memory; -    u64 size; -    u64 pos; -} typedef Arena; - -// Create an arena -Arena *ArenaAlloc(void); -// Destroy an arena -void ArenaRelease(Arena *arena); - -// Push bytes on to the arena | allocating -void *ArenaPush(Arena *arena, u64 size); -void *ArenaPushZero(Arena *arena, u64 size); - -#define PushArray(arena, type, count) (type *)ArenaPush((arena), sizeof(type)*(count)) -#define PushArrayZero(arena, type, count) (type *)ArenaPushZero((arena), sizeof(type) * (count)) -#define PushStruct(arena, type) PushArray((arena), (type), 1) -#define PushStructZero(arena, type) PushArrayZero((arena), (type), 1) - -// Free some bytes by popping the stack -void ArenaPop(Arena *arena, u64 size); -// Get the number of bytes allocated -u64 ArenaGetPos(Arena *arena); - -void ArenaSetPosBack(Arena *arena, u64 pos); -void ArenaClear(Arena *arena); - -Arena *ArenaAlloc(void) -{ -    // NOTE: If the arena is created here the pointer to the memory get's overwritten with size in -    // ArenaPush, so we are forced to use malloc -    Arena *arena = malloc(sizeof(Arena)); - -    arena->memory = mmap(NULL, ARENA_MEMORY, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); -    if (arena->memory == MAP_FAILED) -        return NULL; - -    arena->pos = 0; -    arena->size = ARENA_MEMORY; - -    return arena; -} - -void ArenaRelease(Arena *arena) -{ -    munmap(arena->memory, ARENA_MEMORY); -    free(arena); -} - -void *ArenaPush(Arena *arena, u64 size) -{ -    u8 *mem; -    mem = (u8 *)arena->memory + arena->pos; -    arena->pos += size; -    return mem; -} - -void *ArenaPushZero(Arena *arena, u64 size) -{ -    u8 *mem; -    mem = (u8 *)arena->memory + arena->pos; -    bzero(mem, size); -    arena->pos += size; -    return mem; -} - -void ArenaPop(Arena *arena, u64 size) -{ -    arena->pos -= size; -} - -u64 ArenaGetPos(Arena *arena) -{ -    return arena->pos; -} - -void ArenaSetPosBack(Arena *arena, u64 pos) -{ -    arena->pos -= pos; -} - -void ArenaClear(Arena *arena) -{ -    bzero(arena->memory, arena->size); -    arena->pos = 0; -} - -#endif @@ -1,8 +1,7 @@  #define TB_IMPL  #include "termbox2.h" -#include "arena.h" -#include "common.h" +#include "chatty.h"  #include <arpa/inet.h>  #include <assert.h> @@ -14,16 +13,12 @@  #define TIMEOUT_POLL 60 * 1000  // time to reconnect in seconds  #define TIMEOUT_RECONNECT 1 -// The input buffer is tied to an arena, INPUT_LEN specifies the intial number of wide characters -// allocated, and the INPUT_GROW specifies by how much the input should grow when it exceeds the -// buffer. -#define INPUT_LEN (256 * sizeof(wchar_t)) -#define INPUT_GROW (64 * sizeof(wchar_t)) +#define INPUT_LIMIT 512  // must be of AUTHOR_LEN -1  static u8 username[AUTHOR_LEN] = "(null)";  // file descriptros for polling -static struct pollfd *fds = NULL; +static struct pollfd* fds = NULL;  // mutex for locking fds when in thread_reconnect()  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; @@ -32,226 +27,21 @@ enum { FDS_SERVER = 0,         FDS_RESIZE,         FDS_MAX }; -void *thread_reconnect(void *address_ptr); -void fillstr(wchar_t *str, wchar_t ch, u32 len); -void popup(u32 fg, u32 bg, char *text); -u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32 bg_pfx, char *pfx, s32 limit); -void screen_home(Arena *msgsArena, wchar_t input[]); - -int main(int argc, char **argv) +// fill str array with char +void +fillstr(u32* str, u32 ch, u32 len)  { -    // Use first argument as username -    if (argc > 1) { -        u32 arg_len = strlen(argv[1]); -        assert(arg_len <= AUTHOR_LEN - 1); -        memcpy(username, argv[1], arg_len); -        username[arg_len] = '\0'; -    } - -    s32 err = 0;                                 // error code for functions -    Arena *msgsArena = ArenaAlloc();             // Messages received & sent -    Arena *msgTextArena = ArenaAlloc();          // Text from received & sent messages -    Arena *bufArena = ArenaAlloc();              // data in buf -    u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // buffer used for receiving and sending messages -    Message *mbuf = (Message *)buf;              // index for buf as a message -    u32 nrecv = 0;                               // number of bytes received -    u32 recv_len = 0;                            // total length of the received stream -    u32 nsend = 0;                               // number of bytes sent -    Message *recv_msg = NULL;                    // message received pushed on the msgsArena - -    Arena *inputArena = ArenaAlloc();                  // data in input -    wchar_t *input = ArenaPush(inputArena, INPUT_LEN); // input buffer -    u32 input_len = 0;                                 // length of the input - -    struct tb_event ev; // event fork keypress & resize -    u8 quit = 0;        // boolean to indicate if we want to quit the main loop -    u8 *quitmsg = NULL; // this string will be printed before returning from main - -    pthread_t thr_rec; // thread for reconnecting to server when disconnected - -    // poopoo C cannot infer type -    fds = (struct pollfd[FDS_MAX]){ -        {-1, POLLIN, 0}, // FDS_SERVER -        {-1, POLLIN, 0}, // FDS_TTY -        {-1, POLLIN, 0}, // FDS_RESIZE -    }; - -    const struct sockaddr_in address = { -        AF_INET, -        htons(PORT), -        {0}, -    }; - -    // Connecting to server -    { -        s32 serverfd; -        serverfd = socket(AF_INET, SOCK_STREAM, 0); -        assert(serverfd > 2); // greater than STDERR - -        err = connect(serverfd, (struct sockaddr *)&address, sizeof(address)); -        if (err != 0) { -            perror("Server"); -            return 1; -        } -        fds[FDS_SERVER].fd = serverfd; -    } - -    // for wide character printing -    assert(setlocale(LC_ALL, "") != NULL); - -    // init -    tb_init(); -    tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd); - -    screen_home(msgsArena, input); -    tb_present(); - -    // main loop -    while (!quit) { -        err = poll(fds, FDS_MAX, TIMEOUT_POLL); -        // ignore resize events and use them to redraw the screen -        assert(err != -1 || errno == EINTR); - -        tb_clear(); - -        if (fds[FDS_SERVER].revents & POLLIN) { -            // got data from server -            nrecv = recv(fds[FDS_SERVER].fd, buf, STREAM_LIMIT, 0); -            assert(nrecv != -1); - -            // Server disconnects -            if (nrecv == 0) { -                // close diconnected server's socket -                err = close(fds[FDS_SERVER].fd); -                assert(err == 0); -                fds[FDS_SERVER].fd = -1; // ignore -                // start trying to reconnect in a thread -                err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void *)&address); -                assert(err == 0); - -            } else { -                recv_msg = ArenaPush(msgsArena, sizeof(*mbuf)); -                // copy everything but the text -                memcpy(recv_msg, buf, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(mbuf->text_len)); -                // allocate memeory for text -                recv_msg->text = ArenaPush(msgTextArena, mbuf->text_len * sizeof(*mbuf->text)); - -                // If we did not receive the entire message receive the remaining part -                recv_len = sizeof(*recv_msg) - sizeof(recv_msg->text) + recv_msg->text_len * sizeof(*recv_msg->text); -                if (recv_len > nrecv) { -                    // allocate needed space for buf -                    if (recv_len > bufArena->pos) -                        ArenaPush(bufArena, recv_len - bufArena->pos); - -                    // receive remaining bytes -                    u32 nr = recv(fds[FDS_SERVER].fd, buf + nrecv, recv_len - nrecv, 0); -                    assert(nr != -1); -                    nrecv += nr; -                    assert(nrecv == recv_len); -                } - -                // copy the text to the allocated space -                memcpy(recv_msg->text, buf + TIMESTAMP_LEN + AUTHOR_LEN + sizeof(recv_msg->text_len), recv_msg->text_len * sizeof(*mbuf->text)); -            } -        } - -        if (fds[FDS_TTY].revents & POLLIN) { -            // got a key event -            tb_poll_event(&ev); - -            switch (ev.key) { -            case TB_KEY_CTRL_W: -                // delete consecutive whitespace -                while (input_len) { -                    if (input[input_len - 1] == L' ') { -                        input[input_len - 1] = 0; -                        input_len--; -                        continue; -                    } -                    break; -                } -                // delete until whitespace -                while (input_len) { -                    if (input[input_len - 1] == L' ') -                        break; -                    // erase -                    input[input_len - 1] = 0; -                    input_len--; -                } -                break; -            case TB_KEY_CTRL_D: -            case TB_KEY_CTRL_C: -                quit = 1; -                break; -            case TB_KEY_CTRL_M: // send message -                if (input_len == 0) -                    // do not send empty message -                    break; -                if (fds[FDS_SERVER].fd == -1) -                    // do not send message to disconnected server -                    break; - -                // null terminate -                input[input_len] = 0; -                input_len++; -                // TODO: check size does not exceed buffer - -                // add to msgsArena -                Message *sendmsg = ArenaPush(msgsArena, sizeof(Message)); -                memcpy(sendmsg->author, username, AUTHOR_LEN); -                message_timestamp(sendmsg->timestamp); -                sendmsg->text_len = input_len; -                sendmsg->text = ArenaPush(msgTextArena, input_len * sizeof(wchar_t)); -                // copy the text to the allocated space -                memcpy(sendmsg->text, input, input_len * sizeof(wchar_t)); - -                // Send the message -                // copy everything but the text -                memcpy(buf, sendmsg, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(wchar_t)); -                memcpy(&mbuf->text, input, input_len * sizeof(wchar_t)); -                nsend = send(fds[FDS_SERVER].fd, buf, MESSAGELENP(mbuf), 0); -                assert(nsend > 0); - -            case TB_KEY_CTRL_U: // clear input -                bzero(input, input_len * sizeof(wchar_t)); -                input_len = 0; -                break; -            default: -                if (ev.ch == 0) -                    break; - -                // append key to input buffer -                input[input_len] = ev.ch; -                input_len++; -                if (input_len * sizeof(*input) == inputArena->pos) -                    ArenaPush(inputArena, INPUT_GROW); -            } -            if (quit) -                break; -        } - -        // These are used to redraw the screen from threads -        if (fds[FDS_RESIZE].revents & POLLIN) { -            // ignore -            tb_poll_event(&ev); -        } - -        screen_home(msgsArena, input); - -        tb_present(); -    } - -    tb_shutdown(); - -    if (quitmsg != NULL) -        printf("%s\n", quitmsg); - -    ArenaRelease(msgTextArena); -    ArenaRelease(msgsArena); -    ArenaRelease(bufArena); -    ArenaRelease(inputArena); +    for (u32 i = 0; i < len; i++) +        str[i] = ch; +} -    return 0; +// Centered popup displaying message in the appropriate cololrs +void +popup(u32 fg, u32 bg, char* text) +{ +    u32 len = strlen(text); +    assert(len > 0); +    tb_print(global.width / 2 - len / 2, global.height / 2, fg, bg, text);  }  // Takes as paramter `struct sockaddr_in*` and uses it to connect to the server. @@ -259,18 +49,20 @@ int main(int argc, char **argv)  // paramter.  To indicate that the server is offline the fds[FDS_SERVER] is set to -1.  When online  // it is set to a non-zero value.  // Returns NULL. -void *thread_reconnect(void *address_ptr) +void* +thread_reconnect(void* address_ptr)  {      u32 serverfd, err; -    struct sockaddr_in *address = address_ptr; +    struct sockaddr_in* address = address_ptr;      while (1) {          serverfd = socket(AF_INET, SOCK_STREAM, 0);          assert(serverfd > 2); // greater than STDERR -        err = connect(serverfd, (struct sockaddr *)address, sizeof(*address)); +        err = connect(serverfd, (struct sockaddr*)address, sizeof(*address));          if (err == 0)              break;          assert(errno == ECONNREFUSED); +        // TODO: faster reconnection? (too many open files)          sleep(TIMEOUT_RECONNECT);      } @@ -286,40 +78,26 @@ void *thread_reconnect(void *address_ptr)      return NULL;  } -// fill str array with char -void fillstr(wchar_t *str, wchar_t ch, u32 len) -{ -    for (u32 i = 0; i < len; i++) -        str[i] = ch; -} - -// Centered popup displaying message in the appropriate cololrs -void popup(u32 fg, u32 bg, char *text) -{ -    u32 len = strlen(text); -    assert(len > 0); -    tb_print(global.width / 2 - len / 2, global.height / 2, fg, bg, text); -} - -// Print `text` of text_len` wide characters wrapped to limit.  x, y, fg and +// Print `text` wrapped to limit. x, y, fg and  // bg will be passed to the tb_printf() function calls.  // pfx is a string that will be printed first and will not be wrapped on characters like msg->text,  // this is useful when for example: printing messages and wanting to have consistent  // timestamp+author name.  // Returns the number of lines printed. -// TODO: remove text_len and calculate it in the function  // TODO: add y limit  // TODO:(bug) text after pfx is wrapped one too soon  // TODO: text == NULL to know how many lines *would* be printed +// - no this should be a separate function  // TODO: check if text[i] goes out of bounds -u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32 bg_pfx, char *pfx, s32 limit) +u32 +tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, u32* text, s32 text_len, u32 fg_pfx, u32 bg_pfx, u8* pfx, s32 limit)  {      assert(limit > 0);      // lines y, incremented after each wrap      s32 ly = y;      // character the text is split on -    wchar_t t = 0; +    u32 t = 0;      // index used for searching in string      s32 i = limit;      // previous i for windowing through the text @@ -327,17 +105,13 @@ u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32      // used when retrying to get a longer limit      u32 failed = 0; -    u32 text_len = 0; -    while (text[text_len] != 0) -        text_len++; -      // NOTE: We can assume that we need to wrap, therefore print a newline after the prefix string      if (pfx != NULL) {          tb_printf(x, ly, fg_pfx, bg_pfx, "%s", pfx);          // If the text fits on one line print the text and return          // Otherwise print the text on the next line -        s32 pfx_len = strlen(pfx); +        s32 pfx_len = strlen((char*)pfx);          if (limit > pfx_len + text_len) {              tb_printf(x + pfx_len, y, fg, bg, "%ls", text);              return 1; @@ -394,15 +168,14 @@ u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32  }  // home screen, the first screen the user sees -// it displays a prompt for user input and the received messages from msgsArena -void screen_home(Arena *msgsArena, wchar_t input[]) +// it displays a prompt with the user input of input_len wide characters +// and the received messages from msgsArena +void +screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)  {      // config options -    const u32 box_max_len = 80; -    const u32 box_x = 0, box_y = global.height - 3, box_pad_x = 1, box_mar_x = 1, box_bwith = 1, box_height = 3; -    u32 input_len = 0; -    while (input[input_len] != 0) -        input_len++; +    const s32 box_max_len = 80; +    const s32 box_x = 0, box_y = global.height - 3, box_pad_x = 1, box_mar_x = 1, box_bwith = 1, box_height = 3;      const u32 prompt_x = box_x + box_pad_x + box_mar_x + box_bwith + input_len;      // the minimum height required is the hight for the box prompt @@ -428,28 +201,57 @@ void screen_home(Arena *msgsArena, wchar_t input[])          if (freesp <= 0)              goto draw_prompt; -        Message *messages = msgsArena->memory; -        assert(messages != NULL); +        u8* addr = msgsArena->addr; +        assert(addr != NULL);          // on what line to print the current message, used for scrolling          u32 msg_y = 0; -        u32 nmessages = (msgsArena->pos / sizeof(Message));          u32 offs = (nmessages > freesp) ? nmessages - freesp : 0; +        // In each case statement advance the addr pointer by the size of the message          for (u32 i = offs; i < nmessages; i++) { -            // Color user's own messages -            u32 fg = 0; -            if (strncmp((char *)username, (char *)messages[i].author, AUTHOR_LEN) == 0) { -                fg = TB_CYAN; -            } else { -                fg = TB_MAGENTA; +            HeaderMessage* header = (HeaderMessage*)addr; +            addr += sizeof(*header); + +            switch (header->type) { +            case HEADER_TYPE_TEXT: { +                TextMessage* message = (TextMessage*)addr; + +                u32 fg = 0; +                if (strncmp((char*)username, (char*)message->author, AUTHOR_LEN) == 0) { +                    fg = TB_CYAN; +                } else { +                    fg = TB_MAGENTA; +                } +                // prefix is of format "HH:MM:SS [<author>] ", so +                u8 pfx[AUTHOR_LEN - 1 + TIMESTAMP_LEN - 1 + 4 + 1] = {0}; +                u8 timestamp[TIMESTAMP_LEN]; +                formatTimestamp(timestamp, message->timestamp); +                sprintf((char*)pfx, "%s [%s] ", timestamp, message->author); +                msg_y += tb_printf_wrap(0, msg_y, TB_WHITE, 0, (u32*)&message->text, message->len, fg, 0, pfx, global.width); + +                u32 message_size = TEXTMESSAGE_SIZE + message->len * sizeof(*message->text); +                addr += message_size; +                break; +            } +            case HEADER_TYPE_PRESENCE: { +                PresenceMessage* message = (PresenceMessage*)addr; +                tb_printf(0, msg_y, 0, 0, " [%s] *%s*", message->author, presenceTypeString(message->type)); +                msg_y++; +                addr += sizeof(*message); +                break; +            } +            case HEADER_TYPE_HISTORY: { +                HistoryMessage* message = (HistoryMessage*)addr; +                addr += sizeof(*message); +                // TODO: implement +            } +            default: { +                tb_printf(0, msg_y, 0, 0, "%s", headerTypeString(header->type)); +                msg_y++; +                break; +            }              } - -            u32 ty = 0; -            char pfx[AUTHOR_LEN + TIMESTAMP_LEN - 2 + 5] = {0}; -            sprintf(pfx, "%s [%s] ", messages[i].timestamp, messages[i].author); -            ty = tb_printf_wrap(0, msg_y, TB_WHITE, 0, messages[i].text, fg, 0, pfx, global.width); -            msg_y += ty;          }      draw_prompt: @@ -469,10 +271,10 @@ void screen_home(Arena *msgsArena, wchar_t input[])                  box_len = global.width - box_mar_x * 2;              // +2 for corners and null terminator -            wchar_t box_up[box_len + 1]; -            wchar_t box_in[box_len + 1]; -            wchar_t box_down[box_len + 1]; -            wchar_t lr = L'─', ur = L'╭', rd = L'╮', dr = L'╰', ru = L'╯', ud = L'│'; +            u32 box_up[box_len + 1]; +            u32 box_in[box_len + 1]; +            u32 box_down[box_len + 1]; +            u32 lr = L'─', ur = L'╭', rd = L'╮', dr = L'╰', ru = L'╯', ud = L'│';              // top bar              box_up[0] = ur; @@ -503,7 +305,7 @@ void screen_home(Arena *msgsArena, wchar_t input[])                  return;              if (input_len > freesp) { -                wchar_t *text_offs = input + (input_len - freesp); +                u32* text_offs = input + (input_len - freesp);                  tb_printf(box_x + box_mar_x + box_pad_x + box_bwith, box_y + 1, 0, 0, "%ls", text_offs);                  global.cursor_x = box_x + box_pad_x + box_mar_x + box_bwith + freesp;              } else { @@ -518,3 +320,232 @@ void screen_home(Arena *msgsArena, wchar_t input[])          }      }  } + +int +main(int argc, char** argv) +{ +    // Use first argument as username +    if (argc > 1) { +        u32 arg_len = strlen(argv[1]); +        assert(arg_len <= AUTHOR_LEN - 1); +        memcpy(username, argv[1], arg_len); +        username[arg_len] = '\0'; +    } + +    s32 err = 0; // error code for functions + +    Arena* msgsArena = ArenaAlloc(Megabytes(64)); // Messages received & sent +    u32 nmessages = 0;                            // Number of messages in msgsArena +    s32 nrecv = 0;                                // number of bytes received +    s32 nsend = 0;                                // number of bytes sent + +    u32 input[INPUT_LIMIT] = {0}; // input buffer +    u32 ninput = 0;               // number of characters in input + +    struct tb_event ev; // event fork keypress & resize +    u8 quit = 0;        // boolean to indicate if we want to quit the main loop +    u8* quitmsg = NULL; // this string will be printed before returning from main + +    pthread_t thr_rec; // thread for reconnecting to server when disconnected + +    // poopoo C cannot infer type +    fds = (struct pollfd[FDS_MAX]){ +        {-1, POLLIN, 0}, // FDS_SERVER +        {-1, POLLIN, 0}, // FDS_TTY +        {-1, POLLIN, 0}, // FDS_RESIZE +    }; + +    const struct sockaddr_in address = { +        AF_INET, +        htons(PORT), +        {0}, +        {0}, +    }; + +    // Connecting to server +    { +        s32 serverfd; +        serverfd = socket(AF_INET, SOCK_STREAM, 0); +        assert(serverfd > 2); // greater than STDERR + +        err = connect(serverfd, (struct sockaddr*)&address, sizeof(address)); +        if (err != 0) { +            perror("Server"); +            return 1; +        } +        fds[FDS_SERVER].fd = serverfd; + +        // Introduce ourselves +        HeaderMessage header = HEADER_PRESENCEMESSAGE; +        PresenceMessage message = {.type = PRESENCE_TYPE_CONNECTED}; +        memcpy(message.author, username, AUTHOR_LEN); +        nsend = send(serverfd, &header, sizeof(header), 0); +        assert(nsend != -1); +        assert(nsend == sizeof(header)); +        nsend = send(serverfd, &message, sizeof(message), 0); +        assert(nsend != -1); +        assert(nsend == sizeof(message)); +    } + +    // for wide character printing +    assert(setlocale(LC_ALL, "") != NULL); + +    // init +    tb_init(); +    tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd); + +    screen_home(msgsArena, nmessages, input, ninput); +    tb_present(); + +    // main loop +    while (!quit) { +        err = poll(fds, FDS_MAX, TIMEOUT_POLL); +        // ignore resize events and use them to redraw the screen +        assert(err != -1 || errno == EINTR); + +        tb_clear(); + +        if (fds[FDS_SERVER].revents & POLLIN) { +            // got data from server +            HeaderMessage header; +            nrecv = recv(fds[FDS_SERVER].fd, &header, sizeof(header), 0); +            assert(nrecv != -1); + +            // Server disconnects +            if (nrecv == 0) { +                // close diconnected server's socket +                err = close(fds[FDS_SERVER].fd); +                assert(err == 0); +                fds[FDS_SERVER].fd = -1; // ignore +                // start trying to reconnect in a thread +                err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void*)&address); +                assert(err == 0); +            } else { +                // TODO: validate version +                // if (header.version == PROTOCOL_VERSION) +                //     continue; + +                void* addr = ArenaPush(msgsArena, sizeof(header)); +                memcpy(addr, &header, sizeof(header)); + +                switch (header.type) { +                case HEADER_TYPE_TEXT: +                    recvTextMessage(msgsArena, fds[FDS_SERVER].fd, NULL); +                    nmessages++; +                    break; +                case HEADER_TYPE_PRESENCE:; +                    PresenceMessage* message = ArenaPush(msgsArena, sizeof(*message)); +                    nrecv = recv(fds[FDS_SERVER].fd, message, sizeof(*message), 0); +                    assert(nrecv != -1); +                    assert(nrecv == sizeof(*message)); +                    nmessages++; +                    break; +                default: +                    // TODO: log +                    break; +                } +            } +        } + +        if (fds[FDS_TTY].revents & POLLIN) { +            // got a key event +            tb_poll_event(&ev); + +            switch (ev.key) { +            case TB_KEY_CTRL_W: +                // delete consecutive whitespace +                while (ninput) { +                    if (input[ninput - 1] == L' ') { +                        input[ninput - 1] = 0; +                        ninput--; +                        continue; +                    } +                    break; +                } +                // delete until whitespace +                while (ninput) { +                    if (input[ninput - 1] == L' ') +                        break; +                    // erase +                    input[ninput - 1] = 0; +                    ninput--; +                } +                break; +            case TB_KEY_CTRL_D: +            case TB_KEY_CTRL_C: +                quit = 1; +                break; +            case TB_KEY_CTRL_M: // send message +                if (ninput == 0) +                    // do not send empty message +                    break; +                if (fds[FDS_SERVER].fd == -1) +                    // do not send message to disconnected server +                    break; + +                // null terminate +                input[ninput] = 0; +                ninput++; + +                // Save header +                HeaderMessage header = HEADER_TEXTMESSAGE; +                void* addr = ArenaPush(msgsArena, sizeof(header)); +                memcpy(addr, &header, sizeof(header)); + +                // Save message +                TextMessage* sendmsg = ArenaPush(msgsArena, TEXTMESSAGE_SIZE); +                memcpy(sendmsg->author, username, AUTHOR_LEN); +                sendmsg->timestamp = time(NULL); +                sendmsg->len = ninput; + +                u32 text_size = ninput * sizeof(*input); +                ArenaPush(msgsArena, text_size); +                memcpy(&sendmsg->text, input, text_size); + +                // Send message +                nsend = send(fds[FDS_SERVER].fd, &header, sizeof(header), 0); +                assert(nsend != -1); +                nsend = send(fds[FDS_SERVER].fd, sendmsg, TEXTMESSAGE_SIZE + TEXTMESSAGE_TEXT_SIZE((*sendmsg)), 0); +                assert(nsend != -1); + +                nmessages++; +                // also clear input +            case TB_KEY_CTRL_U: // clear input +                bzero(input, ninput * sizeof(*input)); +                ninput = 0; +                break; +            default: +                if (ev.ch == 0) +                    break; +                // TODO: logging +                if (ninput == INPUT_LIMIT - 1) // last byte reserved for \0 +                    break; + +                // append key to input buffer +                input[ninput] = ev.ch; +                ninput++; +            } +            if (quit) +                break; +        } + +        // These are used to redraw the screen from threads +        if (fds[FDS_RESIZE].revents & POLLIN) { +            // ignore +            tb_poll_event(&ev); +        } + +        screen_home(msgsArena, nmessages, input, ninput); + +        tb_present(); +    } + +    tb_shutdown(); + +    if (quitmsg != NULL) +        printf("%s\n", quitmsg); + +    ArenaRelease(msgsArena); + +    return 0; +} diff --git a/chatty.h b/chatty.h new file mode 100644 index 0000000..916ae4a --- /dev/null +++ b/chatty.h @@ -0,0 +1,239 @@ +#ifndef CHATTY_IMPL + +#include <assert.h> +#include <locale.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <strings.h> +#include <sys/mman.h> +#include <sys/socket.h> +#include <time.h> +#include <wchar.h> + +typedef uint8_t u8; +typedef uint16_t u16; +typedef uint32_t u32; +typedef uint64_t u64; +typedef int8_t s8; +typedef int16_t s16; +typedef int32_t s32; +typedef int64_t s64; +typedef enum { +    False = 0, +    True = 1 +} Bool; + +// port for chatty +#define PORT 9983 + +#define Kilobytes(Value) ((Value) * 1024) +#define Megabytes(Value) (Kilobytes(Value) * 1024) +#define Gigabytes(Value) (Megabytes((u64)Value) * 1024) +#define Terabytes(Value) (Gigabytes((u64)Value) * 1024) +#define PAGESIZE 4096 + +struct Arena { +    void* addr; +    u64 size; +    u64 pos; +} typedef Arena; + +#define PushArray(arena, type, count) (type*)ArenaPush((arena), sizeof(type) * (count)) +#define PushArrayZero(arena, type, count) (type*)ArenaPushZero((arena), sizeof(type) * (count)) +#define PushStruct(arena, type) PushArray((arena), (type), 1) +#define PushStructZero(arena, type) PushArrayZero((arena), (type), 1) + +Arena* +ArenaAlloc(u64 size) +{ +    Arena* arena = (Arena*)malloc(sizeof(Arena)); + +    arena->addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); +    if (arena->addr == MAP_FAILED) +        return NULL; +    arena->pos = 0; +    arena->size = size; + +    return arena; +} + +void +ArenaRelease(Arena* arena) +{ +    munmap(arena->addr, arena->size); +    free(arena); +} + +void* +ArenaPush(Arena* arena, u64 size) +{ +    u8* mem; +    mem = (u8*)arena->addr + arena->pos; +    arena->pos += size; +    return mem; +} + +/// Protocol +// - every message has format Header + Message +// TODO: authentication +// TODO: encryption + +/// Protocol Header +// - 2 bytes for version +// - 1 byte for message type +// - 16 bytes for checksum +// +// Text Message +// - 12 bytes for the author +// - 8 bytes for the timestamp +// - 2 bytes for the text length +// - x*4 bytes for the text +// +// History Message +// This message is for requesting messages sent after a timestamp. +// - 8 bytes for the timestamp + +/// Naming convention +// Messages end with the Message suffix (eg. TextMessag, HistoryMessage) +// A function that is coupled to a type works like +// <noun><type> eg. (printTextMessage, formatTimestamp) + +#define PROTOCOL_VERSION 0 + +typedef struct { +    u16 version; +    u8 type; +} HeaderMessage; + +enum { HEADER_TYPE_TEXT = 0, +       HEADER_TYPE_HISTORY, +       HEADER_TYPE_PRESENCE }; +#define HEADER_TEXTMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_TEXT}; +#define HEADER_HISTORYMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_HISTORY}; +#define HEADER_PRESENCEMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_PRESENCE}; + +// Size of author string including null terminator +#define AUTHOR_LEN 13 +// Size of formatted timestamp string including null terminator +#define TIMESTAMP_LEN 9 + +typedef struct { +    u8 checksum[16]; +    u8 author[AUTHOR_LEN]; +    u64 timestamp; +    u16 len;   // including null terminator +    u32* text; // placeholder for indexing +               // TODO: 0-length field? +} TextMessage; + +// Size of TextMessage without text pointer, used when receiving the message over a stream +#define TEXTMESSAGE_TEXT_SIZE(m) (m.len * sizeof(*m.text)) +#define TEXTMESSAGE_SIZE (sizeof(TextMessage) - sizeof(u32*)) + +typedef struct { +    u64 timestamp; +} HistoryMessage; + +typedef struct { +    u8 author[AUTHOR_LEN]; +    u8 type; +} PresenceMessage; +enum { PRESENCE_TYPE_CONNECTED = 0, +       PRESENCE_TYPE_DISCONNECTED }; + +// Returns string for type byte in HeaderMessage +u8* +headerTypeString(u8 type) +{ +    switch (type) { +    case HEADER_TYPE_TEXT: return (u8*)"TextMessage"; +    case HEADER_TYPE_HISTORY: return (u8*)"HistoryMessage"; +    case HEADER_TYPE_PRESENCE: return (u8*)"PresenceMessage"; +    default: return (u8*)"Unknown"; +    } +} + +u8* +presenceTypeString(u8 type) +{ +    switch (type) { +    case PRESENCE_TYPE_CONNECTED: return (u8*)"connected"; +    case PRESENCE_TYPE_DISCONNECTED: return (u8*)"disconnected"; +    default: return (u8*)"Unknown"; +    } +} + +// from Tsoding video on minicel (https://youtu.be/HCAgvKQDJng?t=4546) +// sv(https://github.com/tsoding/sv) +#define PH_FMT "header: v%d %s(%d)" +#define PH_ARG(header) header.version, headerTypeString(header.type), header.type + +void +formatTimestamp(u8 tmsp[TIMESTAMP_LEN], u64 t) +{ +    struct tm* ltime; +    ltime = localtime((time_t*)&t); +    strftime((char*)tmsp, TIMESTAMP_LEN, "%H:%M:%S", ltime); +} + +void +printTextMessage(TextMessage* message, u8 wide) +{ +    u8 timestamp[TIMESTAMP_LEN] = {0}; +    formatTimestamp(timestamp, message->timestamp); + +    assert(setlocale(LC_ALL, "") != NULL); + +    if (wide) +        wprintf(L"TextMessage: %s [%s] %ls\n", timestamp, message->author, (wchar_t*)&message->text); +    else { +        u8 str[message->len]; +        wcstombs((char*)str, (wchar_t*)&message->text, message->len * sizeof(*message->text)); +        printf("TextMessage: %s [%s] (%d)%s\n", timestamp, message->author, message->len, str); +    } +} + +// Receive a message from fd and store it to the msgsArena, +// if dest is not NULL point it to the new message created on msgsArena +// Returns the number of bytes received +u32 +recvTextMessage(Arena* msgsArena, u32 fd, TextMessage** dest) +{ +    s32 nrecv = 0; + +    TextMessage* message = ArenaPush(msgsArena, TEXTMESSAGE_SIZE); +    if (dest != NULL) +        *dest = message; + +    // Receive everything but the text so we can know the text's size and act accordingly +    nrecv = recv(fd, message, TEXTMESSAGE_SIZE, 0); +    assert(nrecv != -1); +    assert(nrecv == TEXTMESSAGE_SIZE); + +    nrecv = 0; + +    // Allocate memory for text and receive in that memory +    u32 text_size = message->len * sizeof(*message->text); +    ArenaPush(msgsArena, text_size); + +    nrecv = recv(fd, (u8*)&message->text, text_size, 0); +    assert(nrecv != -1); +    assert(nrecv == message->len * sizeof(*message->text)); + +    return TEXTMESSAGE_SIZE + nrecv; +} + +u32 +wstrlen(u32* str) +{ +    u32 i = 0; +    while (str[i] != 0) +        i++; +    return i; +} + +#endif +#define CHATTY_H diff --git a/common.h b/common.h deleted file mode 100644 index 700d741..0000000 --- a/common.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef COMMON_H -#define COMMON_H - -#include <stdarg.h> -#include <stdio.h> -#include <time.h> -#include <unistd.h> -#include <stdint.h> -#include <stddef.h> -#include <wchar.h> - -#define AUTHOR_LEN 13 -#define TIMESTAMP_LEN 9 -// port to listen on -#define PORT 9983 -// How much bytes can be sent at once over the stream -// This is also Intial size for buffer used to send and receive data, -// "initial" because the buffer is tied to an arena and can grow. -// Note: must be greater than sizeof(Message) - sizeof(Message.text) -#define STREAM_LIMIT 64 - -typedef uint8_t u8; -typedef uint16_t u16; -typedef uint32_t u32; -typedef uint64_t u64; -typedef int8_t s8; -typedef int16_t s16; -typedef int32_t s32; -typedef int64_t s64; - -struct Message { -    u8 author[AUTHOR_LEN]; -    u8 timestamp[TIMESTAMP_LEN]; -    // includes null terminator -    u16 text_len; -    wchar_t *text; -} typedef Message; - -#define MESSAGELEN(m) (AUTHOR_LEN + TIMESTAMP_LEN + sizeof(m.text_len)*sizeof(wchar_t) + m.text_len) -#define MESSAGELENP(m) (AUTHOR_LEN + TIMESTAMP_LEN + sizeof(m->text_len) + m->text_len*(sizeof(wchar_t))) - -void message_timestamp(u8 str[TIMESTAMP_LEN]) -{ -    time_t now; -    struct tm *ltime; -    time(&now); -    ltime = localtime(&now); -    strftime((char *)str, TIMESTAMP_LEN, "%H:%M:%S", ltime); -} - -#endif diff --git a/compile_flags.txt b/compile_flags.txt new file mode 100644 index 0000000..eb526c6 --- /dev/null +++ b/compile_flags.txt @@ -0,0 +1,5 @@ +-Wall +-Wextra +-pedantic +-std=c99 +-O3 @@ -7,17 +7,17 @@  #include <string.h>  #include <unistd.h> -#include "arena.h" -#include "common.h" +#include "chatty.h" -int main(int argc, char **argv) +int +main(int argc, char** argv)  {      if (argc < 3) {          fprintf(stderr, "usage: send <author> <msg>\n");          return 1;      } -    u32 err, serverfd, nsend; +    s32 err, serverfd, nsend;      serverfd = socket(AF_INET, SOCK_STREAM, 0);      assert(serverfd != -1); @@ -27,37 +27,41 @@ int main(int argc, char **argv)          htons(PORT),          {0},      }; -    err = connect(serverfd, (struct sockaddr *)&address, sizeof(address)); +    err = connect(serverfd, (struct sockaddr*)&address, sizeof(address));      assert(err == 0); -    u32 author_len = strlen(argv[1]); // add 1 for null terminator -    assert(author_len + 1 <= AUTHOR_LEN); +    HeaderMessage header = HEADER_TEXTMESSAGE; +    TextMessage* message;      // convert text to wide string      u32 text_len = strlen(argv[2]) + 1; -    wchar_t text_wide[text_len]; -    u32 size = mbstowcs(text_wide, argv[2], text_len - 1); +    u32 text_wide[text_len]; +    u32 size = mbstowcs((wchar_t*)text_wide, argv[2], text_len - 1);      assert(size == text_len - 1); -    // null terminate      text_wide[text_len - 1] = 0; -    Arena *bufArena = ArenaAlloc(); -    u8 *buf = ArenaPush(bufArena, (text_len - 1) * sizeof(*text_wide)); -    Message *mbuf = (Message *)buf; +    u32 author_len = strlen(argv[1]); +    assert(author_len + 1 <= AUTHOR_LEN); // add 1 for null terminator -    memcpy(mbuf->author, argv[1], author_len); -    message_timestamp(mbuf->timestamp); -    mbuf->text_len = text_len; -    memcpy(&mbuf->text, text_wide, mbuf->text_len * sizeof(wchar_t)); +    u8 buf[text_len * sizeof(*text_wide) + TEXTMESSAGE_SIZE]; +    bzero(buf, sizeof(buf)); +    message = (TextMessage*)buf; -    nsend = send(serverfd, buf, MESSAGELENP(mbuf), 0); +    memcpy(message->author, argv[1], author_len); +    message->timestamp = time(NULL); +    message->len = text_len; +    memcpy(&message->text, text_wide, text_len * sizeof(*message->text)); -    assert(nsend >= 0); +    nsend = send(serverfd, &header, sizeof(header), 0); +    assert(nsend != -1); +    printf("header bytes sent: %d\n", nsend); +    nsend = send(serverfd, buf, sizeof(buf), 0); +    assert(nsend != -1); -    printf("text_len: %d\n", text_len); -    fprintf(stdout, "Sent %d bytes.\n", nsend); - -    ArenaRelease(bufArena); +    printf("text length: %d\n", text_len); +    printf("buf size: %lu\n", sizeof(buf)); +    printf("text size: %lu\n", sizeof(*text_wide) * text_len); +    printf("message bytes sent: %d\n", nsend);      return 0;  } @@ -1,30 +1,38 @@ -#include "arena.h" -#include "common.h" +#include "chatty.h"  #include <assert.h>  #include <netinet/in.h>  #include <poll.h>  #include <stdarg.h> +#include <string.h>  #include <sys/socket.h> +#include <unistd.h>  // timeout on polling  #define TIMEOUT 60 * 1000  // max pending connections -#define PENDING_MAX 16 - -// the size of pollfd element in the fdsArena -// note: clientsArena and pollfd_size must have been initialisezd -#define FDS_SIZE fdsArena->pos / pollfd_size +#define MAX_CONNECTIONS 16 +// Get number of connections from arena position +// NOTE: this is somewhat wrong, because of when disconnections happen +#define FDS_SIZE (fdsArena->pos / sizeof(*fds))  // enum for indexing the fds array  enum { FDS_STDIN = 0,         FDS_SERVER,         FDS_CLIENTS }; -int main(void) +// Has information on clients +// For each pollfd in fds there should be a matching client in clients +// clients[i - FDS_CLIENTS] <=> fds[i] +typedef struct { +    u8 author[AUTHOR_LEN]; // matches author property on other message types +    Bool initialized;      // boolean +} Client; + +int +main(void)  { -    u32 err, serverfd, clientfd; -    u16 nclient = 0; +    s32 err, serverfd, clientfd;      u32 on = 1;      // Start listening on the socket @@ -32,7 +40,7 @@ int main(void)          serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);          assert(serverfd > 2); -        err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8 *)&on, sizeof(on)); +        err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8*)&on, sizeof(on));          assert(err == 0);          const struct sockaddr_in address = { @@ -41,36 +49,39 @@ int main(void)              {0},          }; -        err = bind(serverfd, (const struct sockaddr *)&address, sizeof(address)); +        err = bind(serverfd, (const struct sockaddr*)&address, sizeof(address));          assert(err == 0); -        err = listen(serverfd, PENDING_MAX); +        err = listen(serverfd, MAX_CONNECTIONS);          assert(err == 0);      } -    Arena *msgTextArena = ArenaAlloc();          // allocating text in messages that have a dynamic sized -    Message mrecv = {0};                         // message used for receiving messages from clients -    u32 nrecv = 0;                               // number of bytes received -    u32 recv_len;                                // Number of bytes of the message received over stream -    u32 nsend = 0;                               // number of bytes sent -    Arena *bufArena = ArenaAlloc();              // data in buf -    u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // temporary buffer for receiving and sending data -    Message *mbuf = (Message *)buf;              // pointer for indexing buf as a message - -    Arena *fdsArena = ArenaAlloc();        // arena for fds to accomodate multiple clients -    struct pollfd *fds = fdsArena->memory; // helper for indexing memory -    struct pollfd c = {0, POLLIN, 0};      // helper client structure fore reusing -    struct pollfd *fdsAddr;                // used for copying clients -    const u64 pollfd_size = sizeof(struct pollfd); +    Arena* msgsArena = ArenaAlloc(Megabytes(128)); // storing received messages +                                                   // NOTE: sent messages? +    s32 nrecv = 0;                                 // number of bytes received +    s32 nsend = 0;                                 // number of bytes sent + +    Arena* clientsArena = ArenaAlloc(MAX_CONNECTIONS * sizeof(Client)); +    Arena* fdsArena = ArenaAlloc(MAX_CONNECTIONS * sizeof(struct pollfd)); +    struct pollfd* fds = fdsArena->addr; +    Client* clients = clientsArena->addr; + +    struct pollfd* fdsAddr; +    struct pollfd newpollfd = {-1, POLLIN, 0};      // initialize fds structure -    // add stdin (c.fd == 0) -    fdsAddr = ArenaPush(fdsArena, pollfd_size); -    memcpy(fdsAddr, &c, pollfd_size); +    newpollfd.fd = 0; +    fdsAddr = ArenaPush(fdsArena, sizeof(*fds)); +    memcpy(fdsAddr, &newpollfd, sizeof(*fds));      // add serverfd -    c.fd = serverfd; -    fdsAddr = ArenaPush(fdsArena, pollfd_size); -    memcpy(fdsAddr, &c, pollfd_size); +    newpollfd.fd = serverfd; +    fdsAddr = ArenaPush(fdsArena, sizeof(*fds)); +    memcpy(fdsAddr, &newpollfd, sizeof(*fds)); +    newpollfd.fd = -1; + +    // Initialize the rest of the fds array +    for (u32 i = FDS_CLIENTS; i < MAX_CONNECTIONS; i++) +        fds[i] = newpollfd;      while (1) {          err = poll(fds, FDS_SIZE, TIMEOUT); @@ -83,81 +94,165 @@ int main(void)              clientfd = accept(serverfd, NULL, NULL);              assert(clientfd != -1);              assert(clientfd > serverfd); +            fprintf(stdout, "New connection(%d).\n", clientfd);              // fill up a hole -            u8 found = 0; -            for (u32 i = FDS_CLIENTS; i < FDS_SIZE; i++) { -                if (fds[i].fd == -1) { -                    fds[i].fd = clientfd; -                    // note we do not have to reset .revents because poll will set it to 0 next time -                    found = 1; +            u8 found; +            for (found = FDS_CLIENTS; found < FDS_SIZE; found++) +                if (fds[found].fd == -1)                      break; -                } -            } - -            // allocate an extra client because there was no empty spot in the fds array -            if (!found) { -                // add client to arena -                fdsAddr = ArenaPush(fdsArena, pollfd_size); -                c.fd = clientfd; -                memcpy(fdsAddr, &c, pollfd_size); +            if (found == FDS_SIZE) { +                // no more space, allocate +                struct pollfd* pollfd = ArenaPush(fdsArena, sizeof(*pollfd)); +                pollfd->fd = clientfd; +                pollfd->events = POLLIN; +            } else if (found == MAX_CONNECTIONS) { +                // TODO: reject connection +                close(clientfd); +                fprintf(stdout, "Max clients reached."); +            } else { +                // hole found +                fds[found].fd = clientfd; +                fds[found].events = POLLIN; +                fprintf(stdout, "Added pollfd(%d).\n", clientfd);              } - -            nclient++; -            fprintf(stdout, "connected(%d).\n", clientfd - serverfd);          } +        // Check for messages from clients          for (u32 i = FDS_CLIENTS; i < (FDS_SIZE); i++) {              if (!(fds[i].revents & POLLIN))                  continue; -            if (fds[i].fd == -1) +            assert(fds[i].fd != -1); + +            fprintf(stdout, "Message(%d).\n", fds[i].fd); +            // If this is the first message from the client it must be a presence message indicated +            // it connected. +            Client* client = clients + i - FDS_CLIENTS; +            if (!client->initialized) { +                fprintf(stdout, " Adding to clients(%d).\n", fds[i].fd); +                // Wait for PresenceMessage from new client to get author information +                HeaderMessage header; +                // TODO: handle wrong message, disconnection, etc. +                nrecv = recv(clientfd, &header, sizeof(header), 0); +                assert(nrecv != -1); +                assert(nrecv == sizeof(header)); +                if (header.type != HEADER_TYPE_PRESENCE) { +                    // TODO: reject connection +                    close(clientfd); +                    continue; +                } +                fprintf(stdout, "  Got header(%d).\n", fds[i].fd); + +                PresenceMessage message; +                // TODO: handle wrong message +                nrecv = recv(clientfd, &message, sizeof(message), 0); +                assert(nrecv != -1); +                assert(nrecv == sizeof(message)); +                fprintf(stdout, "  Got presence message(%d).\n", fds[i].fd); + +                memcpy(client->author, message.author, AUTHOR_LEN); +                client->initialized = True; + +                fprintf(stdout, "  Added to clients(%d): %s\n", fds[i].fd, client->author); + +                // Notify other clients from this new one +                // Reuse header and message +                for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) { +                    if (fds[j].fd == fds[i].fd) +                        continue; +                    if (fds[j].fd == -1) +                        continue; +                    fprintf(stdout, "  Notifying (%d)\n", fds[j].fd); + +                    nsend = send(fds[j].fd, &header, sizeof(header), 0); +                    assert(nsend != -1); +                    assert(nsend == sizeof(header)); +                    nsend = send(fds[j].fd, &message, sizeof(message), 0); +                    assert(nsend != -1); +                    assert(nsend == sizeof(message)); +                }                  continue; +            } -            nrecv = recv(fds[i].fd, buf, bufArena->pos, 0); +            // We received a message, try to parse the header +            HeaderMessage header; +            nrecv = recv(fds[i].fd, &header, sizeof(header), 0);              assert(nrecv != -1);              if (nrecv == 0) { -                fprintf(stdout, "disconnected(%d). \n", fds[i].fd - serverfd); +                fprintf(stdout, "Disconnected(%d). \n", fds[i].fd);                  shutdown(fds[i].fd, SHUT_RDWR); -                close(fds[i].fd); // send close to client -                fds[i].fd = -1;   // ignore in the future +                close(fds[i].fd);                             // send close to client +                fds[i].fd = -1;                               // ignore in the future +                clients[i - FDS_CLIENTS].initialized = False; // deinitialize client +                                                              // +                // Send disconnection to other connected clients +                HeaderMessage header = HEADER_PRESENCEMESSAGE; +                PresenceMessage message = { +                    .type = PRESENCE_TYPE_DISCONNECTED +                }; +                memcpy(message.author, clients[i - FDS_CLIENTS].author, AUTHOR_LEN); +                for (u32 j = FDS_CLIENTS; j < FDS_SIZE; j++) { +                    if (fds[j].fd == fds[i].fd) +                        continue; +                    if (fds[j].fd == -1) +                        continue; +                    nsend = send(fds[j].fd, &header, sizeof(header), 0); +                    assert(nsend != -1); +                    assert(nsend == sizeof(header)); +                    nsend = send(fds[j].fd, &message, sizeof(message), 0); +                    assert(nsend != -1); +                    assert(nsend == sizeof(message)); +                } +                  continue;              } -            recv_len = sizeof(*mbuf) - sizeof(mbuf->text) + mbuf->text_len * sizeof(*mbuf->text); -            if (recv_len > nrecv) { -                // allocate needed space for buf -                if (recv_len > bufArena->pos) -                    ArenaPush(bufArena, recv_len - bufArena->pos); - -                // receive remaining bytes -                u32 nr = recv(fds[i].fd, buf + nrecv, recv_len - nrecv, 0); -                assert(nr != -1); -                nrecv += nr; -                assert(nrecv == recv_len); -            } +            assert(nrecv == sizeof(header)); +            fprintf(stderr, " Received(%d): %d bytes -> " PH_FMT "\n", fds[i].fd, nrecv, PH_ARG(header)); -            // TODO: Do not print the message in the logs -            fprintf(stdout, "message(%d): %d bytes.\n", fds[i].fd - serverfd, nrecv); +            switch (header.type) { +            case HEADER_TYPE_TEXT:; +                TextMessage* message; +                nrecv = recvTextMessage(msgsArena, fds[i].fd, &message); +                fprintf(stderr, " Received(%d): %d bytes -> ", fds[i].fd, nrecv); +                printTextMessage(message, 0); -            for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) { -                if (j == i) -                    continue; -                if (fds[j].fd == -1) -                    continue; +                HeaderMessage header = HEADER_TEXTMESSAGE; +                // Send message to all other clients +                for (u32 j = FDS_CLIENTS; j < FDS_SIZE; j++) { +                    if (fds[j].fd == fds[i].fd) continue; +                    if (fds[j].fd == -1) continue; -                nsend = send(fds[j].fd, buf, nrecv, 0); -                assert(nsend != 1); -                assert(nsend == nrecv); -                fprintf(stdout, "retransmitted(%d->%d).\n", fds[i].fd - serverfd, fds[j].fd - serverfd); -            } +                    // NOTE: I wonder if this is more expensive than constructing a buffer and sending +                    // that +                    u32 nsend_total = 0; +                    nsend = send(fds[j].fd, &header, sizeof(header), 0); +                    assert(nsend != 1); +                    assert(nsend == sizeof(header)); +                    nsend_total += nsend; +                    nsend = send(fds[j].fd, message, TEXTMESSAGE_SIZE, 0); +                    assert(nsend != -1); +                    assert(nsend == TEXTMESSAGE_SIZE); +                    nsend_total += nsend; +                    nsend = send(fds[j].fd, &message->text, message->len * sizeof(*message->text), 0); +                    assert(nsend != -1); +                    assert(nsend == (message->len * sizeof(*message->text))); +                    nsend_total += nsend; -            ArenaPop(msgTextArena, mrecv.text_len); +                    fprintf(stdout, "  Retransmitted(%d->%d) %d bytes.\n", fds[i].fd, fds[j].fd, nsend_total); +                } +                break; +            default: +                fprintf(stdout, " Got unhandled message type '%s' from client %d", headerTypeString(header.type), fds[i].fd); +                continue; +            }          }      } +    ArenaRelease(clientsArena);      ArenaRelease(fdsArena); -    ArenaRelease(msgTextArena); +    ArenaRelease(msgsArena);      return 0;  }  | 
