summaryrefslogtreecommitdiffstats
path: root/docs/developer/design/multicast.rst
blob: 89422fe60311686252b09011efef7f370dabd128 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
Detailed Design
===============

Protocol Design
---------------

1. All Protocol headers are 1 byte long or align to 4 bytes.
2. Packet size should not exceed above 1500(MTU) bytes including UDP/IP header and should
be align to 4 bytes. In future, MTU can be modified larger than 1500(Jumbo Frame) through
cmd line option to enlarge the data throughput.

/* Packet header definition (align to 4 bytes) */
struct packet_ctl {
    uint32_t seq; // packet seq number start from 0, unique in server life cycle.
    uint32_t crc; // checksum
    uint32_t data_size; // payload length
    uint8_t data[0];
};

/* Buffer info definition (align to 4 bytes) */
struct buffer_ctl {
    uint32_t buffer_id; // buffer seq number start from 0, unique in server life cycle.
    uint32_t buffer_size; // payload total length of a buffer
    uint32_t packet_id_base; // seq number of the first packet in this buffer.
    uint32_t pkt_count; // number of packet in this buffer, 0 means EOF.
};


3. 1-byte-long header definition

Signals such as the four below are 1 byte long, to simplify the receive process(since it
cannot be spitted ).

#define CLIENT_READY 0x1
#define CLIENT_REQ 0x2
#define CLIENT_DONE 0x4
#define SERVER_SENT 0x8

Note: Please see the collaboration diagram for their meanings.

4. Retransmission Request Header

/* Retransmition Request Header (align to 4 bytes) */
struct request_ctl {
    uint32_t req_count; // How many seqs below.
    uint32_t seqs[0]; // packet seqs.
};

5. Buffer operations

void buffer_init(); // Init the buffer_ctl structure and all(say 1024) packet_ctl
structures. Allocate buffer memory.
long buffer_fill(int fd); // fill a buffer from fd, such as stdin
long buffer_flush(int fd); // flush a buffer to fd, say stdout
struct packet_ctl *packet_put(struct packet_ctl *new_pkt);// put a packet to a buffer
and return a free memory slot for the next packet.
struct packet_ctl *packet_get(uint32_t seq);// get a packet data in buffer by
indicating the packet seq.


How to sync between server threads
----------------------------------

If children's aaa() operation need to wait the parents's init() to be done, then do it
literally like this:

   UDP Server
   TCP Server1 = spawn( )----> TCP Server1
    init()
                               TCP Server2 = spawn( )-----> TCP Server2
    V(sem)----------------------> P(sem)                // No child any more
                                  V(sem)---------------------> P(sem)
                                  aaa()           // No need to V(sem), for no child
                                                               aaa()

If parent's send() operation need to wait the children's ready() done, then do it
literally too, but is a reverse way:

   UDP Server                  TCP Server1                  TCP Server2
                                                       // No child any more
                                 ready()                       ready()
                                 P(sem) <--------------------- V(sem)
    P(sem) <------------------   V(sem)
    send()

Note that the aaa() and ready() operations above run in parallel. If this is not the
case due to race condition, the sequence above can be modified into this below:

   UDP Server                  TCP Server1                  TCP Server2
                                                       // No child any more
                                                               ready()
                                 P(sem) <--------------------- V(sem)
                                 ready()
    P(sem) <-------------------  V(sem)
    send()


In order to implement such chained/zipper sync pattern, a pair of semaphores is
needed between the parent and the child. One is used by child to wait parent , the
other is used by parent to wait child. semaphore pair can be allocated by parent
and pass the pointer to the child over spawn() operation such as pthread_create().

/* semaphore pair definition */
struct semaphores {
    sem_t wait_parent;
    sem_t wait_child;
};

Then the semaphore pair can be recorded by threads by using the semlink struct below:
struct semlink {
    struct semaphores *this; /* used by parent to point to the struct semaphores
                                which it created during spawn child. */
    struct semaphores *parent; /* used by child to point to the struct
                                  semaphores which it created by parent */
};

chained/zipper sync API:

