From 595e16fceee8dc1eee6a267a34067c446934bc5b Mon Sep 17 00:00:00 2001 From: Zhijiang Hu Date: Thu, 6 Apr 2017 23:21:42 -0400 Subject: jasmine(Just A Simple Multicast engINE) Initial merge Change-Id: I7a543019c8d92314ef549bf72369b7276f39577d Signed-off-by: Zhijiang Hu --- code/jasmine/server-tcp.c | 550 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 550 insertions(+) create mode 100755 code/jasmine/server-tcp.c (limited to 'code/jasmine/server-tcp.c') diff --git a/code/jasmine/server-tcp.c b/code/jasmine/server-tcp.c new file mode 100755 index 00000000..f5c83b96 --- /dev/null +++ b/code/jasmine/server-tcp.c @@ -0,0 +1,550 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "buffer.h" +#include "tcp-common.h" +#include "udp-common.h" +#include "tcp-queue.h" +#include "misc.h" +#include "server.h" + +#define MAX_CLIENTS 128 /* Clients per TCP server */ +#define TCP_BUFF_SIZE 65536 + +struct cdata { + struct tcpq *tx; + struct tcpq *rx; + int await; /* non-zero if we are in the middle of receiving clients requests */ + struct sockaddr_in peer; +}; + +struct server_status_data { + int state; /* state machine */ + int sync_count; + int ccount; + struct semlink sl; + pthread_t pchild; + + struct pollfd ds[MAX_CLIENTS + 1]; + struct cdata cd[MAX_CLIENTS + 1]; + int cindex; + int last_buff_pkt_count; +}; + +void init_sdata(struct server_status_data *sdata, void *thread_args) +{ + int i; + + sdata->sl.parent = (struct semaphores *)thread_args; + sdata->sl.this = NULL; + + sdata->state = S_PREP; + sdata->last_buff_pkt_count = 1; + sdata->sync_count = 0; + + for (i = 0; i < MAX_CLIENTS + 1; i++) { + sdata->ds[i].fd = -1; + sdata->ds[i].events = POLLIN; + sdata->cd[i].tx = tcpq_queue_init(); + sdata->cd[i].rx = tcpq_queue_init(); + sdata->cd[i].await = 0; + } +} + +/* disconnect one client and compress table */ +void kill_client(struct server_status_data *sdata) +{ + struct pollfd *ds = sdata->ds; + struct cdata *cd = sdata->cd; + + close((ds + sdata->cindex)->fd); + tcpq_queue_free((cd + sdata->cindex)->rx); + tcpq_queue_free((cd + sdata->cindex)->tx); + + /* Move last slot to this free slot and release the last slot */ + *(ds + sdata->cindex) = *(ds + sdata->ccount - 1); + *(cd + sdata->cindex) = *(cd + sdata->ccount - 1); + sdata->cindex--; + sdata->ccount--; + client_count--; +} + +void send_buffctl_to_all_clients(struct server_status_data *sdata) +{ + int i; + long send_sz; + void *cpy; + + /* copy header to all queues */ + send_sz = sizeof(struct buffer_ctl); + for (i = 1; i < sdata->ccount; i++) { + cpy = wrapper_malloc(send_sz); + memcpy(cpy, &buffctl, send_sz); + tcpq_queue_tail(sdata->cd[i].tx, cpy, send_sz); + sdata->ds[i].events |= POLLOUT; + } +} + +void send_sent_to_all_clients(struct server_status_data *sdata) +{ + int i; + void *cpy; + uint8_t byte = SERVER_SENT; + + /* Push data to clients */ + for (i = 1; i < sdata->ccount; i++) { + cpy = wrapper_malloc(1); + memcpy(cpy, &byte, 1); + tcpq_queue_tail(sdata->cd[i].tx, cpy, 1); + sdata->ds[i].events |= POLLOUT; + } +} +void accept_clients(struct server_status_data *sdata) +{ + int res; + int new_fd; + socklen_t socklen; + int poll_events; + int timeout = -1; + struct pollfd *ds = sdata->ds; + struct cdata *cd = sdata->cd; + + ds->events = POLLIN; + ds->revents = 0; + sdata->ccount = 1; + + while (sdata->ccount <= MAX_CLIENTS && client_count < wait_count) { + poll_events = poll(ds, 1, timeout); + if (poll_events < 0) { + crit("poll() failed"); + } else if (poll_events == 0) { + log(2, "poll() returned with no results!"); + continue; + } + + log(9, "poll: %d events", poll_events); + + if (ds->revents) { + + /* new connections come in */ + if (ds->revents & (POLLIN|POLLPRI)) { + + do { + socklen = sizeof((cd + sdata->ccount)->peer); +retry_accept: + new_fd = accept(ds->fd, + (struct sockaddr *)&((cd + sdata->ccount)->peer), + &socklen); + if (new_fd == -1 && errno == EINTR) { + goto retry_accept; + } + + if (new_fd == -1) { + log(2, "accept() returned with error %d", errno); + break; + } + + /* Send group info, before set non block */ + res = write(new_fd, &mcinfo, sizeof(struct mc_info)); + if (res < 0) { + log(3, "Error opening connection: %s", + inet_ntoa((cd + sdata->ccount)->peer.sin_addr)); + close(new_fd); + continue; + } + + /* Can set to non block becasue we will poll it in future. */ + res = fcntl(new_fd, F_SETFL, O_NONBLOCK); + if (res == -1) { + log(2, "fcntl() returned with error %d", errno); + close(new_fd); + continue; + } + + (ds + sdata->ccount)->fd = new_fd; + + log(5, "New connection %d: %s", + sdata->ccount, inet_ntoa((cd + sdata->ccount)->peer.sin_addr)); + sdata->ccount++; + client_count++; + } while (sdata->ccount <= MAX_CLIENTS); + + } else { + log(2, "Error on tcp socket"); + } + + poll_events--; + ds->revents = 0; + } + } + + ds->events = 0; +} + +void accept_clients_may_spawn(struct server_status_data *sdata) +{ + /* Wait the parent to let us accpet clients */ + sl_wait_parent(&(sdata->sl)); + + sdata->ds[0].fd = sfd; + accept_clients(sdata); + if (client_count < wait_count) { + sdata->sl.this = spawn_thread(&sdata->pchild, tcp_server_main, 1); + + /* Let child TCP servers accept connections */ + sl_release_child(&(sdata->sl)); + /* Wait until all childern TCP servers got all their clients */ + sl_wait_child(&(sdata->sl)); + } + + /* Tell parent all client are accepted/connected */ + sl_release_parent(&(sdata->sl)); +} + +void keep_on_receiving_client_request(struct server_status_data *sdata) +{ + char buf[TCP_BUFF_SIZE]; + void *cpy; + struct request_ctl *req; + uint32_t *rqb; + struct packet_ctl *ans; + long rq_index; + long total_sz; + + long read_out; + struct pollfd *ds; + struct cdata *cd; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + read_out = read(ds->fd, buf, cd->await); + if (read_out <= 0) { + log(5, "Client disconnected (r): %s", + inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; + } + + log(7, "New data (%ld + %ld) on conn %d: %s", + read_out, tcpq_queue_dsize(cd->rx), sdata->cindex, + inet_ntoa(cd->peer.sin_addr)); + cpy = wrapper_malloc(read_out); + memcpy(cpy, buf, read_out); + tcpq_queue_tail(cd->rx, cpy, read_out); + cd->await -= read_out; + + /* Full header received */ + if (tcpq_queue_dsize(cd->rx) == sizeof(struct request_ctl)) { + cpy = tcpq_queue_flat_peek(cd->rx, &read_out); + req = (struct request_ctl *)cpy; + cd->await = req->req_count * sizeof(uint32_t); + log(6, "Client request for %u packets on %d: %s", + req->req_count, sdata->cindex, inet_ntoa(cd->peer.sin_addr)); + /* req->rqc may be zero? So do not return from here */ + } + + /* Whole request(struct request_ctl + all rq blocks) received */ + if (cd->await == 0) { + cpy = tcpq_dqueue_flat(cd->rx, &read_out); + req = (struct request_ctl *)cpy; + rqb = (uint32_t *) (cpy + sizeof(struct request_ctl)); + for (rq_index = 0; rq_index < req->req_count; rq_index++) { + ans = packet_get(*(rqb + rq_index)); + total_sz = ans->data_size + sizeof(struct packet_ctl); + log(6, "Send packet %u (%u bytes) on %d", + rqb[rq_index], ans->data_size, sdata->cindex); + cpy = wrapper_malloc(total_sz); + memcpy(cpy, ans, total_sz); + tcpq_queue_tail(cd->tx, cpy, total_sz); + } + + if (rq_index > 0) { + /* Data need to be sent out */ + ds->events |= POLLOUT; + } + } +} + +void handle_error_event(struct server_status_data *sdata) +{ + struct cdata *cd; + + cd = &(sdata->cd[sdata->cindex]); + log(5, "Closing connection %d: %s", + sdata->cindex, inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); +} + +void handle_client_ready(struct server_status_data *sdata) +{ + if (sdata->state == S_SYNC) { + sdata->sync_count++; + + log(7, "Client SYNC %d of %d", + sdata->sync_count, sdata->ccount - 1); + + if (sdata->sync_count == sdata->ccount - 1) { + /* All client SYNC done */ + sdata->state = S_SEND; + + /* Wait child ready */ + sl_wait_child(&(sdata->sl)); + /* Tell parent I am ready, release parent */ + sl_release_parent(&(sdata->sl)); + + /* Wait parent to send mcast */ + sl_wait_parent(&(sdata->sl)); + /* tell child mcast done, release child */ + sl_release_child(&(sdata->sl)); + + send_sent_to_all_clients(sdata); + } + } +} + +void handle_client_done(struct server_status_data *sdata) +{ + if (sdata->state == S_SEND) { + log(7, "Client START %d of %d", + sdata->sync_count, sdata->ccount - 1); + + sdata->sync_count--; + if (sdata->sync_count == 0) { + /* Got all client START */ + sdata->state = S_PREP; + log(7, "All received, now tell UDP Server prepare next buffer"); + /* Remeber old buffctl.pkt_count before it change */ + sdata->last_buff_pkt_count = buffctl.pkt_count; + + /* Wait child clients done */ + sl_wait_child(&(sdata->sl)); + /* Tell parent all our clients are done, release parent */ + sl_release_parent(&(sdata->sl)); + } + } +} + +void handle_client_events(struct server_status_data *sdata) +{ + struct pollfd *ds; + struct cdata *cd; + int read_out; + uint8_t msgtype; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + /* read message type */ + read_out = read(ds->fd, &msgtype, 1); + if (read_out <= 0) { + log(5, "Client disconnected due to read error %d, ret:%d (r): %s", + errno, read_out, inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; /* continue to check other clients */ + } + + switch (msgtype) { + case CLIENT_READY: + handle_client_ready(sdata); + break; + case CLIENT_DONE: + handle_client_done(sdata); + break; + case CLIENT_REQ: + /* wait a whole struct request_ctl */ + cd->await = sizeof(struct request_ctl); + break; + default: + log(4, "Wrong message type from %s", + inet_ntoa(cd->peer.sin_addr)); + break; + } +} + +void handle_pullin_event(struct server_status_data *sdata) +{ + struct cdata *cd; + + cd = &(sdata->cd[sdata->cindex]); + + /* Await is set only for repeat request */ + if (cd->await) { + keep_on_receiving_client_request(sdata); + } else { + handle_client_events(sdata); + } +} + +void handle_pullout_event(struct server_status_data *sdata) +{ + char buf[TCP_BUFF_SIZE]; + struct pollfd *ds; + struct cdata *cd; + long transmit_sz; + void *cpy; + long written_in; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + if (cd->tx->count == 0) { + ds->events = POLLIN; /* Just make sure */ + return; + } + + log(7, "handle_pullout_event servs No.%d conn", sdata->cindex); + + /* If there is some data to be sent, try to do it now */ + cpy = tcpq_dequeue_head(cd->tx, &transmit_sz); + memcpy(buf, cpy, transmit_sz); + free(cpy); + + written_in = write(ds->fd, buf, transmit_sz); + if (written_in <= 0) { + log(5, "Client disconnected (w): %s", + inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; + } + + if (written_in != transmit_sz) { + log(6, "Partial wrote: %ld out of %ld bytes sent", + written_in, transmit_sz); + cpy = wrapper_malloc(transmit_sz - written_in); + memcpy(cpy, buf + written_in, transmit_sz - written_in); + tcpq_queue_head(cd->tx, cpy, transmit_sz - written_in); + } else { + log(7, "Sent %ld bytes to %s", + written_in, inet_ntoa(cd->peer.sin_addr)); + if (cd->tx->count == 0) { + /* Do not listen pollout event anymore */ + ds->events = POLLIN; + } + } +} + +void check_clients(struct server_status_data *sdata) +{ + int poll_events; + struct pollfd *ds; + int timeout = -1; + + if (sdata->ccount == 1) { + log(5, "No more clients, start exiting. s,c,p:%d,%d,%d", + sdata->state, sdata->ccount, buffctl.pkt_count); + /* No client existed */ + switch(sdata->state) { + case S_SYNC: + /* Wait all children ready */ + sl_wait_child(&(sdata->sl)); + /* Tell parent I am ready, release parent */ + sl_release_parent(&(sdata->sl)); + + /* Wait parent to send mcast */ + sl_wait_parent(&(sdata->sl)); + /* Tell child mcast done, release child */ + sl_release_child(&(sdata->sl)); + case S_SEND: + sdata->state = S_PREP; + /* Remeber old buffctl.pkt_count before it change */ + sdata->last_buff_pkt_count = buffctl.pkt_count; + + /* Wait child clients done */ + sl_wait_child(&(sdata->sl)); + /* Tell parent all our clients are done */ + sl_release_parent(&(sdata->sl)); + } + /* return to main loop to finish the dummy run */ + return; + } + + /* poll clients only */ + poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout); + if (poll_events < 0) { + crit("poll() failed"); + } else if (poll_events == 0) { + log(2, "poll() returned with no results!"); + return; + } + + log(9, "poll clients: %d events", poll_events); + + /* check all connected clients */ + for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) { + ds = &(sdata->ds[sdata->cindex]); + if (!ds->revents) { + continue; + } + + if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) { + handle_error_event(sdata); + } else if (ds->revents & (POLLIN|POLLPRI)) { + handle_pullin_event(sdata); + } else if (ds->revents & POLLOUT) { + handle_pullout_event(sdata); + } + } +} + +void * tcp_server_main(void *args) +{ + int i; + struct server_status_data ssdata; + struct server_status_data *sdata; + + sdata = &ssdata; + init_sdata(sdata, args); + + accept_clients_may_spawn(sdata); + + while (1) { + if (sdata->state == S_PREP) { + /* Wait UDP server to prepare the buffctl of this round to send */ + sl_wait_parent(&(sdata->sl)); + log(6, "After waiting UDP server preparation. s,c,p:%d,%d,%d", + sdata->state, sdata->ccount - 1, buffctl.pkt_count); + /* Tell children buffer pareparation done, release child */ + sl_release_child(&(sdata->sl)); + send_buffctl_to_all_clients(sdata); + sdata->state = S_SYNC; + } + + check_clients(sdata); + + if (sdata->state == S_PREP && sdata->last_buff_pkt_count == 0) { + log(5, "No more output, TCP server finished"); + if (sdata->sl.this && pthread_join(sdata->pchild, 0) < 0) { + crit("pthread_join()"); + } + + /* shutdown conn to clients */ + for (i = sdata->ccount - 1; i > 0; i--) { + shutdown((sdata->ds[i]).fd, 2); + } + + if (!sdata->sl.this) { + /* leaf server */ + close(sfd); + sfd = -1; + } + return 0; + } + } +} -- cgit 1.2.3-korg