aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md11
-rw-r--r--arena.h109
-rw-r--r--chatty.c591
-rw-r--r--chatty.h239
-rw-r--r--common.h51
-rw-r--r--compile_flags.txt5
-rw-r--r--send.c50
-rw-r--r--server.c259
8 files changed, 766 insertions, 549 deletions
diff --git a/README.md b/README.md
index 816705b..5922b01 100644
--- a/README.md
+++ b/README.md
@@ -18,20 +18,23 @@ The idea is the following:
- [ ] ctrl+z to suspend
## server
-- [ ] log messages to file (save history)
- [ ] check if when sending and the client is offline (due to connection loss) what happens
- [ ] timeout on recv?
+- [ ] use threads to handle clients/ timeout when receiving because a client could theoretically
+ stall the entire server.
## common
- [x] handle messages that are too large
+- [ ] log messages to file (save history)
- [ ] connect/disconnections messages
- [ ] use IP address / domain
- [ ] chat history
+- [ ] asserting, logging if fail / halt execution
## Protocol
-For now the protocol consists of sending Message type over the network, but in the future something
-more flexible might be required. Because it will make it easier to do things like:
-- request chat logs up to a certain point
+- see `protocol.h` for more info
+- [ ] make sections per message
+- request chat logs from a certain point up to now (history)
- connect to a specific room
- connect/disconnect messages
diff --git a/arena.h b/arena.h
deleted file mode 100644
index 9327a5e..0000000
--- a/arena.h
+++ /dev/null
@@ -1,109 +0,0 @@
-#ifndef ARENA_IMPL
-#define ARENA_IMPL
-
-#include "common.h"
-
-#include <fcntl.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <strings.h>
-#include <sys/mman.h>
-#include <unistd.h>
-
-#define PAGESIZE 4096
-
-#ifndef ARENA_MEMORY
-#define ARENA_MEMORY PAGESIZE
-#endif
-
-struct Arena {
- void *memory;
- u64 size;
- u64 pos;
-} typedef Arena;
-
-// Create an arena
-Arena *ArenaAlloc(void);
-// Destroy an arena
-void ArenaRelease(Arena *arena);
-
-// Push bytes on to the arena | allocating
-void *ArenaPush(Arena *arena, u64 size);
-void *ArenaPushZero(Arena *arena, u64 size);
-
-#define PushArray(arena, type, count) (type *)ArenaPush((arena), sizeof(type)*(count))
-#define PushArrayZero(arena, type, count) (type *)ArenaPushZero((arena), sizeof(type) * (count))
-#define PushStruct(arena, type) PushArray((arena), (type), 1)
-#define PushStructZero(arena, type) PushArrayZero((arena), (type), 1)
-
-// Free some bytes by popping the stack
-void ArenaPop(Arena *arena, u64 size);
-// Get the number of bytes allocated
-u64 ArenaGetPos(Arena *arena);
-
-void ArenaSetPosBack(Arena *arena, u64 pos);
-void ArenaClear(Arena *arena);
-
-Arena *ArenaAlloc(void)
-{
- // NOTE: If the arena is created here the pointer to the memory get's overwritten with size in
- // ArenaPush, so we are forced to use malloc
- Arena *arena = malloc(sizeof(Arena));
-
- arena->memory = mmap(NULL, ARENA_MEMORY, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
- if (arena->memory == MAP_FAILED)
- return NULL;
-
- arena->pos = 0;
- arena->size = ARENA_MEMORY;
-
- return arena;
-}
-
-void ArenaRelease(Arena *arena)
-{
- munmap(arena->memory, ARENA_MEMORY);
- free(arena);
-}
-
-void *ArenaPush(Arena *arena, u64 size)
-{
- u8 *mem;
- mem = (u8 *)arena->memory + arena->pos;
- arena->pos += size;
- return mem;
-}
-
-void *ArenaPushZero(Arena *arena, u64 size)
-{
- u8 *mem;
- mem = (u8 *)arena->memory + arena->pos;
- bzero(mem, size);
- arena->pos += size;
- return mem;
-}
-
-void ArenaPop(Arena *arena, u64 size)
-{
- arena->pos -= size;
-}
-
-u64 ArenaGetPos(Arena *arena)
-{
- return arena->pos;
-}
-
-void ArenaSetPosBack(Arena *arena, u64 pos)
-{
- arena->pos -= pos;
-}
-
-void ArenaClear(Arena *arena)
-{
- bzero(arena->memory, arena->size);
- arena->pos = 0;
-}
-
-#endif
diff --git a/chatty.c b/chatty.c
index 3590471..e721a29 100644
--- a/chatty.c
+++ b/chatty.c
@@ -1,8 +1,7 @@
#define TB_IMPL
#include "termbox2.h"
-#include "arena.h"
-#include "common.h"
+#include "chatty.h"
#include <arpa/inet.h>
#include <assert.h>
@@ -14,16 +13,12 @@
#define TIMEOUT_POLL 60 * 1000
// time to reconnect in seconds
#define TIMEOUT_RECONNECT 1
-// The input buffer is tied to an arena, INPUT_LEN specifies the intial number of wide characters
-// allocated, and the INPUT_GROW specifies by how much the input should grow when it exceeds the
-// buffer.
-#define INPUT_LEN (256 * sizeof(wchar_t))
-#define INPUT_GROW (64 * sizeof(wchar_t))
+#define INPUT_LIMIT 512
// must be of AUTHOR_LEN -1
static u8 username[AUTHOR_LEN] = "(null)";
// file descriptros for polling
-static struct pollfd *fds = NULL;
+static struct pollfd* fds = NULL;
// mutex for locking fds when in thread_reconnect()
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -32,226 +27,21 @@ enum { FDS_SERVER = 0,
FDS_RESIZE,
FDS_MAX };
-void *thread_reconnect(void *address_ptr);
-void fillstr(wchar_t *str, wchar_t ch, u32 len);
-void popup(u32 fg, u32 bg, char *text);
-u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32 bg_pfx, char *pfx, s32 limit);
-void screen_home(Arena *msgsArena, wchar_t input[]);
-
-int main(int argc, char **argv)
+// fill str array with char
+void
+fillstr(u32* str, u32 ch, u32 len)
{
- // Use first argument as username
- if (argc > 1) {
- u32 arg_len = strlen(argv[1]);
- assert(arg_len <= AUTHOR_LEN - 1);
- memcpy(username, argv[1], arg_len);
- username[arg_len] = '\0';
- }
-
- s32 err = 0; // error code for functions
- Arena *msgsArena = ArenaAlloc(); // Messages received & sent
- Arena *msgTextArena = ArenaAlloc(); // Text from received & sent messages
- Arena *bufArena = ArenaAlloc(); // data in buf
- u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // buffer used for receiving and sending messages
- Message *mbuf = (Message *)buf; // index for buf as a message
- u32 nrecv = 0; // number of bytes received
- u32 recv_len = 0; // total length of the received stream
- u32 nsend = 0; // number of bytes sent
- Message *recv_msg = NULL; // message received pushed on the msgsArena
-
- Arena *inputArena = ArenaAlloc(); // data in input
- wchar_t *input = ArenaPush(inputArena, INPUT_LEN); // input buffer
- u32 input_len = 0; // length of the input
-
- struct tb_event ev; // event fork keypress & resize
- u8 quit = 0; // boolean to indicate if we want to quit the main loop
- u8 *quitmsg = NULL; // this string will be printed before returning from main
-
- pthread_t thr_rec; // thread for reconnecting to server when disconnected
-
- // poopoo C cannot infer type
- fds = (struct pollfd[FDS_MAX]){
- {-1, POLLIN, 0}, // FDS_SERVER
- {-1, POLLIN, 0}, // FDS_TTY
- {-1, POLLIN, 0}, // FDS_RESIZE
- };
-
- const struct sockaddr_in address = {
- AF_INET,
- htons(PORT),
- {0},
- };
-
- // Connecting to server
- {
- s32 serverfd;
- serverfd = socket(AF_INET, SOCK_STREAM, 0);
- assert(serverfd > 2); // greater than STDERR
-
- err = connect(serverfd, (struct sockaddr *)&address, sizeof(address));
- if (err != 0) {
- perror("Server");
- return 1;
- }
- fds[FDS_SERVER].fd = serverfd;
- }
-
- // for wide character printing
- assert(setlocale(LC_ALL, "") != NULL);
-
- // init
- tb_init();
- tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd);
-
- screen_home(msgsArena, input);
- tb_present();
-
- // main loop
- while (!quit) {
- err = poll(fds, FDS_MAX, TIMEOUT_POLL);
- // ignore resize events and use them to redraw the screen
- assert(err != -1 || errno == EINTR);
-
- tb_clear();
-
- if (fds[FDS_SERVER].revents & POLLIN) {
- // got data from server
- nrecv = recv(fds[FDS_SERVER].fd, buf, STREAM_LIMIT, 0);
- assert(nrecv != -1);
-
- // Server disconnects
- if (nrecv == 0) {
- // close diconnected server's socket
- err = close(fds[FDS_SERVER].fd);
- assert(err == 0);
- fds[FDS_SERVER].fd = -1; // ignore
- // start trying to reconnect in a thread
- err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void *)&address);
- assert(err == 0);
-
- } else {
- recv_msg = ArenaPush(msgsArena, sizeof(*mbuf));
- // copy everything but the text
- memcpy(recv_msg, buf, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(mbuf->text_len));
- // allocate memeory for text
- recv_msg->text = ArenaPush(msgTextArena, mbuf->text_len * sizeof(*mbuf->text));
-
- // If we did not receive the entire message receive the remaining part
- recv_len = sizeof(*recv_msg) - sizeof(recv_msg->text) + recv_msg->text_len * sizeof(*recv_msg->text);
- if (recv_len > nrecv) {
- // allocate needed space for buf
- if (recv_len > bufArena->pos)
- ArenaPush(bufArena, recv_len - bufArena->pos);
-
- // receive remaining bytes
- u32 nr = recv(fds[FDS_SERVER].fd, buf + nrecv, recv_len - nrecv, 0);
- assert(nr != -1);
- nrecv += nr;
- assert(nrecv == recv_len);
- }
-
- // copy the text to the allocated space
- memcpy(recv_msg->text, buf + TIMESTAMP_LEN + AUTHOR_LEN + sizeof(recv_msg->text_len), recv_msg->text_len * sizeof(*mbuf->text));
- }
- }
-
- if (fds[FDS_TTY].revents & POLLIN) {
- // got a key event
- tb_poll_event(&ev);
-
- switch (ev.key) {
- case TB_KEY_CTRL_W:
- // delete consecutive whitespace
- while (input_len) {
- if (input[input_len - 1] == L' ') {
- input[input_len - 1] = 0;
- input_len--;
- continue;
- }
- break;
- }
- // delete until whitespace
- while (input_len) {
- if (input[input_len - 1] == L' ')
- break;
- // erase
- input[input_len - 1] = 0;
- input_len--;
- }
- break;
- case TB_KEY_CTRL_D:
- case TB_KEY_CTRL_C:
- quit = 1;
- break;
- case TB_KEY_CTRL_M: // send message
- if (input_len == 0)
- // do not send empty message
- break;
- if (fds[FDS_SERVER].fd == -1)
- // do not send message to disconnected server
- break;
-
- // null terminate
- input[input_len] = 0;
- input_len++;
- // TODO: check size does not exceed buffer
-
- // add to msgsArena
- Message *sendmsg = ArenaPush(msgsArena, sizeof(Message));
- memcpy(sendmsg->author, username, AUTHOR_LEN);
- message_timestamp(sendmsg->timestamp);
- sendmsg->text_len = input_len;
- sendmsg->text = ArenaPush(msgTextArena, input_len * sizeof(wchar_t));
- // copy the text to the allocated space
- memcpy(sendmsg->text, input, input_len * sizeof(wchar_t));
-
- // Send the message
- // copy everything but the text
- memcpy(buf, sendmsg, AUTHOR_LEN + TIMESTAMP_LEN + sizeof(wchar_t));
- memcpy(&mbuf->text, input, input_len * sizeof(wchar_t));
- nsend = send(fds[FDS_SERVER].fd, buf, MESSAGELENP(mbuf), 0);
- assert(nsend > 0);
-
- case TB_KEY_CTRL_U: // clear input
- bzero(input, input_len * sizeof(wchar_t));
- input_len = 0;
- break;
- default:
- if (ev.ch == 0)
- break;
-
- // append key to input buffer
- input[input_len] = ev.ch;
- input_len++;
- if (input_len * sizeof(*input) == inputArena->pos)
- ArenaPush(inputArena, INPUT_GROW);
- }
- if (quit)
- break;
- }
-
- // These are used to redraw the screen from threads
- if (fds[FDS_RESIZE].revents & POLLIN) {
- // ignore
- tb_poll_event(&ev);
- }
-
- screen_home(msgsArena, input);
-
- tb_present();
- }
-
- tb_shutdown();
-
- if (quitmsg != NULL)
- printf("%s\n", quitmsg);
-
- ArenaRelease(msgTextArena);
- ArenaRelease(msgsArena);
- ArenaRelease(bufArena);
- ArenaRelease(inputArena);
+ for (u32 i = 0; i < len; i++)
+ str[i] = ch;
+}
- return 0;
+// Centered popup displaying message in the appropriate cololrs
+void
+popup(u32 fg, u32 bg, char* text)
+{
+ u32 len = strlen(text);
+ assert(len > 0);
+ tb_print(global.width / 2 - len / 2, global.height / 2, fg, bg, text);
}
// Takes as paramter `struct sockaddr_in*` and uses it to connect to the server.
@@ -259,18 +49,20 @@ int main(int argc, char **argv)
// paramter. To indicate that the server is offline the fds[FDS_SERVER] is set to -1. When online
// it is set to a non-zero value.
// Returns NULL.
-void *thread_reconnect(void *address_ptr)
+void*
+thread_reconnect(void* address_ptr)
{
u32 serverfd, err;
- struct sockaddr_in *address = address_ptr;
+ struct sockaddr_in* address = address_ptr;
while (1) {
serverfd = socket(AF_INET, SOCK_STREAM, 0);
assert(serverfd > 2); // greater than STDERR
- err = connect(serverfd, (struct sockaddr *)address, sizeof(*address));
+ err = connect(serverfd, (struct sockaddr*)address, sizeof(*address));
if (err == 0)
break;
assert(errno == ECONNREFUSED);
+ // TODO: faster reconnection? (too many open files)
sleep(TIMEOUT_RECONNECT);
}
@@ -286,40 +78,26 @@ void *thread_reconnect(void *address_ptr)
return NULL;
}
-// fill str array with char
-void fillstr(wchar_t *str, wchar_t ch, u32 len)
-{
- for (u32 i = 0; i < len; i++)
- str[i] = ch;
-}
-
-// Centered popup displaying message in the appropriate cololrs
-void popup(u32 fg, u32 bg, char *text)
-{
- u32 len = strlen(text);
- assert(len > 0);
- tb_print(global.width / 2 - len / 2, global.height / 2, fg, bg, text);
-}
-
-// Print `text` of text_len` wide characters wrapped to limit. x, y, fg and
+// Print `text` wrapped to limit. x, y, fg and
// bg will be passed to the tb_printf() function calls.
// pfx is a string that will be printed first and will not be wrapped on characters like msg->text,
// this is useful when for example: printing messages and wanting to have consistent
// timestamp+author name.
// Returns the number of lines printed.
-// TODO: remove text_len and calculate it in the function
// TODO: add y limit
// TODO:(bug) text after pfx is wrapped one too soon
// TODO: text == NULL to know how many lines *would* be printed
+// - no this should be a separate function
// TODO: check if text[i] goes out of bounds
-u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32 bg_pfx, char *pfx, s32 limit)
+u32
+tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, u32* text, s32 text_len, u32 fg_pfx, u32 bg_pfx, u8* pfx, s32 limit)
{
assert(limit > 0);
// lines y, incremented after each wrap
s32 ly = y;
// character the text is split on
- wchar_t t = 0;
+ u32 t = 0;
// index used for searching in string
s32 i = limit;
// previous i for windowing through the text
@@ -327,17 +105,13 @@ u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32
// used when retrying to get a longer limit
u32 failed = 0;
- u32 text_len = 0;
- while (text[text_len] != 0)
- text_len++;
-
// NOTE: We can assume that we need to wrap, therefore print a newline after the prefix string
if (pfx != NULL) {
tb_printf(x, ly, fg_pfx, bg_pfx, "%s", pfx);
// If the text fits on one line print the text and return
// Otherwise print the text on the next line
- s32 pfx_len = strlen(pfx);
+ s32 pfx_len = strlen((char*)pfx);
if (limit > pfx_len + text_len) {
tb_printf(x + pfx_len, y, fg, bg, "%ls", text);
return 1;
@@ -394,15 +168,14 @@ u32 tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, wchar_t *text, u32 fg_pfx, u32
}
// home screen, the first screen the user sees
-// it displays a prompt for user input and the received messages from msgsArena
-void screen_home(Arena *msgsArena, wchar_t input[])
+// it displays a prompt with the user input of input_len wide characters
+// and the received messages from msgsArena
+void
+screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)
{
// config options
- const u32 box_max_len = 80;
- const u32 box_x = 0, box_y = global.height - 3, box_pad_x = 1, box_mar_x = 1, box_bwith = 1, box_height = 3;
- u32 input_len = 0;
- while (input[input_len] != 0)
- input_len++;
+ const s32 box_max_len = 80;
+ const s32 box_x = 0, box_y = global.height - 3, box_pad_x = 1, box_mar_x = 1, box_bwith = 1, box_height = 3;
const u32 prompt_x = box_x + box_pad_x + box_mar_x + box_bwith + input_len;
// the minimum height required is the hight for the box prompt
@@ -428,28 +201,57 @@ void screen_home(Arena *msgsArena, wchar_t input[])
if (freesp <= 0)
goto draw_prompt;
- Message *messages = msgsArena->memory;
- assert(messages != NULL);
+ u8* addr = msgsArena->addr;
+ assert(addr != NULL);
// on what line to print the current message, used for scrolling
u32 msg_y = 0;
- u32 nmessages = (msgsArena->pos / sizeof(Message));
u32 offs = (nmessages > freesp) ? nmessages - freesp : 0;
+ // In each case statement advance the addr pointer by the size of the message
for (u32 i = offs; i < nmessages; i++) {
- // Color user's own messages
- u32 fg = 0;
- if (strncmp((char *)username, (char *)messages[i].author, AUTHOR_LEN) == 0) {
- fg = TB_CYAN;
- } else {
- fg = TB_MAGENTA;
+ HeaderMessage* header = (HeaderMessage*)addr;
+ addr += sizeof(*header);
+
+ switch (header->type) {
+ case HEADER_TYPE_TEXT: {
+ TextMessage* message = (TextMessage*)addr;
+
+ u32 fg = 0;
+ if (strncmp((char*)username, (char*)message->author, AUTHOR_LEN) == 0) {
+ fg = TB_CYAN;
+ } else {
+ fg = TB_MAGENTA;
+ }
+ // prefix is of format "HH:MM:SS [<author>] ", so
+ u8 pfx[AUTHOR_LEN - 1 + TIMESTAMP_LEN - 1 + 4 + 1] = {0};
+ u8 timestamp[TIMESTAMP_LEN];
+ formatTimestamp(timestamp, message->timestamp);
+ sprintf((char*)pfx, "%s [%s] ", timestamp, message->author);
+ msg_y += tb_printf_wrap(0, msg_y, TB_WHITE, 0, (u32*)&message->text, message->len, fg, 0, pfx, global.width);
+
+ u32 message_size = TEXTMESSAGE_SIZE + message->len * sizeof(*message->text);
+ addr += message_size;
+ break;
+ }
+ case HEADER_TYPE_PRESENCE: {
+ PresenceMessage* message = (PresenceMessage*)addr;
+ tb_printf(0, msg_y, 0, 0, " [%s] *%s*", message->author, presenceTypeString(message->type));
+ msg_y++;
+ addr += sizeof(*message);
+ break;
+ }
+ case HEADER_TYPE_HISTORY: {
+ HistoryMessage* message = (HistoryMessage*)addr;
+ addr += sizeof(*message);
+ // TODO: implement
+ }
+ default: {
+ tb_printf(0, msg_y, 0, 0, "%s", headerTypeString(header->type));
+ msg_y++;
+ break;
+ }
}
-
- u32 ty = 0;
- char pfx[AUTHOR_LEN + TIMESTAMP_LEN - 2 + 5] = {0};
- sprintf(pfx, "%s [%s] ", messages[i].timestamp, messages[i].author);
- ty = tb_printf_wrap(0, msg_y, TB_WHITE, 0, messages[i].text, fg, 0, pfx, global.width);
- msg_y += ty;
}
draw_prompt:
@@ -469,10 +271,10 @@ void screen_home(Arena *msgsArena, wchar_t input[])
box_len = global.width - box_mar_x * 2;
// +2 for corners and null terminator
- wchar_t box_up[box_len + 1];
- wchar_t box_in[box_len + 1];
- wchar_t box_down[box_len + 1];
- wchar_t lr = L'─', ur = L'╭', rd = L'╮', dr = L'╰', ru = L'╯', ud = L'│';
+ u32 box_up[box_len + 1];
+ u32 box_in[box_len + 1];
+ u32 box_down[box_len + 1];
+ u32 lr = L'─', ur = L'╭', rd = L'╮', dr = L'╰', ru = L'╯', ud = L'│';
// top bar
box_up[0] = ur;
@@ -503,7 +305,7 @@ void screen_home(Arena *msgsArena, wchar_t input[])
return;
if (input_len > freesp) {
- wchar_t *text_offs = input + (input_len - freesp);
+ u32* text_offs = input + (input_len - freesp);
tb_printf(box_x + box_mar_x + box_pad_x + box_bwith, box_y + 1, 0, 0, "%ls", text_offs);
global.cursor_x = box_x + box_pad_x + box_mar_x + box_bwith + freesp;
} else {
@@ -518,3 +320,232 @@ void screen_home(Arena *msgsArena, wchar_t input[])
}
}
}
+
+int
+main(int argc, char** argv)
+{
+ // Use first argument as username
+ if (argc > 1) {
+ u32 arg_len = strlen(argv[1]);
+ assert(arg_len <= AUTHOR_LEN - 1);
+ memcpy(username, argv[1], arg_len);
+ username[arg_len] = '\0';
+ }
+
+ s32 err = 0; // error code for functions
+
+ Arena* msgsArena = ArenaAlloc(Megabytes(64)); // Messages received & sent
+ u32 nmessages = 0; // Number of messages in msgsArena
+ s32 nrecv = 0; // number of bytes received
+ s32 nsend = 0; // number of bytes sent
+
+ u32 input[INPUT_LIMIT] = {0}; // input buffer
+ u32 ninput = 0; // number of characters in input
+
+ struct tb_event ev; // event fork keypress & resize
+ u8 quit = 0; // boolean to indicate if we want to quit the main loop
+ u8* quitmsg = NULL; // this string will be printed before returning from main
+
+ pthread_t thr_rec; // thread for reconnecting to server when disconnected
+
+ // poopoo C cannot infer type
+ fds = (struct pollfd[FDS_MAX]){
+ {-1, POLLIN, 0}, // FDS_SERVER
+ {-1, POLLIN, 0}, // FDS_TTY
+ {-1, POLLIN, 0}, // FDS_RESIZE
+ };
+
+ const struct sockaddr_in address = {
+ AF_INET,
+ htons(PORT),
+ {0},
+ {0},
+ };
+
+ // Connecting to server
+ {
+ s32 serverfd;
+ serverfd = socket(AF_INET, SOCK_STREAM, 0);
+ assert(serverfd > 2); // greater than STDERR
+
+ err = connect(serverfd, (struct sockaddr*)&address, sizeof(address));
+ if (err != 0) {
+ perror("Server");
+ return 1;
+ }
+ fds[FDS_SERVER].fd = serverfd;
+
+ // Introduce ourselves
+ HeaderMessage header = HEADER_PRESENCEMESSAGE;
+ PresenceMessage message = {.type = PRESENCE_TYPE_CONNECTED};
+ memcpy(message.author, username, AUTHOR_LEN);
+ nsend = send(serverfd, &header, sizeof(header), 0);
+ assert(nsend != -1);
+ assert(nsend == sizeof(header));
+ nsend = send(serverfd, &message, sizeof(message), 0);
+ assert(nsend != -1);
+ assert(nsend == sizeof(message));
+ }
+
+ // for wide character printing
+ assert(setlocale(LC_ALL, "") != NULL);
+
+ // init
+ tb_init();
+ tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd);
+
+ screen_home(msgsArena, nmessages, input, ninput);
+ tb_present();
+
+ // main loop
+ while (!quit) {
+ err = poll(fds, FDS_MAX, TIMEOUT_POLL);
+ // ignore resize events and use them to redraw the screen
+ assert(err != -1 || errno == EINTR);
+
+ tb_clear();
+
+ if (fds[FDS_SERVER].revents & POLLIN) {
+ // got data from server
+ HeaderMessage header;
+ nrecv = recv(fds[FDS_SERVER].fd, &header, sizeof(header), 0);
+ assert(nrecv != -1);
+
+ // Server disconnects
+ if (nrecv == 0) {
+ // close diconnected server's socket
+ err = close(fds[FDS_SERVER].fd);
+ assert(err == 0);
+ fds[FDS_SERVER].fd = -1; // ignore
+ // start trying to reconnect in a thread
+ err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void*)&address);
+ assert(err == 0);
+ } else {
+ // TODO: validate version
+ // if (header.version == PROTOCOL_VERSION)
+ // continue;
+
+ void* addr = ArenaPush(msgsArena, sizeof(header));
+ memcpy(addr, &header, sizeof(header));
+
+ switch (header.type) {
+ case HEADER_TYPE_TEXT:
+ recvTextMessage(msgsArena, fds[FDS_SERVER].fd, NULL);
+ nmessages++;
+ break;
+ case HEADER_TYPE_PRESENCE:;
+ PresenceMessage* message = ArenaPush(msgsArena, sizeof(*message));
+ nrecv = recv(fds[FDS_SERVER].fd, message, sizeof(*message), 0);
+ assert(nrecv != -1);
+ assert(nrecv == sizeof(*message));
+ nmessages++;
+ break;
+ default:
+ // TODO: log
+ break;
+ }
+ }
+ }
+
+ if (fds[FDS_TTY].revents & POLLIN) {
+ // got a key event
+ tb_poll_event(&ev);
+
+ switch (ev.key) {
+ case TB_KEY_CTRL_W:
+ // delete consecutive whitespace
+ while (ninput) {
+ if (input[ninput - 1] == L' ') {
+ input[ninput - 1] = 0;
+ ninput--;
+ continue;
+ }
+ break;
+ }
+ // delete until whitespace
+ while (ninput) {
+ if (input[ninput - 1] == L' ')
+ break;
+ // erase
+ input[ninput - 1] = 0;
+ ninput--;
+ }
+ break;
+ case TB_KEY_CTRL_D:
+ case TB_KEY_CTRL_C:
+ quit = 1;
+ break;
+ case TB_KEY_CTRL_M: // send message
+ if (ninput == 0)
+ // do not send empty message
+ break;
+ if (fds[FDS_SERVER].fd == -1)
+ // do not send message to disconnected server
+ break;
+
+ // null terminate
+ input[ninput] = 0;
+ ninput++;
+
+ // Save header
+ HeaderMessage header = HEADER_TEXTMESSAGE;
+ void* addr = ArenaPush(msgsArena, sizeof(header));
+ memcpy(addr, &header, sizeof(header));
+
+ // Save message
+ TextMessage* sendmsg = ArenaPush(msgsArena, TEXTMESSAGE_SIZE);
+ memcpy(sendmsg->author, username, AUTHOR_LEN);
+ sendmsg->timestamp = time(NULL);
+ sendmsg->len = ninput;
+
+ u32 text_size = ninput * sizeof(*input);
+ ArenaPush(msgsArena, text_size);
+ memcpy(&sendmsg->text, input, text_size);
+
+ // Send message
+ nsend = send(fds[FDS_SERVER].fd, &header, sizeof(header), 0);
+ assert(nsend != -1);
+ nsend = send(fds[FDS_SERVER].fd, sendmsg, TEXTMESSAGE_SIZE + TEXTMESSAGE_TEXT_SIZE((*sendmsg)), 0);
+ assert(nsend != -1);
+
+ nmessages++;
+ // also clear input
+ case TB_KEY_CTRL_U: // clear input
+ bzero(input, ninput * sizeof(*input));
+ ninput = 0;
+ break;
+ default:
+ if (ev.ch == 0)
+ break;
+ // TODO: logging
+ if (ninput == INPUT_LIMIT - 1) // last byte reserved for \0
+ break;
+
+ // append key to input buffer
+ input[ninput] = ev.ch;
+ ninput++;
+ }
+ if (quit)
+ break;
+ }
+
+ // These are used to redraw the screen from threads
+ if (fds[FDS_RESIZE].revents & POLLIN) {
+ // ignore
+ tb_poll_event(&ev);
+ }
+
+ screen_home(msgsArena, nmessages, input, ninput);
+
+ tb_present();
+ }
+
+ tb_shutdown();
+
+ if (quitmsg != NULL)
+ printf("%s\n", quitmsg);
+
+ ArenaRelease(msgsArena);
+
+ return 0;
+}
diff --git a/chatty.h b/chatty.h
new file mode 100644
index 0000000..916ae4a
--- /dev/null
+++ b/chatty.h
@@ -0,0 +1,239 @@
+#ifndef CHATTY_IMPL
+
+#include <assert.h>
+#include <locale.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <time.h>
+#include <wchar.h>
+
+typedef uint8_t u8;
+typedef uint16_t u16;
+typedef uint32_t u32;
+typedef uint64_t u64;
+typedef int8_t s8;
+typedef int16_t s16;
+typedef int32_t s32;
+typedef int64_t s64;
+typedef enum {
+ False = 0,
+ True = 1
+} Bool;
+
+// port for chatty
+#define PORT 9983
+
+#define Kilobytes(Value) ((Value) * 1024)
+#define Megabytes(Value) (Kilobytes(Value) * 1024)
+#define Gigabytes(Value) (Megabytes((u64)Value) * 1024)
+#define Terabytes(Value) (Gigabytes((u64)Value) * 1024)
+#define PAGESIZE 4096
+
+struct Arena {
+ void* addr;
+ u64 size;
+ u64 pos;
+} typedef Arena;
+
+#define PushArray(arena, type, count) (type*)ArenaPush((arena), sizeof(type) * (count))
+#define PushArrayZero(arena, type, count) (type*)ArenaPushZero((arena), sizeof(type) * (count))
+#define PushStruct(arena, type) PushArray((arena), (type), 1)
+#define PushStructZero(arena, type) PushArrayZero((arena), (type), 1)
+
+Arena*
+ArenaAlloc(u64 size)
+{
+ Arena* arena = (Arena*)malloc(sizeof(Arena));
+
+ arena->addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ if (arena->addr == MAP_FAILED)
+ return NULL;
+ arena->pos = 0;
+ arena->size = size;
+
+ return arena;
+}
+
+void
+ArenaRelease(Arena* arena)
+{
+ munmap(arena->addr, arena->size);
+ free(arena);
+}
+
+void*
+ArenaPush(Arena* arena, u64 size)
+{
+ u8* mem;
+ mem = (u8*)arena->addr + arena->pos;
+ arena->pos += size;
+ return mem;
+}
+
+/// Protocol
+// - every message has format Header + Message
+// TODO: authentication
+// TODO: encryption
+
+/// Protocol Header
+// - 2 bytes for version
+// - 1 byte for message type
+// - 16 bytes for checksum
+//
+// Text Message
+// - 12 bytes for the author
+// - 8 bytes for the timestamp
+// - 2 bytes for the text length
+// - x*4 bytes for the text
+//
+// History Message
+// This message is for requesting messages sent after a timestamp.
+// - 8 bytes for the timestamp
+
+/// Naming convention
+// Messages end with the Message suffix (eg. TextMessag, HistoryMessage)
+// A function that is coupled to a type works like
+// <noun><type> eg. (printTextMessage, formatTimestamp)
+
+#define PROTOCOL_VERSION 0
+
+typedef struct {
+ u16 version;
+ u8 type;
+} HeaderMessage;
+
+enum { HEADER_TYPE_TEXT = 0,
+ HEADER_TYPE_HISTORY,
+ HEADER_TYPE_PRESENCE };
+#define HEADER_TEXTMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_TEXT};
+#define HEADER_HISTORYMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_HISTORY};
+#define HEADER_PRESENCEMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_PRESENCE};
+
+// Size of author string including null terminator
+#define AUTHOR_LEN 13
+// Size of formatted timestamp string including null terminator
+#define TIMESTAMP_LEN 9
+
+typedef struct {
+ u8 checksum[16];
+ u8 author[AUTHOR_LEN];
+ u64 timestamp;
+ u16 len; // including null terminator
+ u32* text; // placeholder for indexing
+ // TODO: 0-length field?
+} TextMessage;
+
+// Size of TextMessage without text pointer, used when receiving the message over a stream
+#define TEXTMESSAGE_TEXT_SIZE(m) (m.len * sizeof(*m.text))
+#define TEXTMESSAGE_SIZE (sizeof(TextMessage) - sizeof(u32*))
+
+typedef struct {
+ u64 timestamp;
+} HistoryMessage;
+
+typedef struct {
+ u8 author[AUTHOR_LEN];
+ u8 type;
+} PresenceMessage;
+enum { PRESENCE_TYPE_CONNECTED = 0,
+ PRESENCE_TYPE_DISCONNECTED };
+
+// Returns string for type byte in HeaderMessage
+u8*
+headerTypeString(u8 type)
+{
+ switch (type) {
+ case HEADER_TYPE_TEXT: return (u8*)"TextMessage";
+ case HEADER_TYPE_HISTORY: return (u8*)"HistoryMessage";
+ case HEADER_TYPE_PRESENCE: return (u8*)"PresenceMessage";
+ default: return (u8*)"Unknown";
+ }
+}
+
+u8*
+presenceTypeString(u8 type)
+{
+ switch (type) {
+ case PRESENCE_TYPE_CONNECTED: return (u8*)"connected";
+ case PRESENCE_TYPE_DISCONNECTED: return (u8*)"disconnected";
+ default: return (u8*)"Unknown";
+ }
+}
+
+// from Tsoding video on minicel (https://youtu.be/HCAgvKQDJng?t=4546)
+// sv(https://github.com/tsoding/sv)
+#define PH_FMT "header: v%d %s(%d)"
+#define PH_ARG(header) header.version, headerTypeString(header.type), header.type
+
+void
+formatTimestamp(u8 tmsp[TIMESTAMP_LEN], u64 t)
+{
+ struct tm* ltime;
+ ltime = localtime((time_t*)&t);
+ strftime((char*)tmsp, TIMESTAMP_LEN, "%H:%M:%S", ltime);
+}
+
+void
+printTextMessage(TextMessage* message, u8 wide)
+{
+ u8 timestamp[TIMESTAMP_LEN] = {0};
+ formatTimestamp(timestamp, message->timestamp);
+
+ assert(setlocale(LC_ALL, "") != NULL);
+
+ if (wide)
+ wprintf(L"TextMessage: %s [%s] %ls\n", timestamp, message->author, (wchar_t*)&message->text);
+ else {
+ u8 str[message->len];
+ wcstombs((char*)str, (wchar_t*)&message->text, message->len * sizeof(*message->text));
+ printf("TextMessage: %s [%s] (%d)%s\n", timestamp, message->author, message->len, str);
+ }
+}
+
+// Receive a message from fd and store it to the msgsArena,
+// if dest is not NULL point it to the new message created on msgsArena
+// Returns the number of bytes received
+u32
+recvTextMessage(Arena* msgsArena, u32 fd, TextMessage** dest)
+{
+ s32 nrecv = 0;
+
+ TextMessage* message = ArenaPush(msgsArena, TEXTMESSAGE_SIZE);
+ if (dest != NULL)
+ *dest = message;
+
+ // Receive everything but the text so we can know the text's size and act accordingly
+ nrecv = recv(fd, message, TEXTMESSAGE_SIZE, 0);
+ assert(nrecv != -1);
+ assert(nrecv == TEXTMESSAGE_SIZE);
+
+ nrecv = 0;
+
+ // Allocate memory for text and receive in that memory
+ u32 text_size = message->len * sizeof(*message->text);
+ ArenaPush(msgsArena, text_size);
+
+ nrecv = recv(fd, (u8*)&message->text, text_size, 0);
+ assert(nrecv != -1);
+ assert(nrecv == message->len * sizeof(*message->text));
+
+ return TEXTMESSAGE_SIZE + nrecv;
+}
+
+u32
+wstrlen(u32* str)
+{
+ u32 i = 0;
+ while (str[i] != 0)
+ i++;
+ return i;
+}
+
+#endif
+#define CHATTY_H
diff --git a/common.h b/common.h
deleted file mode 100644
index 700d741..0000000
--- a/common.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef COMMON_H
-#define COMMON_H
-
-#include <stdarg.h>
-#include <stdio.h>
-#include <time.h>
-#include <unistd.h>
-#include <stdint.h>
-#include <stddef.h>
-#include <wchar.h>
-
-#define AUTHOR_LEN 13
-#define TIMESTAMP_LEN 9
-// port to listen on
-#define PORT 9983
-// How much bytes can be sent at once over the stream
-// This is also Intial size for buffer used to send and receive data,
-// "initial" because the buffer is tied to an arena and can grow.
-// Note: must be greater than sizeof(Message) - sizeof(Message.text)
-#define STREAM_LIMIT 64
-
-typedef uint8_t u8;
-typedef uint16_t u16;
-typedef uint32_t u32;
-typedef uint64_t u64;
-typedef int8_t s8;
-typedef int16_t s16;
-typedef int32_t s32;
-typedef int64_t s64;
-
-struct Message {
- u8 author[AUTHOR_LEN];
- u8 timestamp[TIMESTAMP_LEN];
- // includes null terminator
- u16 text_len;
- wchar_t *text;
-} typedef Message;
-
-#define MESSAGELEN(m) (AUTHOR_LEN + TIMESTAMP_LEN + sizeof(m.text_len)*sizeof(wchar_t) + m.text_len)
-#define MESSAGELENP(m) (AUTHOR_LEN + TIMESTAMP_LEN + sizeof(m->text_len) + m->text_len*(sizeof(wchar_t)))
-
-void message_timestamp(u8 str[TIMESTAMP_LEN])
-{
- time_t now;
- struct tm *ltime;
- time(&now);
- ltime = localtime(&now);
- strftime((char *)str, TIMESTAMP_LEN, "%H:%M:%S", ltime);
-}
-
-#endif
diff --git a/compile_flags.txt b/compile_flags.txt
new file mode 100644
index 0000000..eb526c6
--- /dev/null
+++ b/compile_flags.txt
@@ -0,0 +1,5 @@
+-Wall
+-Wextra
+-pedantic
+-std=c99
+-O3
diff --git a/send.c b/send.c
index def9e0b..f526b4c 100644
--- a/send.c
+++ b/send.c
@@ -7,17 +7,17 @@
#include <string.h>
#include <unistd.h>
-#include "arena.h"
-#include "common.h"
+#include "chatty.h"
-int main(int argc, char **argv)
+int
+main(int argc, char** argv)
{
if (argc < 3) {
fprintf(stderr, "usage: send <author> <msg>\n");
return 1;
}
- u32 err, serverfd, nsend;
+ s32 err, serverfd, nsend;
serverfd = socket(AF_INET, SOCK_STREAM, 0);
assert(serverfd != -1);
@@ -27,37 +27,41 @@ int main(int argc, char **argv)
htons(PORT),
{0},
};
- err = connect(serverfd, (struct sockaddr *)&address, sizeof(address));
+ err = connect(serverfd, (struct sockaddr*)&address, sizeof(address));
assert(err == 0);
- u32 author_len = strlen(argv[1]); // add 1 for null terminator
- assert(author_len + 1 <= AUTHOR_LEN);
+ HeaderMessage header = HEADER_TEXTMESSAGE;
+ TextMessage* message;
// convert text to wide string
u32 text_len = strlen(argv[2]) + 1;
- wchar_t text_wide[text_len];
- u32 size = mbstowcs(text_wide, argv[2], text_len - 1);
+ u32 text_wide[text_len];
+ u32 size = mbstowcs((wchar_t*)text_wide, argv[2], text_len - 1);
assert(size == text_len - 1);
- // null terminate
text_wide[text_len - 1] = 0;
- Arena *bufArena = ArenaAlloc();
- u8 *buf = ArenaPush(bufArena, (text_len - 1) * sizeof(*text_wide));
- Message *mbuf = (Message *)buf;
+ u32 author_len = strlen(argv[1]);
+ assert(author_len + 1 <= AUTHOR_LEN); // add 1 for null terminator
- memcpy(mbuf->author, argv[1], author_len);
- message_timestamp(mbuf->timestamp);
- mbuf->text_len = text_len;
- memcpy(&mbuf->text, text_wide, mbuf->text_len * sizeof(wchar_t));
+ u8 buf[text_len * sizeof(*text_wide) + TEXTMESSAGE_SIZE];
+ bzero(buf, sizeof(buf));
+ message = (TextMessage*)buf;
- nsend = send(serverfd, buf, MESSAGELENP(mbuf), 0);
+ memcpy(message->author, argv[1], author_len);
+ message->timestamp = time(NULL);
+ message->len = text_len;
+ memcpy(&message->text, text_wide, text_len * sizeof(*message->text));
- assert(nsend >= 0);
+ nsend = send(serverfd, &header, sizeof(header), 0);
+ assert(nsend != -1);
+ printf("header bytes sent: %d\n", nsend);
+ nsend = send(serverfd, buf, sizeof(buf), 0);
+ assert(nsend != -1);
- printf("text_len: %d\n", text_len);
- fprintf(stdout, "Sent %d bytes.\n", nsend);
-
- ArenaRelease(bufArena);
+ printf("text length: %d\n", text_len);
+ printf("buf size: %lu\n", sizeof(buf));
+ printf("text size: %lu\n", sizeof(*text_wide) * text_len);
+ printf("message bytes sent: %d\n", nsend);
return 0;
}
diff --git a/server.c b/server.c
index 428ddde..18ce0f9 100644
--- a/server.c
+++ b/server.c
@@ -1,30 +1,38 @@
-#include "arena.h"
-#include "common.h"
+#include "chatty.h"
#include <assert.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdarg.h>
+#include <string.h>
#include <sys/socket.h>
+#include <unistd.h>
// timeout on polling
#define TIMEOUT 60 * 1000
// max pending connections
-#define PENDING_MAX 16
-
-// the size of pollfd element in the fdsArena
-// note: clientsArena and pollfd_size must have been initialisezd
-#define FDS_SIZE fdsArena->pos / pollfd_size
+#define MAX_CONNECTIONS 16
+// Get number of connections from arena position
+// NOTE: this is somewhat wrong, because of when disconnections happen
+#define FDS_SIZE (fdsArena->pos / sizeof(*fds))
// enum for indexing the fds array
enum { FDS_STDIN = 0,
FDS_SERVER,
FDS_CLIENTS };
-int main(void)
+// Has information on clients
+// For each pollfd in fds there should be a matching client in clients
+// clients[i - FDS_CLIENTS] <=> fds[i]
+typedef struct {
+ u8 author[AUTHOR_LEN]; // matches author property on other message types
+ Bool initialized; // boolean
+} Client;
+
+int
+main(void)
{
- u32 err, serverfd, clientfd;
- u16 nclient = 0;
+ s32 err, serverfd, clientfd;
u32 on = 1;
// Start listening on the socket
@@ -32,7 +40,7 @@ int main(void)
serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
assert(serverfd > 2);
- err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8 *)&on, sizeof(on));
+ err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8*)&on, sizeof(on));
assert(err == 0);
const struct sockaddr_in address = {
@@ -41,36 +49,39 @@ int main(void)
{0},
};
- err = bind(serverfd, (const struct sockaddr *)&address, sizeof(address));
+ err = bind(serverfd, (const struct sockaddr*)&address, sizeof(address));
assert(err == 0);
- err = listen(serverfd, PENDING_MAX);
+ err = listen(serverfd, MAX_CONNECTIONS);
assert(err == 0);
}
- Arena *msgTextArena = ArenaAlloc(); // allocating text in messages that have a dynamic sized
- Message mrecv = {0}; // message used for receiving messages from clients
- u32 nrecv = 0; // number of bytes received
- u32 recv_len; // Number of bytes of the message received over stream
- u32 nsend = 0; // number of bytes sent
- Arena *bufArena = ArenaAlloc(); // data in buf
- u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // temporary buffer for receiving and sending data
- Message *mbuf = (Message *)buf; // pointer for indexing buf as a message
-
- Arena *fdsArena = ArenaAlloc(); // arena for fds to accomodate multiple clients
- struct pollfd *fds = fdsArena->memory; // helper for indexing memory
- struct pollfd c = {0, POLLIN, 0}; // helper client structure fore reusing
- struct pollfd *fdsAddr; // used for copying clients
- const u64 pollfd_size = sizeof(struct pollfd);
+ Arena* msgsArena = ArenaAlloc(Megabytes(128)); // storing received messages
+ // NOTE: sent messages?
+ s32 nrecv = 0; // number of bytes received
+ s32 nsend = 0; // number of bytes sent
+
+ Arena* clientsArena = ArenaAlloc(MAX_CONNECTIONS * sizeof(Client));
+ Arena* fdsArena = ArenaAlloc(MAX_CONNECTIONS * sizeof(struct pollfd));
+ struct pollfd* fds = fdsArena->addr;
+ Client* clients = clientsArena->addr;
+
+ struct pollfd* fdsAddr;
+ struct pollfd newpollfd = {-1, POLLIN, 0};
// initialize fds structure
- // add stdin (c.fd == 0)
- fdsAddr = ArenaPush(fdsArena, pollfd_size);
- memcpy(fdsAddr, &c, pollfd_size);
+ newpollfd.fd = 0;
+ fdsAddr = ArenaPush(fdsArena, sizeof(*fds));
+ memcpy(fdsAddr, &newpollfd, sizeof(*fds));
// add serverfd
- c.fd = serverfd;
- fdsAddr = ArenaPush(fdsArena, pollfd_size);
- memcpy(fdsAddr, &c, pollfd_size);
+ newpollfd.fd = serverfd;
+ fdsAddr = ArenaPush(fdsArena, sizeof(*fds));
+ memcpy(fdsAddr, &newpollfd, sizeof(*fds));
+ newpollfd.fd = -1;
+
+ // Initialize the rest of the fds array
+ for (u32 i = FDS_CLIENTS; i < MAX_CONNECTIONS; i++)
+ fds[i] = newpollfd;
while (1) {
err = poll(fds, FDS_SIZE, TIMEOUT);
@@ -83,81 +94,165 @@ int main(void)
clientfd = accept(serverfd, NULL, NULL);
assert(clientfd != -1);
assert(clientfd > serverfd);
+ fprintf(stdout, "New connection(%d).\n", clientfd);
// fill up a hole
- u8 found = 0;
- for (u32 i = FDS_CLIENTS; i < FDS_SIZE; i++) {
- if (fds[i].fd == -1) {
- fds[i].fd = clientfd;
- // note we do not have to reset .revents because poll will set it to 0 next time
- found = 1;
+ u8 found;
+ for (found = FDS_CLIENTS; found < FDS_SIZE; found++)
+ if (fds[found].fd == -1)
break;
- }
- }
-
- // allocate an extra client because there was no empty spot in the fds array
- if (!found) {
- // add client to arena
- fdsAddr = ArenaPush(fdsArena, pollfd_size);
- c.fd = clientfd;
- memcpy(fdsAddr, &c, pollfd_size);
+ if (found == FDS_SIZE) {
+ // no more space, allocate
+ struct pollfd* pollfd = ArenaPush(fdsArena, sizeof(*pollfd));
+ pollfd->fd = clientfd;
+ pollfd->events = POLLIN;
+ } else if (found == MAX_CONNECTIONS) {
+ // TODO: reject connection
+ close(clientfd);
+ fprintf(stdout, "Max clients reached.");
+ } else {
+ // hole found
+ fds[found].fd = clientfd;
+ fds[found].events = POLLIN;
+ fprintf(stdout, "Added pollfd(%d).\n", clientfd);
}
-
- nclient++;
- fprintf(stdout, "connected(%d).\n", clientfd - serverfd);
}
+ // Check for messages from clients
for (u32 i = FDS_CLIENTS; i < (FDS_SIZE); i++) {
if (!(fds[i].revents & POLLIN))
continue;
- if (fds[i].fd == -1)
+ assert(fds[i].fd != -1);
+
+ fprintf(stdout, "Message(%d).\n", fds[i].fd);
+ // If this is the first message from the client it must be a presence message indicated
+ // it connected.
+ Client* client = clients + i - FDS_CLIENTS;
+ if (!client->initialized) {
+ fprintf(stdout, " Adding to clients(%d).\n", fds[i].fd);
+ // Wait for PresenceMessage from new client to get author information
+ HeaderMessage header;
+ // TODO: handle wrong message, disconnection, etc.
+ nrecv = recv(clientfd, &header, sizeof(header), 0);
+ assert(nrecv != -1);
+ assert(nrecv == sizeof(header));
+ if (header.type != HEADER_TYPE_PRESENCE) {
+ // TODO: reject connection
+ close(clientfd);
+ continue;
+ }
+ fprintf(stdout, " Got header(%d).\n", fds[i].fd);
+
+ PresenceMessage message;
+ // TODO: handle wrong message
+ nrecv = recv(clientfd, &message, sizeof(message), 0);
+ assert(nrecv != -1);
+ assert(nrecv == sizeof(message));
+ fprintf(stdout, " Got presence message(%d).\n", fds[i].fd);
+
+ memcpy(client->author, message.author, AUTHOR_LEN);
+ client->initialized = True;
+
+ fprintf(stdout, " Added to clients(%d): %s\n", fds[i].fd, client->author);
+
+ // Notify other clients from this new one
+ // Reuse header and message
+ for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) {
+ if (fds[j].fd == fds[i].fd)
+ continue;
+ if (fds[j].fd == -1)
+ continue;
+ fprintf(stdout, " Notifying (%d)\n", fds[j].fd);
+
+ nsend = send(fds[j].fd, &header, sizeof(header), 0);
+ assert(nsend != -1);
+ assert(nsend == sizeof(header));
+ nsend = send(fds[j].fd, &message, sizeof(message), 0);
+ assert(nsend != -1);
+ assert(nsend == sizeof(message));
+ }
continue;
+ }
- nrecv = recv(fds[i].fd, buf, bufArena->pos, 0);
+ // We received a message, try to parse the header
+ HeaderMessage header;
+ nrecv = recv(fds[i].fd, &header, sizeof(header), 0);
assert(nrecv != -1);
if (nrecv == 0) {
- fprintf(stdout, "disconnected(%d). \n", fds[i].fd - serverfd);
+ fprintf(stdout, "Disconnected(%d). \n", fds[i].fd);
shutdown(fds[i].fd, SHUT_RDWR);
- close(fds[i].fd); // send close to client
- fds[i].fd = -1; // ignore in the future
+ close(fds[i].fd); // send close to client
+ fds[i].fd = -1; // ignore in the future
+ clients[i - FDS_CLIENTS].initialized = False; // deinitialize client
+ //
+ // Send disconnection to other connected clients
+ HeaderMessage header = HEADER_PRESENCEMESSAGE;
+ PresenceMessage message = {
+ .type = PRESENCE_TYPE_DISCONNECTED
+ };
+ memcpy(message.author, clients[i - FDS_CLIENTS].author, AUTHOR_LEN);
+ for (u32 j = FDS_CLIENTS; j < FDS_SIZE; j++) {
+ if (fds[j].fd == fds[i].fd)
+ continue;
+ if (fds[j].fd == -1)
+ continue;
+ nsend = send(fds[j].fd, &header, sizeof(header), 0);
+ assert(nsend != -1);
+ assert(nsend == sizeof(header));
+ nsend = send(fds[j].fd, &message, sizeof(message), 0);
+ assert(nsend != -1);
+ assert(nsend == sizeof(message));
+ }
+
continue;
}
- recv_len = sizeof(*mbuf) - sizeof(mbuf->text) + mbuf->text_len * sizeof(*mbuf->text);
- if (recv_len > nrecv) {
- // allocate needed space for buf
- if (recv_len > bufArena->pos)
- ArenaPush(bufArena, recv_len - bufArena->pos);
-
- // receive remaining bytes
- u32 nr = recv(fds[i].fd, buf + nrecv, recv_len - nrecv, 0);
- assert(nr != -1);
- nrecv += nr;
- assert(nrecv == recv_len);
- }
+ assert(nrecv == sizeof(header));
+ fprintf(stderr, " Received(%d): %d bytes -> " PH_FMT "\n", fds[i].fd, nrecv, PH_ARG(header));
- // TODO: Do not print the message in the logs
- fprintf(stdout, "message(%d): %d bytes.\n", fds[i].fd - serverfd, nrecv);
+ switch (header.type) {
+ case HEADER_TYPE_TEXT:;
+ TextMessage* message;
+ nrecv = recvTextMessage(msgsArena, fds[i].fd, &message);
+ fprintf(stderr, " Received(%d): %d bytes -> ", fds[i].fd, nrecv);
+ printTextMessage(message, 0);
- for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) {
- if (j == i)
- continue;
- if (fds[j].fd == -1)
- continue;
+ HeaderMessage header = HEADER_TEXTMESSAGE;
+ // Send message to all other clients
+ for (u32 j = FDS_CLIENTS; j < FDS_SIZE; j++) {
+ if (fds[j].fd == fds[i].fd) continue;
+ if (fds[j].fd == -1) continue;
- nsend = send(fds[j].fd, buf, nrecv, 0);
- assert(nsend != 1);
- assert(nsend == nrecv);
- fprintf(stdout, "retransmitted(%d->%d).\n", fds[i].fd - serverfd, fds[j].fd - serverfd);
- }
+ // NOTE: I wonder if this is more expensive than constructing a buffer and sending
+ // that
+ u32 nsend_total = 0;
+ nsend = send(fds[j].fd, &header, sizeof(header), 0);
+ assert(nsend != 1);
+ assert(nsend == sizeof(header));
+ nsend_total += nsend;
+ nsend = send(fds[j].fd, message, TEXTMESSAGE_SIZE, 0);
+ assert(nsend != -1);
+ assert(nsend == TEXTMESSAGE_SIZE);
+ nsend_total += nsend;
+ nsend = send(fds[j].fd, &message->text, message->len * sizeof(*message->text), 0);
+ assert(nsend != -1);
+ assert(nsend == (message->len * sizeof(*message->text)));
+ nsend_total += nsend;
- ArenaPop(msgTextArena, mrecv.text_len);
+ fprintf(stdout, " Retransmitted(%d->%d) %d bytes.\n", fds[i].fd, fds[j].fd, nsend_total);
+ }
+ break;
+ default:
+ fprintf(stdout, " Got unhandled message type '%s' from client %d", headerTypeString(header.type), fds[i].fd);
+ continue;
+ }
}
}
+ ArenaRelease(clientsArena);
ArenaRelease(fdsArena);
- ArenaRelease(msgTextArena);
+ ArenaRelease(msgsArena);
return 0;
}