From b9aeccef208d6d5b7d40b71886981723f1e14b95 Mon Sep 17 00:00:00 2001 From: Raymaekers Luca Date: Thu, 31 Oct 2024 00:32:07 +0100 Subject: Added ID system with 1 and 2-way communication Each client now has an ID that is permanently stored to ID_FILE location. To implement this each client now uses two connections to the server, one for bidirectional communication and one for unidirectional communication. This makes it easier to not receive unexpected message. Also each client and server now has a Client struct that represents a client and a clientsArena associated with it. Minor changes: - Added logging to LOGFILE, that can be turned with LOGGING macro. - Added more error types - Added error handling on server - Added error messages - Added convenience functions - Added disconnectAndNotify() function for convenience - Use recvTextMessageResult as multiple-value-return-type instead of ** - Separated protocol stuff into protocol.h - Added Result types when wanting to return multiple values - Do not allocate arena's with malloc - Added recvAnyMessageType for receiving messages that do not need to be stored - Add UNIFD and BIFD file descriptors for separating requests chatty.c: - Convert ID to string in screen_home() - Removed the fds global variable - Pass fds to threadReconnect - Implement faster sleep with nanosleep(2) - Close file descriptors when failed so we do not have too many file descriptors open server.c: - Send presence messages on disconnect & connect - renamed i to conn --- chatty.c | 393 ++++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 278 insertions(+), 115 deletions(-) (limited to 'chatty.c') diff --git a/chatty.c b/chatty.c index 3b59001..4746aa7 100644 --- a/chatty.c +++ b/chatty.c @@ -2,6 +2,7 @@ #include "termbox2.h" #include "chatty.h" +#include "protocol.h" #include #include @@ -14,19 +15,39 @@ // time to reconnect in seconds #define TIMEOUT_RECONNECT 1 #define INPUT_LIMIT 512 - -// must be of AUTHOR_LEN -1 -static u8 username[AUTHOR_LEN] = "(null)"; -// file descriptros for polling -static struct pollfd* fds = NULL; -// mutex for locking fds when in thread_reconnect() -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - -enum { FDS_SERVER = 0, +// Filepath where user ID is stored +#define ID_FILE "_id" +// Import id from ID_FILE +#define IMPORT_ID +// Filepath where logged +#define LOGFILE "chatty.log" +// enable logging +#define LOGGING + +enum { FDS_UNI = 0, // for one-way communication with the server (eg. TextMessage) + FDS_BI, // For two-way communication with the server (eg. IDMessage) FDS_TTY, FDS_RESIZE, FDS_MAX }; +typedef struct { + u8 author[AUTHOR_LEN]; + ID id; +} Client; +#define CLIENT_FMT "[%s](%lu)" +#define CLIENT_ARG(client) client.author, client.id + +typedef struct { + s32 err; // Error while connecting + s32 unifd; + s32 bifd; +} ConnectionResult; + +// Client used by chatty +global_variable Client user = {0}; +// Address of chatty server +global_variable struct sockaddr_in address; + // fill str array with char void fillstr(u32* str, u32 ch, u32 len) @@ -44,39 +65,118 @@ popup(u32 fg, u32 bg, char* text) tb_print(global.width / 2 - len / 2, global.height / 2, fg, bg, text); } +// Returns client in clientsArena matching id +// Returns user if the id was the user's ID +// Returns 0 if nothing was found +Client* +getClientById(Arena* clientsArena, ID id) +{ + if (id == user.id) return &user; + + Client* clients = clientsArena->addr; + for (u64 i = 0; i < (clientsArena->pos / sizeof(*clients)); i++) { + if (clients[i].id == id) + return clients + i; + } + return 0; +} + +// Request information of client from fd byd id and add it to clientsArena +// Returns pointer to added client +Client* +addClientInfo(Arena* clientsArena, s32 fd, u64 id) +{ + // Request information about ID + HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID); + IDMessage id_message = {.id = id}; + sendAnyMessage(fd, &header, &id_message); + + Client* client = ArenaPush(clientsArena, sizeof(*client)); + + // Wait for response + IntroductionMessage introduction_message; + recvAnyMessageType(fd, &header, &introduction_message, HEADER_TYPE_INTRODUCTION); + + // Add the information + memcpy(client->author, introduction_message.author, AUTHOR_LEN); + client->id = id; + + loggingf("Got " CLIENT_FMT "\n", CLIENT_ARG((*client))); + return client; +} + +// Tries to connect to address and populates resulting file descriptors in ConnectionResult. +ConnectionResult +getConnection(struct sockaddr_in* address) +{ + ConnectionResult result; + result.unifd = socket(AF_INET, SOCK_STREAM, 0); + result.bifd = socket(AF_INET, SOCK_STREAM, 0); + result.err = connect(result.unifd, (struct sockaddr*)address, sizeof(*address)); + if (result.err) return result; // We do not overwrite the error and return early so we can be + // certain of what error errno belongs to. + result.err = connect(result.bifd, (struct sockaddr*)address, sizeof(*address)); + return result; +} + // Connect to *address_ptr of type `struct sockaddr_in*`. If it failed wait for TIMEOUT_RECONNECT // seconds. // This function is meant to be run by a thread. // An offline server means fds[FDS_SERVER] is set to -1. When online // it is set to with the appropriate file descriptor. -// Returns NULL. +// Returns 0. +#define Miliseconds(s) (s*1000*1000) void* -thread_reconnect(void* address_ptr) +threadReconnect(void* fds_ptr) { - u32 serverfd, err; - struct sockaddr_in* address = address_ptr; - + struct pollfd* fds = fds_ptr; + ConnectionResult result; + struct timespec t = { 0, Miliseconds(300) }; // 300 miliseconds + loggingf("Trying to reconnect\n"); while (1) { - serverfd = socket(AF_INET, SOCK_STREAM, 0); - assert(serverfd > 2); // greater than STDERR - err = connect(serverfd, (struct sockaddr*)address, sizeof(*address)); - if (err == 0) - break; - assert(errno == ECONNREFUSED); - // TODO: faster reconnection? (too many open files) - sleep(TIMEOUT_RECONNECT); + nanosleep(&t, &t); + result = getConnection(&address); + if (result.err) { + // loggingf("err: %d\n", result.err); + loggingf("errno: %d\n", errno); + } else if (result.unifd != -1 && result.bifd != -1) { + loggingf("Reconnect succeeded (%d, %d), authenticating\n", result.unifd, result.bifd); + // We assume that we already have an ID + // TODO: could there be a problem if a message is received at the same time? + // - not on server restart, but what if we lost connection? + HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID); + IDMessage id_message = {.id = user.id}; + sendAnyMessage(result.bifd, &header, &id_message); + + ErrorMessage error_message; + s32 nrecv = recvAnyMessageType(result.bifd, &header, &error_message, HEADER_TYPE_ERROR); + if (nrecv == -1 || nrecv == 0) { + loggingf("Error on receive, retrying...\n"); + continue; + } + + assert(header.type == HEADER_TYPE_ERROR); + if (error_message.type == ERROR_TYPE_SUCCESS) { + loggingf("Reconnected\n"); + break; + } else { + loggingf("err: %s\n", errorTypeString(error_message.type)); + } + } + if (result.unifd != -1) + close(result.unifd); + if (result.bifd != -1) + close(result.bifd); + loggingf("Failed, retrying..\n"); } - // if the server would send a disconnect again and the polling catches up there could be two - // threads accessing fds. - pthread_mutex_lock(&mutex); - fds[FDS_SERVER].fd = serverfd; - pthread_mutex_unlock(&mutex); + fds[FDS_BI].fd = result.bifd; + fds[FDS_UNI].fd = result.unifd; - // ask to redraw screen + // Redraw screen raise(SIGWINCH); - return NULL; + return 0; } // Print `text` wrapped to limit_x. It will print no more than limit_y lines. x, y, fg and @@ -85,9 +185,8 @@ thread_reconnect(void* address_ptr) // this is useful when for example: printing messages and wanting to have consistent // timestamp+author name. // Returns the number of lines printed. -// TODO: add y limit -// TODO:(bug) text after pfx is wrapped one too soon -// TODO: text == NULL to know how many lines *would* be printed +// TODO: (bug) text after pfx is wrapped one too soon +// TODO: text == 0 to know how many lines *would* be printed // - no this should be a separate function // TODO: check if text[i] goes out of bounds u32 @@ -107,7 +206,7 @@ tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, u32* text, s32 text_len, u32 fg_pfx u32 failed = 0; // NOTE: We can assume that we need to wrap, therefore print a newline after the prefix string - if (pfx != NULL) { + if (pfx != 0) { tb_printf(x, ly, fg_pfx, bg_pfx, "%s", pfx); // If the text fits on one line print the text and return @@ -174,7 +273,7 @@ tb_printf_wrap(u32 x, u32 y, u32 fg, u32 bg, u32* text, s32 text_len, u32 fg_pfx // it displays a prompt with the user input of input_len wide characters // and the received messages from msgsArena void -screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len) +screen_home(Arena* msgsArena, u32 nmessages, Arena* clientsArena, struct pollfd* fds, u32 input[], u32 input_len) { // config options const s32 box_max_len = 80; @@ -205,7 +304,7 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len) goto draw_prompt; u8* addr = msgsArena->addr; - assert(addr != NULL); + assert(addr != 0); // on what line to print the current message, used for scrolling u32 msg_y = 0; @@ -238,46 +337,62 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len) HeaderMessage* header = (HeaderMessage*)addr; addr += sizeof(*header); + // Get Client for message + ID* id; + Client* client; + switch (header->type) { + case HEADER_TYPE_TEXT: + id = &((TextMessage*)addr)->id; + case HEADER_TYPE_PRESENCE: + id = &((PresenceMessage*)addr)->id; + client = getClientById(clientsArena, *id); + if (!client) { + loggingf("Client not known, requesting from server\n"); + client = addClientInfo(clientsArena, fds[FDS_BI].fd, *id); + } + assert(client); + break; + } + switch (header->type) { case HEADER_TYPE_TEXT: { TextMessage* message = (TextMessage*)addr; + // Color own messages u32 fg = 0; - if (strncmp((char*)username, (char*)message->author, AUTHOR_LEN) == 0) { + if (user.id == message->id) { fg = TB_CYAN; } else { fg = TB_MAGENTA; } - // prefix is of format "HH:MM:SS [] ", so + + // prefix is of format "HH:MM:SS [] ", create it u8 pfx[AUTHOR_LEN - 1 + TIMESTAMP_LEN - 1 + 4 + 1] = {0}; u8 timestamp[TIMESTAMP_LEN]; formatTimestamp(timestamp, message->timestamp); - sprintf((char*)pfx, "%s [%s] ", timestamp, message->author); - // TODO: y_limit + sprintf((char*)pfx, "%s [%s] ", timestamp, client->author); + msg_y += tb_printf_wrap(0, msg_y, TB_WHITE, 0, (u32*)&message->text, message->len, fg, 0, pfx, global.width, free_y - msg_y); u32 message_size = TEXTMESSAGE_SIZE + message->len * sizeof(*message->text); addr += message_size; - break; - } + } break; case HEADER_TYPE_PRESENCE: { PresenceMessage* message = (PresenceMessage*)addr; - tb_printf(0, msg_y, 0, 0, " [%s] *%s*", message->author, presenceTypeString(message->type)); + tb_printf(0, msg_y, 0, 0, " [%s] *%s*", client->author, presenceTypeString(message->type)); msg_y++; addr += sizeof(*message); - break; - } + } break; case HEADER_TYPE_HISTORY: { HistoryMessage* message = (HistoryMessage*)addr; addr += sizeof(*message); // TODO: implement - } - default: { + } break; + default: tb_printf(0, msg_y, 0, 0, "%s", headerTypeString(header->type)); msg_y++; break; } - } } draw_prompt: @@ -340,7 +455,7 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len) } } - if (fds[FDS_SERVER].fd == -1) { + if (fds[FDS_UNI].fd == -1 || fds[FDS_BI].fd == -1) { // show error popup popup(TB_RED, TB_BLACK, "Server disconnected."); } @@ -350,77 +465,128 @@ screen_home(Arena* msgsArena, u32 nmessages, u32 input[], u32 input_len) int main(int argc, char** argv) { - // Use first argument as username - if (argc > 1) { - u32 arg_len = strlen(argv[1]); - assert(arg_len <= AUTHOR_LEN - 1); - memcpy(username, argv[1], arg_len); - username[arg_len] = '\0'; + if (argc < 2) { + fprintf(stderr, "usage: chatty \n"); + return 1; } + u32 arg_len = strlen(argv[1]); + assert(arg_len <= AUTHOR_LEN - 1); + memcpy(user.author, argv[1], arg_len); + user.author[arg_len] = '\0'; + s32 err = 0; // error code for functions - Arena* msgsArena = ArenaAlloc(Megabytes(64)); // Messages received & sent - u32 nmessages = 0; // Number of messages in msgsArena - s32 nrecv = 0; // number of bytes received - s32 nsend = 0; // number of bytes sent + u32 nmessages = 0; // Number of messages in msgsArena + s32 nrecv = 0; // number of bytes received u32 input[INPUT_LIMIT] = {0}; // input buffer u32 ninput = 0; // number of characters in input + Arena msgsArena; + Arena clientsArena; + ArenaAlloc(&msgsArena, Megabytes(64)); // Messages received & sent + ArenaAlloc(&clientsArena, Megabytes(1)); // Arena for storing clients + struct tb_event ev; // event fork keypress & resize u8 quit = 0; // boolean to indicate if we want to quit the main loop - u8* quitmsg = NULL; // this string will be printed before returning from main + u8* quitmsg = 0; // this string will be printed before returning from main pthread_t thr_rec; // thread for reconnecting to server when disconnected +#ifdef LOGGING + logfd = open(LOGFILE, O_RDWR | O_CREAT | O_TRUNC, 0600); + assert(logfd != -1); +#else + logfd = 2; // stderr +#endif + // poopoo C cannot infer type - fds = (struct pollfd[FDS_MAX]){ - {-1, POLLIN, 0}, // FDS_SERVER + struct pollfd fds[FDS_MAX] = { + {-1, POLLIN, 0}, // FDS_UNI + {-1, POLLIN, 0}, // FDS_BI {-1, POLLIN, 0}, // FDS_TTY {-1, POLLIN, 0}, // FDS_RESIZE }; - const struct sockaddr_in address = { + address = (struct sockaddr_in){ AF_INET, htons(PORT), {0}, {0}, }; - // Connecting to server - { - s32 serverfd; - serverfd = socket(AF_INET, SOCK_STREAM, 0); - assert(serverfd > 2); // greater than STDERR - - err = connect(serverfd, (struct sockaddr*)&address, sizeof(address)); - if (err != 0) { - perror("Server"); + ConnectionResult result = getConnection(&address); + if (result.err) { + perror("Server"); + return 1; + } + assert(result.unifd != -1); + assert(result.bifd != -1); + assert(!result.err); + fds[FDS_BI].fd = result.bifd; + fds[FDS_UNI].fd = result.unifd; + +#ifdef IMPORT_ID + // File for storing the user's ID. + u32 idfile = open(ID_FILE, O_RDWR | O_CREAT, 0600); + s32 nread = read(idfile, &user.id, sizeof(user.id)); + assert(nread != -1); + // see "Authentication" in chatty.h + if (nread == sizeof(user.id)) { + // Scenario 1: We know our id + + // Send IDMessage and check if it is correct + HeaderMessage header = HEADER_INIT(HEADER_TYPE_ID); + IDMessage message = {.id = user.id}; + sendAnyMessage(fds[FDS_BI].fd, &header, &message); + + ErrorMessage error_message = {0}; + recvAnyMessageType(fds[FDS_BI].fd, &header, &error_message, HEADER_TYPE_ERROR); + + switch (error_message.type) { + case ERROR_TYPE_SUCCESS: break; + case ERROR_TYPE_NOTFOUND: + printf("Server does not know our ID. Consider removing '" ID_FILE "'\n"); + return 1; + default: + printf("Server: %s\n", errorTypeString(error_message.type)); return 1; } - fds[FDS_SERVER].fd = serverfd; - - // Introduce ourselves - HeaderMessage header = HEADER_PRESENCEMESSAGE; - PresenceMessage message = {.type = PRESENCE_TYPE_CONNECTED}; - memcpy(message.author, username, AUTHOR_LEN); - nsend = send(serverfd, &header, sizeof(header), 0); - assert(nsend != -1); - assert(nsend == sizeof(header)); - nsend = send(serverfd, &message, sizeof(message), 0); - assert(nsend != -1); - assert(nsend == sizeof(message)); + } else { +#else + if (1) { +#endif + // Scenario 2: We do not have an ID + HeaderMessage header = HEADER_INIT(HEADER_TYPE_INTRODUCTION); + IntroductionMessage message = {0}; + // copy user data into message + memcpy(message.author, user.author, AUTHOR_LEN); + + // Send the introduction message + sendAnyMessage(fds[FDS_BI].fd, &header, &message); + + IDMessage id_message = {0}; + // Receive the response IDMessage + recvAnyMessageType(fds[FDS_BI].fd, &header, &id_message, HEADER_TYPE_ID); + assert(header.type == HEADER_TYPE_ID); + user.id = id_message.id; +#ifdef IMPORT_ID + // Save permanently + write(idfile, &user.id, sizeof(user.id)); + close(idfile); +#endif } + loggingf("Got ID: %lu\n", user.id); // for wide character printing - assert(setlocale(LC_ALL, "") != NULL); + assert(setlocale(LC_ALL, "") != 0); // init tb_init(); tb_get_fds(&fds[FDS_TTY].fd, &fds[FDS_RESIZE].fd); - screen_home(msgsArena, nmessages, input, ninput); + screen_home(&msgsArena, nmessages, &clientsArena, fds, input, ninput); tb_present(); // main loop @@ -431,43 +597,45 @@ main(int argc, char** argv) tb_clear(); - if (fds[FDS_SERVER].revents & POLLIN) { + if (fds[FDS_UNI].revents & POLLIN) { // got data from server HeaderMessage header; - nrecv = recv(fds[FDS_SERVER].fd, &header, sizeof(header), 0); + nrecv = recv(fds[FDS_UNI].fd, &header, sizeof(header), 0); assert(nrecv != -1); // Server disconnects if (nrecv == 0) { // close diconnected server's socket - err = close(fds[FDS_SERVER].fd); + err = close(fds[FDS_UNI].fd); assert(err == 0); - fds[FDS_SERVER].fd = -1; // ignore + fds[FDS_UNI].fd = -1; // ignore // start trying to reconnect in a thread - err = pthread_create(&thr_rec, NULL, &thread_reconnect, (void*)&address); + err = pthread_create(&thr_rec, 0, &threadReconnect, (void*)fds); assert(err == 0); } else { - // TODO: validate version - // if (header.version == PROTOCOL_VERSION) - // continue; + if (header.version != PROTOCOL_VERSION) { + loggingf("Header received does not match version\n"); + continue; + } - void* addr = ArenaPush(msgsArena, sizeof(header)); + void* addr = ArenaPush(&msgsArena, sizeof(header)); memcpy(addr, &header, sizeof(header)); + // Messages handled from server switch (header.type) { case HEADER_TYPE_TEXT: - recvTextMessage(msgsArena, fds[FDS_SERVER].fd, NULL); + recvTextMessage(&msgsArena, fds[FDS_UNI].fd); nmessages++; break; case HEADER_TYPE_PRESENCE:; - PresenceMessage* message = ArenaPush(msgsArena, sizeof(*message)); - nrecv = recv(fds[FDS_SERVER].fd, message, sizeof(*message), 0); + PresenceMessage* message = ArenaPush(&msgsArena, sizeof(*message)); + nrecv = recv(fds[FDS_UNI].fd, message, sizeof(*message), 0); assert(nrecv != -1); assert(nrecv == sizeof(*message)); nmessages++; break; default: - // TODO: log + loggingf("Got unhandled message: %s\n", headerTypeString(header.type)); break; } } @@ -512,7 +680,7 @@ main(int argc, char** argv) if (ninput == 0) // do not send empty message break; - if (fds[FDS_SERVER].fd == -1) + if (fds[FDS_UNI].fd == -1) // do not send message to disconnected server break; @@ -521,25 +689,21 @@ main(int argc, char** argv) ninput++; // Save header - HeaderMessage header = HEADER_TEXTMESSAGE; - void* addr = ArenaPush(msgsArena, sizeof(header)); + HeaderMessage header = HEADER_INIT(HEADER_TYPE_TEXT); + void* addr = ArenaPush(&msgsArena, sizeof(header)); memcpy(addr, &header, sizeof(header)); // Save message - TextMessage* sendmsg = ArenaPush(msgsArena, TEXTMESSAGE_SIZE); - memcpy(sendmsg->author, username, AUTHOR_LEN); - sendmsg->timestamp = time(NULL); + TextMessage* sendmsg = ArenaPush(&msgsArena, TEXTMESSAGE_SIZE); + sendmsg->id = user.id; + sendmsg->timestamp = time(0); sendmsg->len = ninput; u32 text_size = ninput * sizeof(*input); - ArenaPush(msgsArena, text_size); + ArenaPush(&msgsArena, text_size); memcpy(&sendmsg->text, input, text_size); - // Send message - nsend = send(fds[FDS_SERVER].fd, &header, sizeof(header), 0); - assert(nsend != -1); - nsend = send(fds[FDS_SERVER].fd, sendmsg, TEXTMESSAGE_SIZE + TEXTMESSAGE_TEXT_SIZE((*sendmsg)), 0); - assert(nsend != -1); + sendAnyMessage(fds[FDS_UNI].fd, &header, sendmsg); nmessages++; // also clear input @@ -550,7 +714,8 @@ main(int argc, char** argv) default: if (ev.ch == 0) break; - // TODO: logging + + // TODO: show error if (ninput == INPUT_LIMIT - 1) // last byte reserved for \0 break; @@ -568,17 +733,15 @@ main(int argc, char** argv) tb_poll_event(&ev); } - screen_home(msgsArena, nmessages, input, ninput); + screen_home(&msgsArena, nmessages, &clientsArena, fds, input, ninput); tb_present(); } tb_shutdown(); - if (quitmsg != NULL) + if (quitmsg != 0) printf("%s\n", quitmsg); - ArenaRelease(msgsArena); - return 0; } -- cgit v1.2.3