1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
#include "arena.h"
#include "common.h"
#include <assert.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdarg.h>
#include <sys/socket.h>
// timeout on polling
#define TIMEOUT 60 * 1000
// max pending connections
#define PENDING_MAX 16
// the size of pollfd element in the fdsArena
// note: clientsArena and pollfd_size must have been initialisezd
#define FDS_SIZE fdsArena->pos / pollfd_size
// enum for indexing the fds array
enum { FDS_STDIN = 0,
FDS_SERVER,
FDS_CLIENTS };
int main(void)
{
u32 err, serverfd, clientfd;
u16 nclient = 0;
u32 on = 1;
// Start listening on the socket
{
serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
assert(serverfd > 2);
err = setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, (u8 *)&on, sizeof(on));
assert(err == 0);
const struct sockaddr_in address = {
AF_INET,
htons(PORT),
{0},
};
err = bind(serverfd, (const struct sockaddr *)&address, sizeof(address));
assert(err == 0);
err = listen(serverfd, PENDING_MAX);
assert(err == 0);
}
Arena *msgTextArena = ArenaAlloc(); // allocating text in messages that have a dynamic sized
Message mrecv = {0}; // message used for receiving messages from clients
u32 nrecv = 0; // number of bytes received
u32 recv_len; // Number of bytes of the message received over stream
u32 nsend = 0; // number of bytes sent
Arena *bufArena = ArenaAlloc(); // data in buf
u8 *buf = ArenaPush(bufArena, STREAM_LIMIT); // temporary buffer for receiving and sending data
Message *mbuf = (Message *)buf; // pointer for indexing buf as a message
Arena *fdsArena = ArenaAlloc(); // arena for fds to accomodate multiple clients
struct pollfd *fds = fdsArena->memory; // helper for indexing memory
struct pollfd c = {0, POLLIN, 0}; // helper client structure fore reusing
struct pollfd *fdsAddr; // used for copying clients
const u64 pollfd_size = sizeof(struct pollfd);
// initialize fds structure
// add stdin (c.fd == 0)
fdsAddr = ArenaPush(fdsArena, pollfd_size);
memcpy(fdsAddr, &c, pollfd_size);
// add serverfd
c.fd = serverfd;
fdsAddr = ArenaPush(fdsArena, pollfd_size);
memcpy(fdsAddr, &c, pollfd_size);
while (1) {
err = poll(fds, FDS_SIZE, TIMEOUT);
assert(err != -1);
if (fds[FDS_STDIN].revents & POLLIN) {
// helps for testing and exiting gracefully
break;
} else if (fds[FDS_SERVER].revents & POLLIN) {
clientfd = accept(serverfd, NULL, NULL);
assert(clientfd != -1);
assert(clientfd > serverfd);
// fill up a hole
u8 found = 0;
for (u32 i = FDS_CLIENTS; i < FDS_SIZE; i++) {
if (fds[i].fd == -1) {
fds[i].fd = clientfd;
// note we do not have to reset .revents because poll will set it to 0 next time
found = 1;
break;
}
}
// allocate an extra client because there was no empty spot in the fds array
if (!found) {
// add client to arena
fdsAddr = ArenaPush(fdsArena, pollfd_size);
c.fd = clientfd;
memcpy(fdsAddr, &c, pollfd_size);
}
nclient++;
fprintf(stdout, "connected(%d).\n", clientfd - serverfd);
}
for (u32 i = FDS_CLIENTS; i < (FDS_SIZE); i++) {
if (!(fds[i].revents & POLLIN))
continue;
if (fds[i].fd == -1)
continue;
nrecv = recv(fds[i].fd, buf, bufArena->pos, 0);
assert(nrecv != -1);
if (nrecv == 0) {
fprintf(stdout, "disconnected(%d). \n", fds[i].fd - serverfd);
shutdown(fds[i].fd, SHUT_RDWR);
close(fds[i].fd); // send close to client
fds[i].fd = -1; // ignore in the future
continue;
}
recv_len = sizeof(*mbuf) - sizeof(mbuf->text) + mbuf->text_len * sizeof(*mbuf->text);
if (recv_len > nrecv) {
// allocate needed space for buf
if (recv_len > bufArena->pos)
ArenaPush(bufArena, recv_len - bufArena->pos);
// receive remaining bytes
u32 nr = recv(fds[i].fd, buf + nrecv, recv_len - nrecv, 0);
assert(nr != -1);
nrecv += nr;
assert(nrecv == recv_len);
}
// TODO: Do not print the message in the logs
fprintf(stdout, "message(%d): %d bytes.\n", fds[i].fd - serverfd, nrecv);
for (u32 j = FDS_CLIENTS; j < (FDS_SIZE); j++) {
if (j == i)
continue;
if (fds[j].fd == -1)
continue;
nsend = send(fds[j].fd, buf, nrecv, 0);
assert(nsend != 1);
assert(nsend == nrecv);
fprintf(stdout, "retransmitted(%d->%d).\n", fds[i].fd - serverfd, fds[j].fd - serverfd);
}
ArenaPop(msgTextArena, mrecv.text_len);
}
}
ArenaRelease(fdsArena);
ArenaRelease(msgTextArena);
return 0;
}
|