aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaymaekers Luca <raymaekers.luca@gmail.com>2024-10-31 00:32:07 +0100
committerRaymaekers Luca <raymaekers.luca@gmail.com>2024-11-03 00:58:07 +0100
commitb9aeccef208d6d5b7d40b71886981723f1e14b95 (patch)
tree0312eeeb23f17bd6ba7861c112a382a10207eda9
parent48733b6acfa27af8e030d9b7abfb9109b1ce89e0 (diff)
Added ID system with 1 and 2-way communication
Each client now has an ID that is permanently stored to ID_FILE location. To implement this each client now uses two connections to the server, one for bidirectional communication and one for unidirectional communication. This makes it easier to not receive unexpected message. Also each client and server now has a Client struct that represents a client and a clientsArena associated with it. Minor changes: - Added logging to LOGFILE, that can be turned with LOGGING macro. - Added more error types - Added error handling on server - Added error messages - Added convenience functions - Added disconnectAndNotify() function for convenience - Use recvTextMessageResult as multiple-value-return-type instead of ** - Separated protocol stuff into protocol.h - Added Result types when wanting to return multiple values - Do not allocate arena's with malloc - Added recvAnyMessageType for receiving messages that do not need to be stored - Add UNIFD and BIFD file descriptors for separating requests chatty.c: - Convert ID to string in screen_home() - Removed the fds global variable - Pass fds to threadReconnect - Implement faster sleep with nanosleep(2) - Close file descriptors when failed so we do not have too many file descriptors open server.c: - Send presence messages on disconnect & connect - renamed i to conn
-rw-r--r--.gitignore3
-rw-r--r--README.md13
-rw-r--r--chatty.c393
-rw-r--r--chatty.h220
-rw-r--r--protocol.h360
-rw-r--r--send.c78
-rw-r--r--server.c567
7 files changed, 1134 insertions, 500 deletions
diff --git a/.gitignore b/.gitignore
index 7b92c6b..91c2cfe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
chatty
send
server
+_id
+_clients
+*.log
diff --git a/README.md b/README.md
index c6f0e0a..c3a7d19 100644
--- a/README.md
+++ b/README.md
@@ -15,19 +15,28 @@ The idea is the following:
- [x] wrapping messages
- [x] bug: when sending message after diconnect (serverfd?)
- [x] Handle disconnection thiin a thread, the best way would be
-- [ ] ctrl+z to suspend
+- [x] Add limit_y to printf_wrap
+- [x] id2string on clients
+- [x] ctrl+z to suspend
+- [ ] bug(tb_printf_wrap): text after pfx is wrapped one too soon
## server
+- [x] import clients
- [ ] check if when sending and the client is offline (due to connection loss) what happens
- [ ] timeout on recv?
- [ ] use threads to handle clients/ timeout when receiving because a client could theoretically
stall the entire server.
- [ ] do not crash on errors from clients
- implement error message?
+ - timeout on recv with setsockopt
## common
- [x] handle messages that are too large
-- [ ] log messages to file (save history)
+- [x] refactor i&self into conn
+- [x] logging
+- [x] Req|Inf connection per client
+- [ ] bug: blocking after `Added pollfd`, after importing a client and then connecting with the
+ id/or without? After reconnection fails chatty blocks (remove sleep)
- [ ] connect/disconnections messages
- [ ] use IP address / domain
- [ ] chat history
diff --git a/chatty.c b/chatty.c
index 3b59001..4746aa7 100644
--- a/chatty.c
+++ b/chatty.c
@@ -2,6 +2,7 @@
#include "termbox2.h"
#include "chatty.h"
+#include "protocol.h"
#include <arpa/inet.h>
#include <assert.h>
@@ -14,19 +15,39 @@
// time to reconnect in seconds
#define TIMEOUT_RECONNECT 1
#define INPUT_LIMIT 512
-
-// must be of AUTHOR_LEN -1
-static u8 username[AUTHOR_LEN] = "(null)";
-// file descriptros for polling
-static struct pollfd* fds = NULL;
-// mutex for locking fds when in thread_reconnect()
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-enum { FDS_SERVER = 0,
+// Filepath where user ID is stored
+#define ID_FILE "_id"
+// Import id from ID_FILE
+#define IMPORT_ID
+// Filepath where logged
+#define LOGFILE "chatty.log"
+// enable logging
+#define LOGGING
+
+enum { FDS_UNI = 0, // for one-way communication with the server (eg. TextMessage)
+ FDS_BI, // For two-way communication with the server (eg. IDMessage)
FDS_TTY,
FDS_RESIZE,
FDS_MAX };
+typedef struct {
+ u8 author[AUTHOR_LEN];
+ ID id;
+} Client;
+#define CLIENT_FMT "[%s](%lu)"
+#define CLIENT_ARG(client) client.author, client.id
+
+typedef struct {
+ s32 err; // Error while connecting
+ s32 unifd;
+ s32 bifd;
+} ConnectionResult;
+
+// Client used by chatty
+global_variable Client user = {0};
+// Address of chatty server
+global_variable struct sockaddr_in address;
+
// fill str array with char
void
fillstr(u32* str, u32 ch, u32 len)
@@ -44,39 +65,118 @@ popup(u32 fg, u32 bg, char* text)
tb_print(global.width / 2 - len / 2, global.height / 2, fg, bg, text);
}
+// Returns client in clientsArena matching id
+// Returns user if the id was the user's ID
+// Returns 0 if nothing was found
+Client*
+getClientById(Arena* clientsArena, ID id)
+{
+ if (id == user.id) return &user;
+
+ Client* clients = clientsArena->addr;
+ for (u64 i = 0; i < (clientsArena->pos / sizeof(*clients)); i++) {
+ if (clients[i].id == id)
+ return clients + i;
+ }
+ return 0;
+}
+
+// Request information of client from fd byd id and add it to clientsArena
+// Returns pointer to added client
+Client*
+addClientInfo(Arena* clientsArena, s32 fd, u64 id)
+{
+ // Request information about ID
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID);
+ IDMessage id_message = {.id = id};
+ sendAnyMessage(fd, &header, &id_message);
+
+ Client* client = ArenaPush(clientsArena, sizeof(*client));
+
+ // Wait for response
+ IntroductionMessage introduction_message;
+ recvAnyMessageType(fd, &header, &introduction_message, HEADER_TYPE_INTRODUCTION);
+
+ // Add the information
+ memcpy(client->author, introduction_message.author, AUTHOR_LEN);
+ client->id = id;
+
+ loggingf("Got " CLIENT_FMT "\n", CLIENT_ARG((*client)));
+ return client;
+}
+
+// Tries to connect to address and populates resulting file descriptors in ConnectionResult.
+ConnectionResult
+getConnection(struct sockaddr_in* address)
+{
+ ConnectionResult result;
+ result.unifd = socket(AF_INET, SOCK_STREAM, 0);
+ result.bifd = socket(AF_INET, SOCK_STREAM, 0);
+ result.err = connect(result.unifd, (struct sockaddr*)address, sizeof(*address));
+ if (result.err) return result; // We do not overwrite the error and return early so we can be
+ // certain of what error errno belongs to.
+ result.err = connect(result.bifd, (struct sockaddr*)address, sizeof(*address));
+ return result;
+}
+
// Connect to *address_ptr of type `struct sockaddr_in*`. If it failed wait for TIMEOUT_RECONNECT
// seconds.
// This function is meant to be run by a thread.
// An offline server means fds[FDS_SERVER] is set to -1. When online
// it is set to with the appropriate file descriptor.
-// Returns NULL.
+// Returns 0.
+#define Miliseconds(s) (s*1000*1000)
void*
-thread_reconnect(void* address_ptr)
+threadReconnect(void* fds_ptr)
{
- u32 serverfd, err;
- struct sockaddr_in* address = address_ptr;
-
+ struct pollfd* fds = fds_ptr;
+ ConnectionResult result;
+ struct timespec t = { 0, Miliseconds(300) }; // 300 miliseconds
+ loggingf("Trying to reconnect\n");
while (1) {
- serverfd = socket(AF_INET, SOCK_STREAM, 0);
- assert(serverfd > 2); // greater than STDERR
- err = connect(serverfd, (struct sockaddr*)address, sizeof(*address));
- if (err == 0)
- break;
- assert(errno == ECONNREFUSED);
- // TODO: faster reconnection? (too many open files)
- sleep(TIMEOUT_RECONNECT);
+ nanosleep(&t, &t);
+ result = getConnection(&address);
+ if (result.err) {
+ // loggingf("err: %d\n", result.err);
+ loggingf("errno: %d\n", errno);
+ } else if (result.unifd != -1 && result.bifd != -1) {
+ loggingf("Reconnect succeeded (%d, %d), authenticating\n", result.unifd, result.bifd);
+ // We assume that we already have an ID
+ // TODO: could there be a problem if a message is received at the same time?
+ // - not on server restart, but what if we lost connection?
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID);
+ IDMessage id_message = {.id = user.id};
+ sendAnyMessage(result.bifd, &header, &id_message);
+
+ ErrorMessage error_message;
+ s32 nrecv = recvAnyMessageType(result.bifd, &header, &error_message, HEADER_TYPE_ERROR);
+ if (nrecv == -1 || nrecv == 0) {
+ loggingf("Error on receive, retrying...\n");
+ continue;
+ }
+
+ assert(header.type == HEADER_TYPE_ERROR);
+ if (error_message.type == ERROR_TYPE_SUCCESS) {
+ loggingf("Reconnected\n");
+ break;
+ } else {
+ loggingf("err: %s\n", errorTypeString(error_message.type));
+ }
+ }
+ if (result.unifd != -1)
+ close(result.unifd);
+ if (result.bifd != -1)
+ close(result.bifd);
+ loggingf("Failed, retrying..\n");
}
- // if the server would send a disconnect again and the polling catches up there could be two
- // threads accessing fds.
- pthread_mutex_lock(&mutex);
- fds[FDS_SERVER].fd = serverfd;
- pthread_mutex_unlock(&mutex);
+ fds[FDS_BI].fd = result.bifd;
+ fds[FDS_UNI].fd = result.unifd;
- // ask to redraw screen
+ // Redraw screen
raise(SIGWINCH);
- return NULL;
+ return 0;
}
// Print `text` wrapped to limit_x. It will print no more than limit_y lines. x, y, fg and
@@ -85,9 +185,8 @@ thread_reconnect(void* address_ptr)
// this is useful when for example: printing messages and wanting to have consistent
// timestamp+author name.
// Returns the number of lines printed.
-// TODO: add y limit
-// TODO:(bug) text after pfx is wrapped one too soon
-// TODO: text == NULL to know how many lines *would* be printed
+// TODO: (bug) text after pfx is wrapped one too soon
+// TODO: text == 0 to know how many lines *would* be printed
// - no this should be a separate function
// TODO: check if text[i] goes out of bounds
u32
@@ -107,7 +206,7 @@ tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, u32* text, s32 text_len, u32 fg_pfx
u32 failed = 0;
// NOTE: We can assume that we need to wrap, therefore print a newline after the prefix string
- if (pfx != NULL) {
+ if (pfx != 0) {
tb_printf(x, ly, fg_pfx, bg_pfx, "%s", pfx);
// If the text fits on one line print the text and return
@@ -174,7 +273,7 @@ tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, u32* text, s32 text_len, u32 fg_pfx
// it displays a prompt with the user input of input_len wide characters
// and the received messages from msgsArena
void
-screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)
+screen_home(Arena* msgsArena, u32 nmessages, Arena* clientsArena, struct pollfd* fds, u32 input[], u32 input_len)
{
// config options
const s32 box_max_len = 80;
@@ -205,7 +304,7 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)
goto draw_prompt;
u8* addr = msgsArena->addr;
- assert(addr != NULL);
+ assert(addr != 0);
// on what line to print the current message, used for scrolling
u32 msg_y = 0;
@@ -238,46 +337,62 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)
HeaderMessage* header = (HeaderMessage*)addr;
addr += sizeof(*header);
+ // Get Client for message
+ ID* id;
+ Client* client;
+ switch (header->type) {
+ case HEADER_TYPE_TEXT:
+ id = &((TextMessage*)addr)->id;
+ case HEADER_TYPE_PRESENCE:
+ id = &((PresenceMessage*)addr)->id;
+ client = getClientById(clientsArena, *id);
+ if (!client) {
+ loggingf("Client not known, requesting from server\n");
+ client = addClientInfo(clientsArena, fds[FDS_BI].fd, *id);
+ }
+ assert(client);
+ break;
+ }
+
switch (header->type) {
case HEADER_TYPE_TEXT: {
TextMessage* message = (TextMessage*)addr;
+ // Color own messages
u32 fg = 0;
- if (strncmp((char*)username, (char*)message->author, AUTHOR_LEN) == 0) {
+ if (user.id == message->id) {
fg = TB_CYAN;
} else {
fg = TB_MAGENTA;
}
- // prefix is of format "HH:MM:SS [<author>] ", so
+
+ // prefix is of format "HH:MM:SS [<author>] ", create it
u8 pfx[AUTHOR_LEN - 1 + TIMESTAMP_LEN - 1 + 4 + 1] = {0};
u8 timestamp[TIMESTAMP_LEN];
formatTimestamp(timestamp, message->timestamp);
- sprintf((char*)pfx, "%s [%s] ", timestamp, message->author);
- // TODO: y_limit
+ sprintf((char*)pfx, "%s [%s] ", timestamp, client->author);
+
msg_y += tb_printf_wrap(0, msg_y, TB_WHITE, 0, (u32*)&message->text, message->len, fg, 0, pfx, global.width, free_y - msg_y);
u32 message_size = TEXTMESSAGE_SIZE + message->len * sizeof(*message->text);
addr += message_size;
- break;
- }
+ } break;
case HEADER_TYPE_PRESENCE: {
PresenceMessage* message = (PresenceMessage*)addr;
- tb_printf(0, msg_y, 0, 0, " [%s] *%s*", message->author, presenceTypeString(message->type));
+ tb_printf(0, msg_y, 0, 0, " [%s] *%s*", client->author, presenceTypeString(message->type));
msg_y++;
addr += sizeof(*message);
- break;
- }
+ } break;
case HEADER_TYPE_HISTORY: {
HistoryMessage* message = (HistoryMessage*)addr;
addr += sizeof(*message);
// TODO: implement
- }
- default: {
+ } break;
+ default:
tb_printf(0, msg_y, 0, 0, "%s", headerTypeString(header->type));
msg_y++;
break;
}
- }
}
draw_prompt:
@@ -340,7 +455,7 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)
}
}
- if (fds[FDS_SERVER].fd == -1) {
+ if (fds[FDS_UNI].fd == -1 || fds[FDS_BI].fd == -1) {
// show error popup
popup(TB_RED, TB_BLACK, "Server disconnected.");
}
@@ -350,77 +465,128 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len)
int
main(int argc, char** argv)
{
- // Use first argument as username
- if (argc > 1) {
- u32 arg_len = strlen(argv[1]);
- assert(arg_len <= AUTHOR_LEN - 1);
- memcpy(username, argv[1], arg_len);
- username[arg_len] = '\0';
+ if (argc < 2) {
+ fprintf(stderr, "usage: chatty <username>\n");
+ return 1;
}
+ u32 arg_len = strlen(argv[1]);
+ assert(arg_len <= AUTHOR_LEN - 1);
+ memcpy(user.author, argv[1], arg_len);
+ user.author[arg_len] = '\0';
+
s32 err = 0; // error code for functions
- Arena* msgsArena = ArenaAlloc(Megabytes(64)); // Messages received & sent
- u32 nmessages = 0; // Number of messages in msgsArena
- s32 nrecv = 0; // number of bytes received
- s32 nsend = 0; // number of bytes sent
+ u32 nmessages = 0; // Number of messages in msgsArena
+ s32 nrecv = 0; // number of bytes received
u32 input[INPUT_LIMIT] = {0}; // input buffer
u32 ninput = 0; // number of characters in input
+ Arena msgsArena;
+ Arena clientsArena;
+ ArenaAlloc(&msgsArena, Megabytes(64)); // Messages received & sent
+ ArenaAlloc(&clientsArena, Megabytes(1)); // Arena for storing clients
+
struct tb_event ev; // event fork keypress & resize
u8 quit = 0; // boolean to indicate if we want to quit the main loop
- u8* quitmsg = NULL; // this string will be printed before returning from main
+ u8* quitmsg = 0; // this string will be printed before returning from main
pthread_t thr_rec; // thread for reconnecting to server when disconnected
+#ifdef LOGGING
+ logfd = open(LOGFILE, O_RDWR | O_CREAT | O_TRUNC, 0600);
+ assert(logfd != -1);
+#else
+ logfd = 2; // stderr
+#endif
+
// poopoo C cannot infer type
- fds = (struct pollfd[FDS_MAX]){
- {-1, POLLIN, 0}, // FDS_SERVER
+ struct pollfd fds[FDS_MAX] = {
+ {-1, POLLIN, 0}, // FDS_UNI
+ {-1, POLLIN, 0}, // FDS_BI
{-1, POLLIN, 0}, // FDS_TTY
{-1, POLLIN, 0}, // FDS_RESIZE
};
- const struct sockaddr_in address = {
+ address = (struct sockaddr_in){
AF_INET,
htons(PORT),
{0},
{0},
};
- // Connecting to server
- {
- s32 serverfd;
- serverfd = socket(AF_INET, SOCK_STREAM, 0);
- assert(serverfd > 2); // greater than STDERR
-
- err = connect(serverfd, (struct sockaddr*)&address, sizeof(address));
- if (err != 0) {
- perror("Server");
+ ConnectionResult result = getConnection(&address);
+ if (result.err) {
+ perror("Server");
+ return 1;
+ }
+ assert(result.unifd != -1);
+ assert(result.bifd != -1);
+ assert(!result.err);
+ fds[FDS_BI].fd = result.bifd;
+ fds[FDS_UNI].fd = result.unifd;
+
+#ifdef IMPORT_ID
+ // File for storing the user's ID.
+ u32 idfile = open(ID_FILE, O_RDWR | O_CREAT, 0600);
+ s32 nread = read(idfile, &user.id, sizeof(user.id));
+ assert(nread != -1);
+ // see "Authentication" in chatty.h
+ if (nread == sizeof(user.id)) {
+ // Scenario 1: We know our id
+
+ // Send IDMessage and check if it is correct
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID);
+ IDMessage message = {.id = user.id};
+ sendAnyMessage(fds[FDS_BI].fd, &header, &message);
+
+ ErrorMessage error_message = {0};
+ recvAnyMessageType(fds[FDS_BI].fd, &header, &error_message, HEADER_TYPE_ERROR);
+
+ switch (error_message.type) {
+ case ERROR_TYPE_SUCCESS: break;
+ case ERROR_TYPE_NOTFOUND:
+ printf("Server does not know our ID. Consider removing '" ID_FILE "'\n");
+ return 1;
+ default:
+ printf("Server: %s\n", errorTypeString(error_message.type));
return 1;
}
- fds[FDS_SERVER].fd = serverfd;
-
- // Introduce ourselves
- HeaderMessage header = HEADER_PRESENCEMESSAGE;
- PresenceMessage message = {.type = PRESENCE_TYPE_CONNECTED};
- memcpy(message.author, username, AUTHOR_LEN);
- nsend = send(serverfd, &header, sizeof(header), 0);
- assert(nsend != -1);
- assert(nsend == sizeof(header));
- nsend = send(serverfd, &message, sizeof(message), 0);
- assert(nsend != -1);
- assert(nsend == sizeof(message));
+ } else {
+#else
+ if (1) {
+#endif
+ // Scenario 2: We do not have an ID
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_INTRODUCTION);
+ IntroductionMessage message = {0};
+ // copy user data into message
+ memcpy(message.author, user.author, AUTHOR_LEN);
+
+ // Send the introduction message
+ sendAnyMessage(fds[FDS_BI].fd, &header, &message);
+
+ IDMessage id_message = {0};
+ // Receive the response IDMessage
+ recvAnyMessageType(fds[FDS_BI].fd, &header, &id_message, HEADER_TYPE_ID);
+ assert(header.type == HEADER_TYPE_ID);
+ user.id = id_message.id;
+#ifdef IMPORT_ID
+ // Save permanently
+ write(idfile, &user.id, sizeof(user.id));
+ close(idfile);
+#endif
}
+ loggingf("Got ID: %lu\n", user.id);
// for wide character printing
- assert(setlocale(LC_ALL, "") != NULL);
+ assert(setlocale(LC_ALL, "") != 0);
// init
tb_init();
tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd);
- screen_home(msgsArena, nmessages, input, ninput);
+ screen_home(&msgsArena, nmessages, &clientsArena, fds, input, ninput);
tb_present();
// main loop
@@ -431,43 +597,45 @@ main(int argc, char** argv)
tb_clear();
- if (fds[FDS_SERVER].revents & POLLIN) {
+ if (fds[FDS_UNI].revents & POLLIN) {
// got data from server
HeaderMessage header;
- nrecv = recv(fds[FDS_SERVER].fd, &header, sizeof(header), 0);
+ nrecv = recv(fds[FDS_UNI].fd, &header, sizeof(header), 0);
assert(nrecv != -1);
// Server disconnects
if (nrecv == 0) {
// close diconnected server's socket
- err = close(fds[FDS_SERVER].fd);
+ err = close(fds[FDS_UNI].fd);
assert(err == 0);
- fds[FDS_SERVER].fd = -1; // ignore
+ fds[FDS_UNI].fd = -1; // ignore
// start trying to reconnect in a thread
- err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void*)&address);
+ err = pthread_create(&thr_rec, 0, &threadReconnect, (void*)fds);
assert(err == 0);
} else {
- // TODO: validate version
- // if (header.version == PROTOCOL_VERSION)
- // continue;
+ if (header.version != PROTOCOL_VERSION) {
+ loggingf("Header received does not match version\n");
+ continue;
+ }
- void* addr = ArenaPush(msgsArena, sizeof(header));
+ void* addr = ArenaPush(&msgsArena, sizeof(header));
memcpy(addr, &header, sizeof(header));
+ // Messages handled from server
switch (header.type) {
case HEADER_TYPE_TEXT:
- recvTextMessage(msgsArena, fds[FDS_SERVER].fd, NULL);
+ recvTextMessage(&msgsArena, fds[FDS_UNI].fd);
nmessages++;
break;
case HEADER_TYPE_PRESENCE:;
- PresenceMessage* message = ArenaPush(msgsArena, sizeof(*message));
- nrecv = recv(fds[FDS_SERVER].fd, message, sizeof(*message), 0);
+ PresenceMessage* message = ArenaPush(&msgsArena, sizeof(*message));
+ nrecv = recv(fds[FDS_UNI].fd, message, sizeof(*message), 0);
assert(nrecv != -1);
assert(nrecv == sizeof(*message));
nmessages++;
break;
default:
- // TODO: log
+ loggingf("Got unhandled message: %s\n", headerTypeString(header.type));
break;
}
}
@@ -512,7 +680,7 @@ main(int argc, char** argv)
if (ninput == 0)
// do not send empty message
break;
- if (fds[FDS_SERVER].fd == -1)
+ if (fds[FDS_UNI].fd == -1)
// do not send message to disconnected server
break;
@@ -521,25 +689,21 @@ main(int argc, char** argv)
ninput++;
// Save header
- HeaderMessage header = HEADER_TEXTMESSAGE;
- void* addr = ArenaPush(msgsArena, sizeof(header));
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_TEXT);
+ void* addr = ArenaPush(&msgsArena, sizeof(header));
memcpy(addr, &header, sizeof(header));
// Save message
- TextMessage* sendmsg = ArenaPush(msgsArena, TEXTMESSAGE_SIZE);
- memcpy(sendmsg->author, username, AUTHOR_LEN);
- sendmsg->timestamp = time(NULL);
+ TextMessage* sendmsg = ArenaPush(&msgsArena, TEXTMESSAGE_SIZE);
+ sendmsg->id = user.id;
+ sendmsg->timestamp = time(0);
sendmsg->len = ninput;
u32 text_size = ninput * sizeof(*input);
- ArenaPush(msgsArena, text_size);
+ ArenaPush(&msgsArena, text_size);
memcpy(&sendmsg->text, input, text_size);
- // Send message
- nsend = send(fds[FDS_SERVER].fd, &header, sizeof(header), 0);
- assert(nsend != -1);
- nsend = send(fds[FDS_SERVER].fd, sendmsg, TEXTMESSAGE_SIZE + TEXTMESSAGE_TEXT_SIZE((*sendmsg)), 0);
- assert(nsend != -1);
+ sendAnyMessage(fds[FDS_UNI].fd, &header, sendmsg);
nmessages++;
// also clear input
@@ -550,7 +714,8 @@ main(int argc, char** argv)
default:
if (ev.ch == 0)
break;
- // TODO: logging
+
+ // TODO: show error
if (ninput == INPUT_LIMIT - 1) // last byte reserved for \0
break;
@@ -568,17 +733,15 @@ main(int argc, char** argv)
tb_poll_event(&ev);
}
- screen_home(msgsArena, nmessages, input, ninput);
+ screen_home(&msgsArena, nmessages, &clientsArena, fds, input, ninput);
tb_present();
}
tb_shutdown();
- if (quitmsg != NULL)
+ if (quitmsg != 0)
printf("%s\n", quitmsg);
- ArenaRelease(msgsArena);
-
return 0;
}
diff --git a/chatty.h b/chatty.h
index 916ae4a..5337656 100644
--- a/chatty.h
+++ b/chatty.h
@@ -1,7 +1,8 @@
-#ifndef CHATTY_IMPL
+#ifndef CHATTY_H
#include <assert.h>
#include <locale.h>
+#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
@@ -11,6 +12,7 @@
#include <sys/mman.h>
#include <sys/socket.h>
#include <time.h>
+#include <unistd.h>
#include <wchar.h>
typedef uint8_t u8;
@@ -28,13 +30,54 @@ typedef enum {
// port for chatty
#define PORT 9983
+// max number of bytes that can be logged at once
+#define LOGMESSAGE_MAX 2048
+#define LOG_FMT "%H:%M:%S "
+#define LOG_LEN 10
#define Kilobytes(Value) ((Value) * 1024)
#define Megabytes(Value) (Kilobytes(Value) * 1024)
#define Gigabytes(Value) (Megabytes((u64)Value) * 1024)
#define Terabytes(Value) (Gigabytes((u64)Value) * 1024)
#define PAGESIZE 4096
+#define local_persist static
+#define global_variable
+#define internal static
+global_variable s32 logfd;
+
+u32
+wstrlen(u32* str)
+{
+ u32 i = 0;
+ while (str[i] != 0)
+ i++;
+ return i;
+}
+
+void
+loggingf(char* format, ...)
+{
+ char buf[LOGMESSAGE_MAX];
+ va_list args;
+ va_start(args, format);
+
+ vsnprintf(buf, sizeof(buf), format, args);
+ va_end(args);
+
+ int n = 0;
+ while (*(buf + n) != 0) n++;
+
+ u64 t = time(0);
+ u8 timestamp[LOG_LEN];
+ struct tm* ltime = localtime((time_t*)&t);
+ strftime((char*)timestamp, LOG_LEN, LOG_FMT, ltime);
+ write(logfd, timestamp, LOG_LEN - 1);
+
+ write(logfd, buf, n);
+}
+
+// Arena Allocator
struct Arena {
void* addr;
u64 size;
@@ -46,25 +89,20 @@ struct Arena {
#define PushStruct(arena, type) PushArray((arena), (type), 1)
#define PushStructZero(arena, type) PushArrayZero((arena), (type), 1)
-Arena*
-ArenaAlloc(u64 size)
+// Returns arena in case of success, or 0 if it failed to alllocate the memory
+void
+ArenaAlloc(Arena* arena, u64 size)
{
- Arena* arena = (Arena*)malloc(sizeof(Arena));
-
- arena->addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
- if (arena->addr == MAP_FAILED)
- return NULL;
+ arena->addr = mmap(0, size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ assert(arena->addr != MAP_FAILED);
arena->pos = 0;
arena->size = size;
-
- return arena;
}
void
ArenaRelease(Arena* arena)
{
munmap(arena->addr, arena->size);
- free(arena);
}
void*
@@ -73,167 +111,9 @@ ArenaPush(Arena* arena, u64 size)
u8* mem;
mem = (u8*)arena->addr + arena->pos;
arena->pos += size;
+ assert(arena->pos <= arena->size);
return mem;
}
-/// Protocol
-// - every message has format Header + Message
-// TODO: authentication
-// TODO: encryption
-
-/// Protocol Header
-// - 2 bytes for version
-// - 1 byte for message type
-// - 16 bytes for checksum
-//
-// Text Message
-// - 12 bytes for the author
-// - 8 bytes for the timestamp
-// - 2 bytes for the text length
-// - x*4 bytes for the text
-//
-// History Message
-// This message is for requesting messages sent after a timestamp.
-// - 8 bytes for the timestamp
-
-/// Naming convention
-// Messages end with the Message suffix (eg. TextMessag, HistoryMessage)
-// A function that is coupled to a type works like
-// <noun><type> eg. (printTextMessage, formatTimestamp)
-
-#define PROTOCOL_VERSION 0
-
-typedef struct {
- u16 version;
- u8 type;
-} HeaderMessage;
-
-enum { HEADER_TYPE_TEXT = 0,
- HEADER_TYPE_HISTORY,
- HEADER_TYPE_PRESENCE };
-#define HEADER_TEXTMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_TEXT};
-#define HEADER_HISTORYMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_HISTORY};
-#define HEADER_PRESENCEMESSAGE {.version = PROTOCOL_VERSION, .type = HEADER_TYPE_PRESENCE};
-
-// Size of author string including null terminator
-#define AUTHOR_LEN 13
-// Size of formatted timestamp string including null terminator
-#define TIMESTAMP_LEN 9
-
-typedef struct {
- u8 checksum[16];
- u8 author[AUTHOR_LEN];
- u64 timestamp;
- u16 len; // including null terminator
- u32* text; // placeholder for indexing
- // TODO: 0-length field?
-} TextMessage;
-
-// Size of TextMessage without text pointer, used when receiving the message over a stream
-#define TEXTMESSAGE_TEXT_SIZE(m) (m.len * sizeof(*m.text))
-#define TEXTMESSAGE_SIZE (sizeof(TextMessage) - sizeof(u32*))
-
-typedef struct {
- u64 timestamp;
-} HistoryMessage;
-
-typedef struct {
- u8 author[AUTHOR_LEN];
- u8 type;
-} PresenceMessage;
-enum { PRESENCE_TYPE_CONNECTED = 0,
- PRESENCE_TYPE_DISCONNECTED };
-
-// Returns string for type byte in HeaderMessage
-u8*
-headerTypeString(u8 type)
-{
- switch (type) {
- case HEADER_TYPE_TEXT: return (u8*)"TextMessage";
- case HEADER_TYPE_HISTORY: return (u8*)"HistoryMessage";
- case HEADER_TYPE_PRESENCE: return (u8*)"PresenceMessage";
- default: return (u8*)"Unknown";
- }
-}
-
-u8*
-presenceTypeString(u8 type)
-{
- switch (type) {
- case PRESENCE_TYPE_CONNECTED: return (u8*)"connected";
- case PRESENCE_TYPE_DISCONNECTED: return (u8*)"disconnected";
- default: return (u8*)"Unknown";
- }
-}
-
-// from Tsoding video on minicel (https://youtu.be/HCAgvKQDJng?t=4546)
-// sv(https://github.com/tsoding/sv)
-#define PH_FMT "header: v%d %s(%d)"
-#define PH_ARG(header) header.version, headerTypeString(header.type), header.type
-
-void
-formatTimestamp(u8 tmsp[TIMESTAMP_LEN], u64 t)
-{
- struct tm* ltime;
- ltime = localtime((time_t*)&t);
- strftime((char*)tmsp, TIMESTAMP_LEN, "%H:%M:%S", ltime);
-}
-
-void
-printTextMessage(TextMessage* message, u8 wide)
-{
- u8 timestamp[TIMESTAMP_LEN] = {0};
- formatTimestamp(timestamp, message->timestamp);
-
- assert(setlocale(LC_ALL, "") != NULL);
-
- if (wide)
- wprintf(L"TextMessage: %s [%s] %ls\n", timestamp, message->author, (wchar_t*)&message->text);
- else {
- u8 str[message->len];
- wcstombs((char*)str, (wchar_t*)&message->text, message->len * sizeof(*message->text));
- printf("TextMessage: %s [%s] (%d)%s\n", timestamp, message->author, message->len, str);
- }
-}
-
-// Receive a message from fd and store it to the msgsArena,
-// if dest is not NULL point it to the new message created on msgsArena
-// Returns the number of bytes received
-u32
-recvTextMessage(Arena* msgsArena, u32 fd, TextMessage** dest)
-{
- s32 nrecv = 0;
-
- TextMessage* message = ArenaPush(msgsArena, TEXTMESSAGE_SIZE);
- if (dest != NULL)
- *dest = message;
-
- // Receive everything but the text so we can know the text's size and act accordingly
- nrecv = recv(fd, message, TEXTMESSAGE_SIZE, 0);
- assert(nrecv != -1);
- assert(nrecv == TEXTMESSAGE_SIZE);
-
- nrecv = 0;
-
- // Allocate memory for text and receive in that memory
- u32 text_size = message->len * sizeof(*message->text);
- ArenaPush(msgsArena, text_size);
-
- nrecv = recv(fd, (u8*)&message->text, text_size, 0);
- assert(nrecv != -1);
- assert(nrecv == message->len * sizeof(*message->text));
-
- return TEXTMESSAGE_SIZE + nrecv;
-}
-
-u32
-wstrlen(u32* str)
-{
- u32 i = 0;
- while (str[i] != 0)
- i++;
- return i;
-}
-
#endif
#define CHATTY_H
diff --git a/protocol.h b/protocol.h
new file mode 100644
index 0000000..84d636f
--- /dev/null
+++ b/protocol.h
@@ -0,0 +1,360 @@
+#ifndef PROTOCOL_H
+#define PROTOCOL_H
+
+#include "chatty.h"
+
+/// Protocol
+// - every message has format Header + Message
+// TODO: security
+//
+/// ID
+// - So clients can be identified uniquely.
+// - 8 bytes
+// - number that increments for each new client
+//
+/// Strings
+// - strings are sent with their null terminator
+//
+/// Authentication
+// This is what happens when the first time a client connects.
+// Scenario 1. We alreayd have an ID
+// 1. client-> Send own ID
+// 2. server-> knows ID?
+// y. server-> Success
+// n. 1. server-> Error 'notfound'
+// 2. client-> exit
+// Scenario 2. We do not have an ID
+// 1. client-> Introduces
+// 2. server-> Sends & Saves ID
+// 3. client-> Saves ID
+//
+/// Naming convention
+// Messages end with the Message suffix (eg. TextMessag, HistoryMessage)
+//
+// A function that is coupled to a type works like
+// <noun><type> eg. (printTextMessage, formatTimestamp)
+
+#define PROTOCOL_VERSION 0
+// Size of author string including null terminator
+#define AUTHOR_LEN 13
+// Size of formatted timestamp string including null terminator
+#define TIMESTAMP_LEN 9
+#define TIMESTAMP_FORMAT "%H:%M:%S"
+
+typedef u64 ID;
+
+// - 2 bytes for version
+// - 1 byte for message type
+// - 16 bytes for checksum
+typedef struct {
+ u16 version;
+ u8 type;
+} HeaderMessage;
+
+typedef enum {
+ HEADER_TYPE_TEXT = 0,
+ HEADER_TYPE_HISTORY,
+ HEADER_TYPE_PRESENCE,
+ HEADER_TYPE_ID,
+ HEADER_TYPE_INTRODUCTION,
+ HEADER_TYPE_ERROR
+} HeaderType;
+// shorthand for creating a header with a value from the enum
+#define HEADER_INIT(t) {.version = PROTOCOL_VERSION, .type = t}
+// from Tsoding video on minicel (https://youtu.be/HCAgvKQDJng?t=4546)
+// sv(https://github.com/tsoding/sv)
+#define HEADER_FMT "header: v%d %s(%d)"
+#define HEADER_ARG(header) header.version, headerTypeString(header.type), header.type
+
+// For sending texts to other clients
+// - 13 bytes for the author
+// - 8 bytes for the timestamp
+// - 8 bytes for id
+// - 2 bytes for the text length
+// - x*4 bytes for the text
+typedef struct {
+ ID id;
+ u64 timestamp; // timestamp of when the message was sent
+ u16 len;
+ wchar_t* text; // placeholder for indexing
+ // wchar_t* is used, because this renders the text in the debugger
+} TextMessage;
+// Size of TextMessage without text pointer
+#define TEXTMESSAGE_SIZE (sizeof(TextMessage) - sizeof(u32*))
+
+// Requesting messages sent after a timestamp.
+// - 8 bytes for the timestamp
+typedef struct {
+ u64 timestamp;
+} HistoryMessage;
+
+// Introduce the client to the server by sending the client's information.
+// See "First connection".
+// - 13 bytes for author
+typedef struct {
+ u8 author[AUTHOR_LEN];
+} IntroductionMessage;
+#define INTRODUCTION_FMT "introduction: %s"
+#define INTRODUCTION_ARG(message) message.author
+
+// Request IntroductionMessage for client with that id.
+// See "First connection" if this message is used when the client connects for the first time.
+// be used to retrieve information about a client with an unknown ID.
+// - 8 bytes for id
+typedef struct {
+ ID id;
+} IDMessage;
+
+// Notifying the sender's state, such as "connected", "disconnected", "AFK", ...
+// - 8 bytes for id
+// - 1 byte for type
+typedef struct {
+ ID id;
+ u8 type;
+} PresenceMessage;
+typedef enum {
+ PRESENCE_TYPE_CONNECTED = 0,
+ PRESENCE_TYPE_DISCONNECTED,
+ PRESENCE_TYPE_AFK
+} PresenceType;
+
+// Send an error message
+// - 1 byte for type
+typedef struct {
+ u8 type;
+} ErrorMessage;
+typedef enum {
+ ERROR_TYPE_BADMESSAGE = 0,
+ ERROR_TYPE_NOTFOUND,
+ ERROR_TYPE_SUCCESS,
+ ERROR_TYPE_ALREADYCONNECTED,
+ ERROR_TYPE_TOOMANYCONNECTIONS
+} ErrorType;
+#define ERROR_INIT(t) {.type = t}
+
+typedef struct {
+ s32 nrecv;
+ TextMessage* message;
+} recvTextMessageResult;
+
+// Returns string for type byte in HeaderMessage
+u8*
+headerTypeString(HeaderType type)
+{
+ switch (type) {
+ case HEADER_TYPE_TEXT: return (u8*)"TextMessage";
+ case HEADER_TYPE_HISTORY: return (u8*)"HistoryMessage";
+ case HEADER_TYPE_PRESENCE: return (u8*)"PresenceMessage";
+ case HEADER_TYPE_ID: return (u8*)"IDMessage";
+ case HEADER_TYPE_INTRODUCTION: return (u8*)"IntroductionMessage";
+ case HEADER_TYPE_ERROR: return (u8*)"ErrorMessage";
+ default: return (u8*)"Unknown";
+ }
+}
+
+u8*
+presenceTypeString(PresenceType type)
+{
+ switch (type) {
+ case PRESENCE_TYPE_CONNECTED: return (u8*)"connected";
+ case PRESENCE_TYPE_DISCONNECTED: return (u8*)"disconnected";
+ case PRESENCE_TYPE_AFK: return (u8*)"afk";
+ default: return (u8*)"Unknown";
+ }
+}
+
+u8*
+errorTypeString(ErrorType type)
+{
+ switch (type) {
+ case ERROR_TYPE_BADMESSAGE: return (u8*)"bad message";
+ case ERROR_TYPE_NOTFOUND: return (u8*)"not found";
+ case ERROR_TYPE_SUCCESS: return (u8*)"success";
+ case ERROR_TYPE_ALREADYCONNECTED: return (u8*)"already connected";
+ case ERROR_TYPE_TOOMANYCONNECTIONS: return (u8*)"too many connections";
+ default: return (u8*)"Unknown";
+ }
+}
+
+// Formats time t into tmsp string
+void
+formatTimestamp(u8 timestamp_str[TIMESTAMP_LEN], u64 timestamp)
+{
+ struct tm* ltime;
+ ltime = localtime((time_t*)&timestamp);
+ strftime((char*)timestamp_str, TIMESTAMP_LEN, TIMESTAMP_FORMAT, ltime);
+}
+
+// Receive a message from fd and store it in the msgsArena,
+// Returns pointer to the allocated memory
+TextMessage*
+recvTextMessage(Arena* msgsArena, u32 fd)
+{
+ TextMessage* message = ArenaPush(msgsArena, TEXTMESSAGE_SIZE);
+
+ // Receive everything but the text so we can know the text's size and act accordingly
+ s32 nrecv = recv(fd, message, TEXTMESSAGE_SIZE, 0);
+ assert(nrecv != -1);
+ assert(nrecv == TEXTMESSAGE_SIZE);
+
+ // Allocate memory for text and receive in that memory
+ u32 text_size = message->len * sizeof(*message->text);
+ ArenaPush(msgsArena, text_size);
+
+ nrecv = recv(fd, (u8*)&message->text, text_size, 0);
+ assert(nrecv != -1);
+ assert(nrecv == message->len * sizeof(*message->text));
+
+ return message;
+}
+
+typedef struct {
+ HeaderMessage* header;
+ void* message;
+} Message;
+
+u32
+getMessageSize(HeaderType type)
+{
+ u32 size = 0;
+ switch (type) {
+ case HEADER_TYPE_ERROR: size = sizeof(ErrorMessage); break;
+ case HEADER_TYPE_HISTORY: size = sizeof(HistoryMessage); break;
+ case HEADER_TYPE_ID: size = sizeof(IDMessage); break;
+ case HEADER_TYPE_INTRODUCTION: size = sizeof(IntroductionMessage); break;
+ case HEADER_TYPE_PRESENCE: size = sizeof(PresenceMessage); break;
+ default: assert(0);
+ }
+ return size;
+}
+
+s32
+recvAnyMessageType(s32 fd, HeaderMessage* header, void *anyMessage, HeaderType type)
+{
+ s32 nrecv = recv(fd, header, sizeof(*header), 0);
+ if (nrecv == -1 || nrecv == 0)
+ return nrecv;
+ assert(nrecv == sizeof(*header));
+
+ s32 size = 0;
+ switch (type) {
+ case HEADER_TYPE_ERROR:
+ case HEADER_TYPE_HISTORY:
+ case HEADER_TYPE_ID:
+ case HEADER_TYPE_INTRODUCTION:
+ case HEADER_TYPE_PRESENCE:
+ size = getMessageSize(header->type);
+ break;
+ case HEADER_TYPE_TEXT: {
+ TextMessage* message = anyMessage;
+ size = TEXTMESSAGE_SIZE + message->len * sizeof(*message->text);
+ } break;
+ default: assert(0); break;
+ }
+ assert(header->type == type);
+
+ nrecv = recv(fd, anyMessage, size, 0);
+ assert(nrecv != -1);
+ assert(nrecv == size);
+
+ return size;
+}
+
+// Get any message into arena
+Message
+recvAnyMessage(Arena* arena, s32 fd)
+{
+ HeaderMessage* header = ArenaPush(arena, sizeof(*header));
+ s32 nrecv = recv(fd, header, sizeof(*header), 0);
+ assert(nrecv != -1);
+ assert(nrecv == sizeof(*header));
+
+ s32 size = 0;
+ switch (header->type) {
+ case HEADER_TYPE_ERROR:
+ case HEADER_TYPE_HISTORY:
+ case HEADER_TYPE_ID:
+ case HEADER_TYPE_INTRODUCTION:
+ case HEADER_TYPE_PRESENCE:
+ size = getMessageSize(header->type);
+ break;
+ case HEADER_TYPE_TEXT: {
+ Message result;
+ result.header = header;
+ result.message = recvTextMessage(arena, fd);
+ return result;
+ } break;
+ default: assert(0); break;
+ }
+
+ void* message = ArenaPush(arena, size);
+ nrecv = recv(fd, message, size, 0);
+ assert(nrecv != -1);
+ assert(nrecv == size);
+
+ Message result;
+ result.header = header;
+ result.message = message;
+
+ return result;
+}
+
+Message
+waitForMessageType(Arena* arena, Arena* queueArena, u32 fd, HeaderType type)
+{
+ Message message;
+ while (1) {
+ message = recvAnyMessage(arena, fd);
+ if (message.header->type == type)
+ break;
+ ArenaPush(queueArena, getMessageSize(message.header->type));
+ }
+ return message;
+}
+
+// Generic sending function for sending any type of message to fd
+// Returns number of bytes sent in message or -1 if there was an error.
+s32
+sendAnyMessage(u32 fd, HeaderMessage* header, void* anyMessage)
+{
+ s32 nsend_total;
+ s32 nsend = send(fd, header, sizeof(*header), 0);
+ if (nsend == -1) return nsend;
+ assert(nsend == sizeof(*header));
+ nsend_total = nsend;
+
+ s32 size = 0;
+ switch (header->type) {
+ case HEADER_TYPE_ERROR:
+ case HEADER_TYPE_HISTORY:
+ case HEADER_TYPE_ID:
+ case HEADER_TYPE_INTRODUCTION:
+ case HEADER_TYPE_PRESENCE:
+ size = getMessageSize(header->type);
+ break;
+ case HEADER_TYPE_TEXT: {
+ nsend = send(fd, anyMessage, TEXTMESSAGE_SIZE, 0);
+ assert(nsend != -1);
+ assert(nsend == TEXTMESSAGE_SIZE);
+ nsend_total += nsend;
+ // set size to remaning text size that should be sent
+ TextMessage* message = (TextMessage*)anyMessage;
+ size = message->len * sizeof(*message->text);
+ nsend = 0;
+
+ anyMessage = &message->text;
+ } break;
+ default:
+ fprintf(stdout, "sendAnyMessage(%d)|Cannot send %s\n", fd, headerTypeString(header->type));
+ return 0;
+ }
+
+ nsend = send(fd, anyMessage, size, 0);
+ if (nsend == -1) return nsend;
+ assert(nsend == size);
+ nsend_total += nsend;
+
+ return nsend_total;
+}
+
+#endif
diff --git a/send.c b/send.c
index 585c925..689c6dc 100644
--- a/send.c
+++ b/send.c
@@ -8,6 +8,7 @@
#include <unistd.h>
#include "chatty.h"
+#include "protocol.h"
int
main(int argc, char** argv)
@@ -17,61 +18,70 @@ main(int argc, char** argv)
return 1;
}
- s32 err, serverfd, nsend;
+ s32 err, serverfd, nsend, nrecv;
serverfd = socket(AF_INET, SOCK_STREAM, 0);
assert(serverfd != -1);
- const struct sockaddr_in address = {
- AF_INET,
- htons(PORT),
- {0},
- };
+ const struct sockaddr_in address = {AF_INET, htons(PORT), {0}, {0}};
err = connect(serverfd, (struct sockaddr*)&address, sizeof(address));
assert(err == 0);
- // convert text to wide string
- u32 text_len = strlen(argv[2]) + 1;
- u32 text_wide[text_len];
- u32 size = mbstowcs((wchar_t*)text_wide, argv[2], text_len - 1);
- assert(size == text_len - 1);
- text_wide[text_len - 1] = 0;
- u32 author_len = strlen(argv[1]);
- assert(author_len + 1 <= AUTHOR_LEN); // add 1 for null terminator
-
- // Introduce ourselves
+ // Get our ID
+ ID id = 0;
{
- HeaderMessage header = HEADER_PRESENCEMESSAGE;
- PresenceMessage message;
+ // get author len
+ u32 author_len = strlen(argv[1]);
+ assert(author_len + 1 <= AUTHOR_LEN); // add 1 for null terminator
+
+ // Introduce ourselves
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_INTRODUCTION);
+ IntroductionMessage message;
memcpy(message.author, argv[1], author_len);
nsend = send(serverfd, &header, sizeof(header), 0);
assert(nsend != -1);
nsend = send(serverfd, &message, sizeof(message), 0);
assert(nsend != -1);
- }
- HeaderMessage header = HEADER_TEXTMESSAGE;
- TextMessage* message;
+ // Get id
+ nrecv = recv(serverfd, &header, sizeof(header), 0);
+ assert(nrecv != -1);
+ if (header.type == HEADER_TYPE_ERROR) {
+ ErrorMessage message;
+ nrecv = recv(serverfd, &message, sizeof(message), 0);
+ fprintf(stderr, "Got '%s' error.\n'", errorTypeString(message.type));
+ close(serverfd);
+ return 1;
+ }
+ assert(header.type == HEADER_TYPE_ID);
+ IDMessage idmessage;
+ nrecv = recv(serverfd, &idmessage, sizeof(idmessage), 0);
+ assert(nrecv != -1);
+ fprintf(stderr, "Got id: %lu\n", idmessage.id);
+ }
- u8 buf[text_len * sizeof(*text_wide) + TEXTMESSAGE_SIZE];
- bzero(buf, sizeof(buf));
- message = (TextMessage*)buf;
+ // convert text to wide string
+ u32 text_len = strlen(argv[2]) + 1;
+ u32 text_wide[text_len];
+ u32 size = mbstowcs((wchar_t*)text_wide, argv[2], text_len - 1);
+ assert(size == text_len - 1);
+ text_wide[text_len - 1] = 0;
- memcpy(message->author, argv[1], author_len);
- message->timestamp = time(NULL);
- message->len = text_len;
- memcpy(&message->text, text_wide, text_len * sizeof(*message->text));
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_TEXT);
+ TextMessage message;
+ bzero(&message, TEXTMESSAGE_SIZE);
+ message = (TextMessage){.id = id, .timestamp = time(NULL), .len = text_len};
nsend = send(serverfd, &header, sizeof(header), 0);
assert(nsend != -1);
- printf("header bytes sent: %d\n", nsend);
- nsend = send(serverfd, buf, sizeof(buf), 0);
+ fprintf(stderr, "header bytes sent: %d\n", nsend);
+ nsend = send(serverfd, &message, TEXTMESSAGE_SIZE, 0);
assert(nsend != -1);
+ fprintf(stderr, "message bytes sent: %d\n", nsend);
- printf("text length: %d\n", text_len);
- printf("buf size: %lu\n", sizeof(buf));
- printf("text size: %lu\n", sizeof(*text_wide) * text_len);
- printf("message bytes sent: %d\n", nsend);
+ u32 text_size = message.len * sizeof(*message.text);
+ nsend = send(serverfd, text_wide, text_size, 0);
+ fprintf(stderr, "text bytes sent: %d\n", nsend);
return 0;
}
diff --git a/server.c b/server.c
index 5c60244..03dbf4d 100644
--- a/server.c
+++ b/server.c
@@ -1,11 +1,15 @@
#include "chatty.h"
+#include "protocol.h"
#include <assert.h>
+#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
+#include <signal.h>
#include <stdarg.h>
#include <string.h>
#include <sys/socket.h>
+#include <sys/stat.h>
#include <unistd.h>
// timeout on polling
@@ -14,276 +18,481 @@
#define MAX_CONNECTIONS 16
// Get number of connections from arena position
// NOTE: this is somewhat wrong, because of when disconnections happen
-#define FDS_SIZE (fdsArena->pos / sizeof(*fds))
+#define FDS_SIZE (fdsArena.pos / sizeof(struct pollfd))
+#define CLIENTS_SIZE (clientsArena.pos / sizeof(Client))
+
+// Enable/Disable saving clients permanently to file
+#define IMPORT_ID
+// Where to save clients
+#define CLIENTS_FILE "_clients"
+// Where to write logs
+#define LOGFILE "server.log"
+// Log to LOGFILE instead of stderr
+// #define LOGGING
// enum for indexing the fds array
enum { FDS_STDIN = 0,
FDS_SERVER,
FDS_CLIENTS };
-// Has information on clients
-// For each pollfd in fds there should be a matching client in clients
-// clients[i] <=> fds[i]
+// Client information
typedef struct {
u8 author[AUTHOR_LEN]; // matches author property on other message types
- Bool initialized; // boolean
+ ID id;
+ struct pollfd* pollunifd; // Index in fds array
+ struct pollfd* pollbifd; // Index in fds array
} Client;
+#define CLIENT_FMT "[%s](%lu)"
+#define CLIENT_ARG(client) client.author, client.id
+
+typedef enum {
+ UNIFD = 0,
+ BIFD
+} ClientFD;
+
+// TODO: remove
+// For handing out new ids to connections.
+global_variable u32 nclients = 0;
+
+// Returns client matching id in clients.
+// clientsArena is used to get an upper bound.
+// Returns 0 if there was no client found.
+Client*
+getClientByID(Arena* clientsArena, ID id)
+{
+ Client* clients = clientsArena->addr;
+ for (u32 i = 0; i < (clientsArena->pos / sizeof(*clients)); i++) {
+ if (clients[i].id == id)
+ return clients + i;
+ }
+ return 0;
+}
+
+// Print TextMessage prettily
+void
+printTextMessage(TextMessage* message, Client* client, u8 wide)
+{
+ u8 timestamp[TIMESTAMP_LEN] = {0};
+ formatTimestamp(timestamp, message->timestamp);
+
+ if (wide) {
+ setlocale(LC_ALL, "");
+ wprintf(L"TextMessage: %s [%s] %ls\n", timestamp, client->author, (wchar_t*)&message->text);
+ } else {
+ u8 str[message->len];
+ wcstombs((char*)str, (wchar_t*)&message->text, message->len * sizeof(*message->text));
+ loggingf("TextMessage: %s [%s] (%d)%s\n", timestamp, client->author, message->len, str);
+ }
+}
-// Send anyMessage to all clients in fds from fdsArena except for fds[i].
+// Send header and anyMessage to each connection in fds that is nfds number of connections except
+// for connfd.
+// Type will filter out only connections matching the type.
void
-sendToOthers(Arena* fdsArena, struct pollfd* fds, u32 i, HeaderMessage* header, void* anyMessage)
+sendToOthers(struct pollfd* fds, u32 nfds, s32 connfd, ClientFD type, HeaderMessage* header, void* anyMessage)
{
s32 nsend;
- for (u32 j = FDS_CLIENTS; j < FDS_SIZE; j++) {
- if (fds[j].fd == fds[i].fd) continue;
- if (fds[j].fd == -1) continue;
-
- // send header
- u32 nsend_total = 0;
- nsend = send(fds[j].fd, header, sizeof(*header), 0);
- assert(nsend != -1);
- assert(nsend == sizeof(*header));
- nsend_total += nsend;
-
- // send message
- switch (header->type) {
- case HEADER_TYPE_PRESENCE: {
- PresenceMessage* message = (PresenceMessage*)anyMessage;
- nsend = send(fds[j].fd, message, sizeof(*message), 0);
- assert(nsend != -1);
- assert(nsend == sizeof(*message));
- fprintf(stdout, " Notifying(%d->%d).\n", fds[i].fd, fds[j].fd);
- break;
- }
- case HEADER_TYPE_TEXT: {
- TextMessage* message = (TextMessage*)anyMessage;
- nsend = send(fds[j].fd, message, TEXTMESSAGE_SIZE, 0);
- assert(nsend != -1);
- assert(nsend == TEXTMESSAGE_SIZE);
- nsend_total += nsend;
- nsend = send(fds[j].fd, &message->text, message->len * sizeof(*message->text), 0);
- assert(nsend != -1);
- assert(nsend == (message->len * sizeof(*message->text)));
- nsend_total += nsend;
- break;
- }
- default:
- fprintf(stdout, " Cannot retransmit %s\n", headerTypeString(header->type));
- }
+ for (u32 i = FDS_CLIENTS + type; i < nfds; i += 2) {
+ if (fds[i].fd == connfd) continue;
+ if (fds[i].fd == -1) continue;
- fprintf(stdout, " Retransmitted(%d->%d) %d bytes.\n", fds[i].fd, fds[j].fd, nsend_total);
+ nsend = sendAnyMessage(fds[i].fd, header, anyMessage);
+ loggingf("sendToOthers(%d)|[%s]->%d %d bytes\n", connfd, headerTypeString(header->type), fds[i].fd, nsend);
}
}
+// Send header and anyMessage to each connection in fds that is nfds number of connections.
+// Type will filter out only connections matching the type.
void
-disconnect(Arena* fdsArena, struct pollfd* fds, u32 i, Client* client)
+sendToAll(struct pollfd* fds, u32 nfds, ClientFD type, HeaderMessage* header, void* anyMessage)
{
- fprintf(stdout, "Disconnected(%d). \n", fds[i].fd);
- shutdown(fds[i].fd, SHUT_RDWR);
- close(fds[i].fd); // send close to client
-
- // Send disconnection to other connected clients
- HeaderMessage header = HEADER_PRESENCEMESSAGE;
- PresenceMessage message = {
- .type = PRESENCE_TYPE_DISCONNECTED
- };
- memcpy(message.author, client->author, AUTHOR_LEN);
- sendToOthers(fdsArena, fds, i, &header, &message);
-
- fds[i].fd = -1; // ignore in the future
- client->initialized = False; // deinitialize client
+ for (u32 i = FDS_CLIENTS + type; i < nfds; i += 2) {
+ if (fds[i].fd == -1) continue;
+ s32 nsend = sendAnyMessage(fds[i].fd, header, anyMessage);
+ loggingf("sendToAll|[%s]->%d %d bytes\n", headerTypeString(header->type), fds[i].fd, nsend);
+ }
}
-// Initialize a client that connects for the first time or reconnects.
-// Receive HeaderMessage and PresenceMessage from fd and set client with the data from
-// PresenceMessage.
-// Notify fds in fdsArena.
-// TODO: handle wrong messages
+// Disconnect a client by closing the matching file descriptors
void
-initClient(Arena* fdsArena, struct pollfd* fds, s32 fd, Client* client)
+disconnect(struct pollfd* pollfd, Client* client)
{
- s32 nrecv = 0;
- s32 nsend = 0;
+ loggingf("Disconnecting "CLIENT_FMT"\n", CLIENT_ARG((*client)));
+ if (pollfd[UNIFD].fd != -1) {
+ close(pollfd[UNIFD].fd);
+ }
+ if (pollfd[BIFD].fd != -1) {
+ close(pollfd[BIFD].fd);
+ }
+ pollfd[UNIFD].fd = -1;
+ pollfd[BIFD].fd = -1;
+ // TODO: mark as free
+ if (client) {
+ client->pollunifd = 0;
+ client->pollbifd = 0;
+ }
+}
- fprintf(stdout, " Adding to clients(%d).\n", fd);
+// Disconnects fds+conn from fds with nfds connections, then send a PresenceMessage to other
+// clients about disconnection.
+void
+disconnectAndNotify(Client* client, struct pollfd* fds, u32 nfds, u32 conn)
+{
+ disconnect(fds + conn, client);
+
+ local_persist HeaderMessage header = HEADER_INIT(HEADER_TYPE_PRESENCE);
+ PresenceMessage message = {.id = client->id, .type = PRESENCE_TYPE_DISCONNECTED};
+ sendToAll(fds, nfds, UNIFD, &header, &message);
+}
+
+// Receive authentication from pollfd->fd and create client out of it. Look in
+// clientsArena if it already exists. Otherwise push a new onto the arena and write its information
+// to clients_file.
+// See "Authentication" in chatty.h
+Client*
+authenticate(Arena* clientsArena, s32 clients_file, struct pollfd* clientfds)
+{
+ s32 nrecv = 0;
+ Client* clients = clientsArena->addr;
HeaderMessage header;
- nrecv = recv(fd, &header, sizeof(header), 0);
- assert(nrecv != -1);
- assert(nrecv == sizeof(header));
- if (header.type != HEADER_TYPE_PRESENCE) {
- // reject connection
- close(fd);
- fprintf(stdout, " Got wrong header(%d).\n", fd);
- return;
+ nrecv = recv(clientfds[BIFD].fd, &header, sizeof(header), 0);
+ if (nrecv != sizeof(header)) {
+ loggingf("authenticate(%d)|err: %d/%lu bytes\n", clientfds[BIFD].fd, nrecv, sizeof(header));
+ return 0;
}
- fprintf(stdout, " Got header(%d).\n", fd);
-
- PresenceMessage message;
- nrecv = recv(fd, &message, sizeof(message), 0);
- assert(nrecv != -1);
- assert(nrecv == sizeof(message));
- fprintf(stdout, " Got presence message(%d).\n", fd);
-
- // Copy author from PresenceMessage.
- memcpy(client->author, message.author, AUTHOR_LEN);
-
- // Notify other clients from this new one
- // Reuse header and message
- for (u32 j = FDS_CLIENTS; j < FDS_SIZE; j++) {
- if (fds[j].fd == fd)
- continue;
- if (fds[j].fd == -1)
- continue;
- fprintf(stdout, " Notifying(%d->%d).\n", fd, fds[j].fd);
- nsend = send(fds[j].fd, &header, sizeof(header), 0);
- assert(nsend != -1);
- assert(nsend == sizeof(header));
- nsend = send(fds[j].fd, &message, sizeof(message), 0);
- assert(nsend != -1);
- assert(nsend == sizeof(message));
+ loggingf("authenticate(%d)|" HEADER_FMT "\n", clientfds[BIFD].fd, HEADER_ARG(header));
+
+ Client* client = 0;
+ // Scenario 1: Search for existing client
+ if (header.type == HEADER_TYPE_ID) {
+ IDMessage message;
+ nrecv = recv(clientfds[BIFD].fd, &message, sizeof(message), 0);
+ if (nrecv != sizeof(message)) {
+ loggingf("authenticate(%d)|err: %d/%lu bytes\n", clientfds[BIFD].fd, nrecv, sizeof(message));
+ return 0;
+ }
+
+ client = getClientByID(clientsArena, message.id);
+ if (client) {
+ loggingf("authenticate(%d)|found [%s](%lu)\n", clientfds[BIFD].fd, client->author, client->id);
+ header.type = HEADER_TYPE_ERROR;
+ // TODO: allow multiple connections
+ if (client->pollunifd != 0 || client->pollbifd != 0) {
+ loggingf("authenticate(%d)|err: already connected\n", clientfds[BIFD].fd);
+ ErrorMessage error_message = ERROR_INIT(ERROR_TYPE_ALREADYCONNECTED);
+ sendAnyMessage(clientfds[BIFD].fd, &header, &error_message);
+ return 0;
+ }
+ ErrorMessage error_message = ERROR_INIT(ERROR_TYPE_SUCCESS);
+ sendAnyMessage(clientfds[BIFD].fd, &header, &error_message);
+ } else {
+ loggingf("authenticate(%d)|notfound\n", clientfds[BIFD].fd);
+ header.type = HEADER_TYPE_ERROR;
+ ErrorMessage error_message = ERROR_INIT(ERROR_TYPE_NOTFOUND);
+ sendAnyMessage(clientfds[BIFD].fd, &header, &error_message);
+ return 0;
+ }
+ // Scenario 2: Create a new client
+ } else if (header.type == HEADER_TYPE_INTRODUCTION) {
+ IntroductionMessage message;
+ nrecv = recv(clientfds[BIFD].fd, &message, sizeof(message), 0);
+ if (nrecv != sizeof(message)) {
+ loggingf("authenticate(%d)|err: %d/%lu bytes\n", clientfds[BIFD].fd, nrecv, sizeof(message));
+ return 0;
+ }
+
+ // Copy metadata from IntroductionMessage
+ client = ArenaPush(clientsArena, sizeof(*clients));
+ memcpy(client->author, message.author, AUTHOR_LEN);
+ client->id = nclients;
+ nclients++;
+
+ // Save client
+#ifdef IMPORT_ID
+ write(clients_file, client, sizeof(*client));
+#endif
+ loggingf("authenticate(%d)|Added [%s](%lu)\n", clientfds[BIFD].fd, client->author, client->id);
+
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID);
+ IDMessage id_message = {.id = client->id};
+ sendAnyMessage(clientfds[BIFD].fd, &header, &id_message);
+ } else {
+ loggingf("authenticate(%d)|Wrong header expected %s or %s\n", clientfds[BIFD].fd, headerTypeString(HEADER_TYPE_INTRODUCTION), headerTypeString(HEADER_TYPE_ID));
+ return 0;
}
+ assert(client != 0);
+
+ client->pollunifd = clientfds;
+ client->pollbifd = clientfds + 1;
+
+ return client;
}
int
-main(void)
+main(int argc, char** argv)
{
- s32 err, serverfd, clientfd;
- u32 on = 1;
+ signal(SIGPIPE, SIG_IGN);
+
+ logfd = 2;
+ // optional logging
+ if (argc > 1) {
+ if (*argv[1] == '-')
+ if (argv[1][1] == 'l') {
+ logfd = open(LOGFILE, O_RDWR | O_CREAT | O_TRUNC, 0600);
+ assert(logfd != -1);
+ }
+ }
+ s32 serverfd;
// Start listening on the socket
{
- serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
+ s32 err;
+ u32 on = 1;
+ serverfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
assert(serverfd > 2);
err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8*)&on, sizeof(on));
- assert(err == 0);
+ assert(!err);
const struct sockaddr_in address = {
AF_INET,
htons(PORT),
{0},
+ {0},
};
err = bind(serverfd, (const struct sockaddr*)&address, sizeof(address));
- assert(err == 0);
+ assert(!err);
err = listen(serverfd, MAX_CONNECTIONS);
- assert(err == 0);
+ assert(!err);
+ loggingf("Listening on :%d\n", PORT);
}
- Arena* msgsArena = ArenaAlloc(Megabytes(128)); // storing received messages
- // NOTE: sent messages?
- s32 nrecv = 0; // number of bytes received
-
- Arena* clientsArena = ArenaAlloc(MAX_CONNECTIONS * sizeof(Client));
- Arena* fdsArena = ArenaAlloc(MAX_CONNECTIONS * sizeof(struct pollfd));
- struct pollfd* fds = fdsArena->addr;
- Client* clients = clientsArena->addr;
+ Arena clientsArena;
+ Arena fdsArena;
+ Arena msgsArena;
+ ArenaAlloc(&clientsArena, MAX_CONNECTIONS * sizeof(Client));
+ ArenaAlloc(&fdsArena, MAX_CONNECTIONS * 2 * sizeof(struct pollfd));
+ ArenaAlloc(&msgsArena, Megabytes(128)); // storing received messages
+ struct pollfd* fds = fdsArena.addr;
+ Client* clients = clientsArena.addr;
+ // Initializing fds
struct pollfd* fdsAddr;
- struct pollfd newpollfd = {-1, POLLIN, 0};
-
+ struct pollfd newpollfd = {-1, POLLIN, 0}; // for copying with events already set
// initialize fds structure
newpollfd.fd = 0;
- fdsAddr = ArenaPush(fdsArena, sizeof(*fds));
+ fdsAddr = ArenaPush(&fdsArena, sizeof(*fds));
memcpy(fdsAddr, &newpollfd, sizeof(*fds));
// add serverfd
newpollfd.fd = serverfd;
- fdsAddr = ArenaPush(fdsArena, sizeof(*fds));
+ fdsAddr = ArenaPush(&fdsArena, sizeof(*fds));
memcpy(fdsAddr, &newpollfd, sizeof(*fds));
newpollfd.fd = -1;
+#ifdef IMPORT_ID
+ s32 clients_file = open(CLIENTS_FILE, O_RDWR | O_CREAT | O_APPEND, 0600);
+ assert(clients_file != -1);
+ struct stat statbuf;
+ assert(fstat(clients_file, &statbuf) != -1);
+
+ read(clients_file, clients, statbuf.st_size);
+ if (statbuf.st_size > 0) {
+ ArenaPush(&clientsArena, statbuf.st_size);
+ loggingf("Imported %lu client(s)\n", statbuf.st_size / sizeof(*clients));
+ nclients += statbuf.st_size / sizeof(*clients);
+ }
+ for (u32 i = 0; i < nclients; i++)
+ loggingf("Imported: " CLIENT_FMT "\n", CLIENT_ARG(clients[i]));
+#endif
+
// Initialize the rest of the fds array
for (u32 i = FDS_CLIENTS; i < MAX_CONNECTIONS; i++)
fds[i] = newpollfd;
+ // Reset file descriptors on imported clients
+ for (u32 i = 0; i < CLIENTS_SIZE; i++) {
+ clients[i].pollunifd = 0;
+ clients[i].pollbifd = 0;
+ }
while (1) {
- err = poll(fds, FDS_SIZE, TIMEOUT);
+ s32 err = poll(fds, FDS_SIZE, TIMEOUT);
assert(err != -1);
if (fds[FDS_STDIN].revents & POLLIN) {
- // helps for testing and exiting gracefully
- break;
+ u8 c; // exit on ctrl-d
+ if (!read(fds[FDS_STDIN].fd, &c, 1))
+ break;
} else if (fds[FDS_SERVER].revents & POLLIN) {
- clientfd = accept(serverfd, NULL, NULL);
- assert(clientfd != -1);
- assert(clientfd > serverfd);
- fprintf(stdout, "New connection(%d).\n", clientfd);
-
- // If there is a slot in fds with fds[found].fd == -1 use it instead, otherwise allocate
- // some space on the arena.
- u8 found;
- for (found = FDS_CLIENTS; found < FDS_SIZE; found++)
- if (fds[found].fd == -1)
- break;
- if (found == MAX_CONNECTIONS) {
- // TODO: reject connection
- close(clientfd);
- fprintf(stdout, "Max clients reached.");
- } else if (found == FDS_SIZE) {
- // no more space, allocate
- struct pollfd* pollfd = ArenaPush(fdsArena, sizeof(*pollfd));
- pollfd->fd = clientfd;
- pollfd->events = POLLIN;
+ // TODO: what if we are not aligned by 2 anymore?
+ s32 unifd = accept(serverfd, 0, 0);
+ s32 bifd = accept(serverfd, 0, 0);
+
+ if (unifd == -1 || bifd == -1) {
+ loggingf("Error while accepting connection (%d,%d)\n", unifd, bifd);
+ if (unifd != -1) close(unifd);
+ if (bifd != -1) close(bifd);
+ continue;
+ } else
+ loggingf("New connection(%d,%d)\n", unifd, bifd);
+
+ // TODO: find empty space in arena
+ if (nclients + 1 == MAX_CONNECTIONS) {
+ local_persist HeaderMessage header = HEADER_INIT(HEADER_TYPE_ERROR);
+ local_persist ErrorMessage message = ERROR_INIT(ERROR_TYPE_TOOMANYCONNECTIONS);
+ sendAnyMessage(unifd, &header, &message);
+ if (unifd != -1)
+ close(unifd);
+ if (bifd != -1)
+ close(bifd);
+ loggingf("Max clients reached. Rejected connection\n");
} else {
- // hole found
- fds[found].fd = clientfd;
- fds[found].events = POLLIN;
- fprintf(stdout, "Added pollfd(%d).\n", clientfd);
+ // no more space, allocate
+ struct pollfd* clientfds = ArenaPush(&fdsArena, 2 * sizeof(*clientfds));
+ clientfds[UNIFD].fd = unifd;
+ clientfds[UNIFD].events = POLLIN;
+ clientfds[BIFD].fd = bifd;
+ clientfds[BIFD].events = POLLIN;
+ loggingf("Added pollfd(%d,%d)\n", unifd, bifd);
}
}
- // Check for messages from clients
- for (u32 i = FDS_CLIENTS; i < FDS_SIZE; i++) {
- if (!(fds[i].revents & POLLIN))
- continue;
- assert(fds[i].fd != -1);
- fprintf(stdout, "Message(%d).\n", fds[i].fd);
- Client* client = clients + i;
-
- // Initialize the client if this is the first time
- if (!client->initialized) {
- initClient(fdsArena, fds, fds[i].fd, client);
- client->initialized = True;
- fprintf(stdout, " Added to clients(%d): %s\n", fds[i].fd, client->author);
+ // Check for messages from clients in their unifd
+ for (u32 conn = FDS_CLIENTS; conn < FDS_SIZE; conn += 2) {
+ if (!(fds[conn].revents & POLLIN)) continue;
+ if (fds[conn].fd == -1) continue;
+ loggingf("Message unifd (%d)\n", fds[conn].fd);
+
+ // Get client associated with connection
+ Client* client = 0;
+ for (u32 j = 0; j < CLIENTS_SIZE; j++) {
+ if (!clients[j].pollunifd)
+ continue;
+ if (clients[j].pollunifd == fds + conn) {
+ client = clients + j;
+ break;
+ }
+ }
+ if (!client) {
+ loggingf("No client associated(%d)\n", fds[conn].fd);
+ close(fds[conn].fd);
continue;
}
+ loggingf("Found client(%lu) [%s] (%d)\n", client->id, client->author, fds[conn].fd);
// We received a message, try to parse the header
HeaderMessage header;
- nrecv = recv(fds[i].fd, &header, sizeof(header), 0);
- assert(nrecv != -1);
-
+ s32 nrecv = recv(fds[conn].fd, &header, sizeof(header), 0);
if (nrecv == 0) {
- disconnect(fdsArena, fds, i, (clients + i));
+ disconnectAndNotify(client, fds, FDS_SIZE, conn);
+ loggingf("Disconnected(%lu) [%s]\n", client->id, client->author);
+ continue;
+ } else if (nrecv != sizeof(header)) {
+ disconnectAndNotify(client, fds, FDS_SIZE, conn);
+ loggingf("error(%lu) [%s] %d/%lu bytes\n", client->id, client->author, nrecv, sizeof(header));
continue;
}
-
- assert(nrecv == sizeof(header));
- fprintf(stderr, " Received(%d): %d bytes -> " PH_FMT "\n", fds[i].fd, nrecv, PH_ARG(header));
+ loggingf("Received(%d) -> " HEADER_FMT "\n", fds[conn].fd, HEADER_ARG(header));
switch (header.type) {
- case HEADER_TYPE_TEXT:;
- TextMessage* message;
- nrecv = recvTextMessage(msgsArena, fds[i].fd, &message);
- fprintf(stderr, " Received(%d): %d bytes -> ", fds[i].fd, nrecv);
- printTextMessage(message, 0);
+ case HEADER_TYPE_TEXT: {
+ TextMessage* text_message = recvTextMessage(&msgsArena, fds[conn].fd);
+ loggingf("Received(%d)", fds[conn].fd);
+ printTextMessage(text_message, client, 0);
+
+ sendToOthers(fds, FDS_SIZE, fds[conn].fd, UNIFD, &header, text_message);
+ } break;
+ // handle request for information about client id
+ default:
+ loggingf("Unhandled '%s' from client(%d)\n", headerTypeString(header.type), fds[conn].fd);
+ disconnectAndNotify(client, fds, FDS_SIZE, conn);
+ continue;
+ }
+ }
- // Send message to all other clients
- sendToOthers(fdsArena, fds, i, &header, message);
+ // Check for messages from clients in their bifd
+ for (u32 conn = FDS_CLIENTS + BIFD; conn < FDS_SIZE; conn += 2) {
+ if (!(fds[conn].revents & POLLIN)) continue;
+ if (fds[conn].fd == -1) continue;
+ loggingf("Message bifd (%d)\n", fds[conn].fd);
+
+ // Get client associated with connection
+ Client* client = 0;
+ for (u32 j = 0; j < CLIENTS_SIZE; j++) {
+ if (!clients[j].pollbifd)
+ continue;
+ if (clients[j].pollbifd == fds + conn) {
+ client = clients + j;
+ break;
+ }
+ }
+ if (!client) {
+ loggingf("No client for connection(%d)\n", fds[conn].fd);
+#ifdef IMPORT_ID
+ client = authenticate(&clientsArena, clients_file, fds + conn - 1);
+#else
+ client = authenticate(&clientsArena, 0, fds + conn - 1);
+#endif
+ // If the client sent an IDMessage but no ID was found authenticate() could return null
+ if (!client) {
+ loggingf("Could not initialize client\n");
+ disconnect(fds + conn, 0);
+ } else { // client was added/connected
+ local_persist HeaderMessage header = HEADER_INIT(HEADER_TYPE_PRESENCE);
+ PresenceMessage message = {.id = client->id, .type = PRESENCE_TYPE_CONNECTED};
+ sendToOthers(fds, FDS_SIZE, fds[conn - BIFD].fd, UNIFD, &header, &message);
+ }
+ continue;
+ }
+ loggingf("Found client(%lu) [%s] (%d)\n", client->id, client->author, fds[conn].fd);
- break;
+ // We received a message, try to parse the header
+ HeaderMessage header;
+ s32 nrecv = recv(fds[conn].fd, &header, sizeof(header), 0);
+ if (nrecv == 0) {
+ disconnectAndNotify(client, fds, FDS_SIZE, conn);
+ loggingf("Disconnected(%lu) [%s]\n", client->id, client->author);
+ continue;
+ } else if (nrecv != sizeof(header)) {
+ disconnectAndNotify(client, fds, FDS_SIZE, conn);
+ loggingf("error(%lu) [%s] %d/%lu bytes\n", client->id, client->author, nrecv, sizeof(header));
+ continue;
+ }
+ loggingf("Received(%d) -> " HEADER_FMT "\n", fds[conn].fd, HEADER_ARG(header));
+
+ switch (header.type) {
+ case HEADER_TYPE_ID: {
+ // handle request for information about client id
+ IDMessage id_message;
+ nrecv = recv(fds[conn].fd, &id_message, sizeof(id_message), 0);
+
+ Client* client = getClientByID(&clientsArena, id_message.id);
+ if (!client) {
+ local_persist HeaderMessage header = HEADER_INIT(HEADER_TYPE_ERROR);
+ local_persist ErrorMessage error_message = ERROR_INIT(ERROR_TYPE_NOTFOUND);
+ sendAnyMessage(fds[conn].fd, &header, &error_message);
+ loggingf("Could not find %lu\n", id_message.id);
+ break;
+ }
+ HeaderMessage header = HEADER_INIT(HEADER_TYPE_INTRODUCTION);
+ IntroductionMessage introduction_message;
+ memcpy(introduction_message.author, client->author, AUTHOR_LEN);
+
+ sendAnyMessage(fds[conn].fd, &header, &introduction_message);
+ } break;
default:
- fprintf(stdout, " Got unhandled message type '%s' from client %d", headerTypeString(header.type), fds[i].fd);
+ loggingf("Unhandled '%s' from client(%d)\n", headerTypeString(header.type), fds[conn].fd);
+ disconnectAndNotify(client, fds, FDS_SIZE, conn);
continue;
}
}
}
- ArenaRelease(clientsArena);
- ArenaRelease(fdsArena);
- ArenaRelease(msgsArena);
+#ifdef IMPORT_ID
+ close(clients_file);
+#endif
return 0;
}