void sl_wait_child(struct semlink *sl);
void sl_release_child(struct semlink *sl);
void sl_wait_parent(struct semlink *sl);
void sl_release_parent(struct semlink *sl);

API usage is like this.

Thread1(root parent)          Thread2(child)               Thread3(grandchild)
sl_wait_parent(noop op)
sl_release_child
                +---------->sl_wait_parent
                            sl_release_child
                                           +-----------> sl_wait_parent
                                                         sl_release_child(noop op)
                                                         ...
                                                         sl_wait_child(noop op)
                                                       + sl_release_parent
                            sl_wait_child <-------------
                          + sl_release_parent
sl_wait_child <------------
sl_release_parent(noop op)

API implementation:

void sl_wait_child(struct semlink *sl)
{
    if (sl->this) {
        P(sl->this->wait_child);
    }
}

void sl_release_child(struct semlink *sl)
{
    if (sl->this) {
        V(sl->this->wait_parent);
    }
}

void sl_wait_parent(struct semlink *sl)
{
    if (sl->parent) {
        P(sl->parent->wait_parent);
    }
}

void sl_release_parent(struct semlink *sl)
{
    if (sl->parent) {
        V(sl->parent->wait_child);
    }
}

Client flow chart
-----------------
See Collaboration Diagram

UDP thread flow chart
---------------------
See Collaboration Diagram

TCP thread flow chart
---------------------


S_INIT --- (UDP initialized) --->  S_ACCEPT --- (accept clients) --+
                                                                   |
  /----------------------------------------------------------------/
  V
S_PREP --- (UDP prepared abuffer)
  ^               |
  |               \--> S_SYNC --- (clients ClIENT_READY)
  |                                        |
  |                                        \--> S_SEND --- (clients CLIENT_DONE)
  |                                                                |
  |                                                                V
  \---------------(bufferctl.pkt_count != 0)-----------------------+
                                                                   |
                                                                   V
                                             exit() <--- (bufferctl.pkt_count == 0)


TCP using poll and message queue
--------------------------------

TCP uses poll() to sync with client's events as well as output event from itself, so
that we can use non-block socket operations to reduce the latency. POLLIN means there
are message from client and POLLOUT means we are ready to send message/retransmission
packets to client.

poll main loop pseudo code:
void check_clients(struct server_status_data *sdata)
{
    poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout);

    /* check all connected clients */
    for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) {
        ds = &(sdata->ds[sdata->cindex]);
        if (!ds->revents) {
            continue;
        }

        if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) {
            handle_error_event(sdata);
        } else if (ds->revents & (POLLIN|POLLPRI)) {
            handle_pullin_event(sdata);  // may set POLLOUT into ds->events
                                         // to trigger handle_pullout_event().
        } else if (ds->revents & POLLOUT) {
            handle_pullout_event(sdata);
        }
    }
}

For TCP, since the message from client may not complete and send data may be also
interrupted due to non-block fashion, there should be one send message queue and a
receive message queue on the server side for each client (client do not use non-block
operations).

TCP message queue definition:

struct tcpq {
    struct qmsg *head, *tail;
    long count; /* message count in a queue */
    long size; /* Total data size of a queue */
};

TCP message queue item definition:

struct qmsg {
    struct qmsg *next;
    void *data;
    long size;
};

TCP message queue API:

// Allocate and init a queue.
struct tcpq * tcpq_queue_init(void);

// Free a queue.
void tcpq_queue_free(struct tcpq *q);

// Return queue length.
long tcpq_queue_dsize(struct tcpq *q);

// queue new message to tail.
void tcpq_queue_tail(struct tcpq *q, void *data, long size);

// queue message that cannot be sent currently back to queue head.
void tcpq_queue_head(struct tcpq *q, void *data, long size);

// get one piece from queue head.
void * tcpq_dequeue_head(struct tcpq *q, long *size);

// Serialize all pieces of a queue, and move it out of queue, to ease the further
//operation on it.
void * tcpq_dqueue_flat(struct tcpq *q, long *size);

// Serialize all pieces of a queue, do not move it out of queue, to ease the further
//operation on it.
void * tcpq_queue_flat_peek(struct tcpq *q, long *size);