aboutsummaryrefslogtreecommitdiff
path: root/v2/server.c
blob: c86964e931911cdc4f93eabccaa10c6f460e2926 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#include "arena.h"
#include "common.h"

#include <assert.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdarg.h>
#include <sys/socket.h>

// timeout on polling
#define TIMEOUT 60 * 1000
// max pending connections
#define PENDING_MAX 16

// the size of pollfd element in the fdsArena
// note: clientsArena and pollfd_size must have been initialisezd
#define FDS_SIZE fdsArena->pos / pollfd_size

// enum for indexing the fds array
enum { FDS_STDIN = 0,
       FDS_SERVER,
       FDS_CLIENTS };

int main(void)
{
    u32 err, serverfd, clientfd;
    u16 nclient = 0;
    u32 on = 1;

    // Start listening on the socket
    {
        serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
        assert(serverfd > 2);

        err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8 *)&on, sizeof(on));
        assert(err == 0);

        const struct sockaddr_in address = {
            AF_INET,
            htons(PORT),
            {0},
        };

        err = bind(serverfd, (const struct sockaddr *)&address, sizeof(address));
        assert(err == 0);

        err = listen(serverfd, PENDING_MAX);
        assert(err == 0);
    }

    Arena *msgTextArena = ArenaAlloc(); // allocating text in messages that have a dynamic sized
    Message mrecv = {0};                // message used for receiving messages from clients
    u32 nrecv = 0;                      // number of bytes received
    u32 nsend = 0;                      // number of bytes sent
    u8 buf[STREAM_BUF] = {0};           // temporary buffer for received data, NOTE: this buffer
                                        // is also use for retransmitting received messages to other
                                        // clients.

    Arena *fdsArena = ArenaAlloc();
    struct pollfd *fds = fdsArena->memory; // helper for indexing memory
    struct pollfd c = {0, POLLIN, 0};      // helper client structure fore reusing
    struct pollfd *fdsAddr;                // used for copying clients
    const u64 pollfd_size = sizeof(struct pollfd);

    // initialize fds structure
    // add stdin (c.fd == 0)
    fdsAddr = ArenaPush(fdsArena, pollfd_size);
    memcpy(fdsAddr, &c, pollfd_size);
    // add serverfd
    c.fd = serverfd;
    fdsAddr = ArenaPush(fdsArena, pollfd_size);
    memcpy(fdsAddr, &c, pollfd_size);

    while (1) {
        err = poll(fds, FDS_SIZE, TIMEOUT);
        assert(err != -1);

        if (fds[FDS_STDIN].revents & POLLIN) {
            // helps for testing and exiting gracefully
            break;
        } else if (fds[FDS_SERVER].revents & POLLIN) {
            clientfd = accept(serverfd, NULL, NULL);
            assert(clientfd != -1);
            assert(clientfd > serverfd);

            // fill up a hole
            u8 found = 0;
            for (u32 i = FDS_CLIENTS; i < FDS_SIZE; i++) {
                if (fds[i].fd == -1) {
                    fds[i].fd = clientfd;
                    // note we do not have to reset .revents because poll will set it to 0 next time
                    found = 1;
                    break;
                }
            }

            // allocate an extra client because there was no empty spot in the fds array
            if (!found) {
                // add client to arena
                fdsAddr = ArenaPush(fdsArena, pollfd_size);
                c.fd = clientfd;
                memcpy(fdsAddr, &c, pollfd_size);
            }

            nclient++;
            fprintf(stdout, "connected(%d).\n", clientfd - serverfd);
        }

        for (u32 i = FDS_CLIENTS; i < (FDS_SIZE); i++) {
            if (!(fds[i].revents & POLLIN))
                continue;
            if (fds[i].fd == -1)
                continue;

            nrecv = recv(fds[i].fd, buf, STREAM_LIMIT, 0);
            assert(nrecv >= 0);

            if (nrecv == 0) {
                fprintf(stdout, "disconnected(%d). \n", fds[i].fd - serverfd);
                shutdown(fds[i].fd, SHUT_RDWR);
                close(fds[i].fd); // send close to client
                fds[i].fd = -1;   // ignore in the future
                continue;
            }

            // TODO: Do not print the message in the logs
            fprintf(stdout, "message(%d): %d bytes.\n", fds[i].fd - serverfd, nrecv); 

            for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) {
                if (j == i)
                    continue;
                if (fds[j].fd == -1)
                    continue;

                nsend = send(fds[j].fd, buf, nrecv, 0);
                assert(nsend != 1);
                assert(nsend == nrecv);
                fprintf(stdout, "retransmitted(%d->%d).\n", fds[i].fd - serverfd, fds[j].fd - serverfd);
            }

            ArenaPop(msgTextArena, mrecv.text_len);
        }
    }

    ArenaRelease(fdsArena);
    ArenaRelease(msgTextArena);

    return 0;
}