From 595e16fceee8dc1eee6a267a34067c446934bc5b Mon Sep 17 00:00:00 2001 From: Zhijiang Hu Date: Thu, 6 Apr 2017 23:21:42 -0400 Subject: jasmine(Just A Simple Multicast engINE) Initial merge Change-Id: I7a543019c8d92314ef549bf72369b7276f39577d Signed-off-by: Zhijiang Hu --- code/jasmine/Makefile.am | 84 +++++ code/jasmine/README | 17 + code/jasmine/autogen.sh | 5 + code/jasmine/buffer.c | 152 +++++++++ code/jasmine/buffer.h | 64 ++++ code/jasmine/build-aux/git-version-gen | 170 ++++++++++ code/jasmine/client.c | 313 +++++++++++++++++++ code/jasmine/configure.ac | 65 ++++ code/jasmine/jasmine.spec.in | 33 ++ code/jasmine/misc.c | 73 +++++ code/jasmine/misc.h | 39 +++ code/jasmine/server-tcp.c | 550 +++++++++++++++++++++++++++++++++ code/jasmine/server-udp.c | 95 ++++++ code/jasmine/server.c | 123 ++++++++ code/jasmine/server.h | 63 ++++ code/jasmine/tcp-common.c | 60 ++++ code/jasmine/tcp-common.h | 18 ++ code/jasmine/tcp-queue.c | 178 +++++++++++ code/jasmine/tcp-queue.h | 35 +++ code/jasmine/udp-common.c | 97 ++++++ code/jasmine/udp-common.h | 42 +++ 21 files changed, 2276 insertions(+) create mode 100755 code/jasmine/Makefile.am create mode 100644 code/jasmine/README create mode 100755 code/jasmine/autogen.sh create mode 100755 code/jasmine/buffer.c create mode 100755 code/jasmine/buffer.h create mode 100755 code/jasmine/build-aux/git-version-gen create mode 100755 code/jasmine/client.c create mode 100755 code/jasmine/configure.ac create mode 100755 code/jasmine/jasmine.spec.in create mode 100755 code/jasmine/misc.c create mode 100755 code/jasmine/misc.h create mode 100755 code/jasmine/server-tcp.c create mode 100755 code/jasmine/server-udp.c create mode 100755 code/jasmine/server.c create mode 100755 code/jasmine/server.h create mode 100755 code/jasmine/tcp-common.c create mode 100755 code/jasmine/tcp-common.h create mode 100755 code/jasmine/tcp-queue.c create mode 100755 code/jasmine/tcp-queue.h create mode 100755 code/jasmine/udp-common.c create mode 100755 code/jasmine/udp-common.h diff --git a/code/jasmine/Makefile.am b/code/jasmine/Makefile.am new file mode 100755 index 00000000..dc95a03e --- /dev/null +++ b/code/jasmine/Makefile.am @@ -0,0 +1,84 @@ +# Copyright (c) 2015 ZTE, Inc. + +SPEC = $(PACKAGE_NAME).spec + +TARFILE = $(PACKAGE_NAME)-$(VERSION).tar.gz + +EXTRA_DIST = autogen.sh $(SPEC).in \ + build-aux/git-version-gen \ + .version + +AUTOMAKE_OPTIONS = foreign + +ACLOCAL_AMFLAGS = -I m4 + +MAINTAINERCLEANFILES = Makefile.in aclocal.m4 configure depcomp \ + config.guess config.sub missing install-sh \ + autoheader automake autoconf \ + autoscan.log configure.scan ltmain.sh test-driver config.h.in + +noinst_HEADERS = buffer.h misc.h server.h tcp-common.h tcp-queue.h udp-common.h + +dist-clean-local: + rm -f autoconf automake autoheader + +clean-generic: + rm -rf $(SPEC) $(TARFILE) + +## make rpm/srpm section. + +$(SPEC): $(SPEC).in + rm -f $@-t $@ + ver="$(VERSION)" && \ + sed \ + -e "s#@version@#$$ver#g" \ + $< > $@-t; \ + chmod a-w $@-t + mv $@-t $@ + +$(TARFILE): + $(MAKE) dist + +RPMBUILDOPTS = --define "_sourcedir $(abs_builddir)" \ + --define "_specdir $(abs_builddir)" \ + --define "_builddir $(abs_builddir)" \ + --define "_srcrpmdir $(abs_builddir)" \ + --define "_rpmdir $(abs_builddir)" + +srpm: clean + $(MAKE) $(SPEC) $(TARFILE) + rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) --nodeps -bs $(SPEC) + +rpm: clean _version + $(MAKE) $(SPEC) $(TARFILE) + rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) -ba $(SPEC) + +# release/versioning +BUILT_SOURCES = .version +.version: + echo $(VERSION) > $@-t && mv $@-t $@ + +dist-hook: + echo $(VERSION) > $(distdir)/.tarball-version + +.PHONY: _version + +_version: + cd $(srcdir) && rm -rf autom4te.cache .version && autoreconf -i + $(MAKE) $(AM_MAKEFLAGS) Makefile + +maintainer-clean-local: + rm -rf m4 + + +### + +bin_PROGRAMS = jasmines jasminec + +DEFAULT_INCLUDES = -I. -I/usr/include + +jasmines_SOURCES = udp-common.c tcp-common.c buffer.c misc.c server.c server-udp.c server-tcp.c tcp-queue.c +jasminec_SOURCES = udp-common.c tcp-common.c buffer.c misc.c client.c + +jasmines_LDADD = -lpthread +jasminec_LDADD = -lpthread diff --git a/code/jasmine/README b/code/jasmine/README new file mode 100644 index 00000000..221caea0 --- /dev/null +++ b/code/jasmine/README @@ -0,0 +1,17 @@ +jasmine: Just A Small Multicast engINE + +Installation +------------ + +./autogen.sh +./configure +make && make install + +Usage +----- +jasmine server: +Usage: jasmines local_ip num_of_clients [port] + +jasmine client: +Usage: jasminec [port] + diff --git a/code/jasmine/autogen.sh b/code/jasmine/autogen.sh new file mode 100755 index 00000000..5bf25ecc --- /dev/null +++ b/code/jasmine/autogen.sh @@ -0,0 +1,5 @@ +#!/bin/sh +# Run this to generate all the initial makefiles, etc. +mkdir -p m4 +echo Building configuration system... +autoreconf -i diff --git a/code/jasmine/buffer.c b/code/jasmine/buffer.c new file mode 100755 index 00000000..0901d302 --- /dev/null +++ b/code/jasmine/buffer.c @@ -0,0 +1,152 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include + +#include "buffer.h" +#include "misc.h" + +struct buffer_ctl buffctl; +struct packet_ctl *packetctl[PACKETS_PER_BUFFER]; +struct packet_ctl empty; + +void buffer_init() +{ + int i; + + for (i = 0; i < PACKETS_PER_BUFFER; i++) { + packetctl[i] = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE); + memset(packetctl[i], 0, PACKET_SIZE); + packetctl[i]->data_size = 0; + packetctl[i]->seq = 0; + } + + buffctl.buffer_id = 0; + buffctl.packet_id_base = 0; + buffctl.pkt_count = 0; + empty.data_size = 0; +} + +void packetctl_precheck() +{ + int i; + + for (i = 0; i < PACKETS_PER_BUFFER; i++) { + if (packetctl[i]->data_size != 0) { + crit("Error precheck packet slot %d,%d", i, packetctl[i]->data_size); + } + } +} + +// Calculate packet checksum +uint32_t packet_csum(uint8_t *data, int len) +{ + uint32_t *item, sum = 0; + int i = 0; + + while (len % 4) { + data[len] = 0; len++; + } + + while (i < len) { + item = (uint32_t *)((char *)data + i); + sum += *item; + i += 4; + } + + return sum; +} + +// Fill buffers from file descriptor +long buffer_fill(int fd) +{ + int s = 0; + int r = PACKET_PAYLOAD_SIZE; + + buffctl.buffer_id++; + buffctl.packet_id_base += buffctl.pkt_count; + buffctl.buffer_size = 0; + while (r > 0 && s < PACKETS_PER_BUFFER) { + if ((r = read(fd, packetctl[s]->data, PACKET_PAYLOAD_SIZE)) < 0) { + crit("Error reading data from stdin"); + } + + // r == 0 means EOF + + if (r > 0) { + buffctl.buffer_size += r; + packetctl[s]->data_size = r; + packetctl[s]->seq = buffctl.packet_id_base + s; + packetctl[s]->crc = packet_csum(packetctl[s]->data, r); + s++; + } + } + + log(6, "input %d bytes of data in %d packets", buffctl.buffer_size, s); + buffctl.pkt_count = s; + return s; +} + +long buffer_flush(int fd) +{ + int s = 0; + while (buffctl.pkt_count--) { + write(fd, packetctl[s]->data, packetctl[s]->data_size); + buffctl.buffer_size -= packetctl[s]->data_size; + packetctl[s]->data_size = 0; + packetctl[s]->seq = 0; + packetctl[s]->crc = 0; + s++; + } + + return s; +} + +/* Caller should use new_pkt as packet buffer again if this returns NULL */ +struct packet_ctl *packet_put(struct packet_ctl *new_pkt) +{ + struct packet_ctl *freed_pkt; + uint32_t i; + + if (new_pkt->seq < buffctl.packet_id_base || + new_pkt->seq - buffctl.packet_id_base >= buffctl.pkt_count) { + return NULL; + } + + if (new_pkt->data_size == 0) { + return NULL; + } + + i = packet_csum(new_pkt->data, new_pkt->data_size); + if (new_pkt->crc != i) { + return NULL; + } + + i = new_pkt->seq - buffctl.packet_id_base; + freed_pkt = packetctl[i]; + packetctl[i] = new_pkt; + return freed_pkt; +} + +struct packet_ctl *packet_get(uint32_t seq) +{ + empty.seq = seq; + + if (seq < buffctl.packet_id_base || + seq - buffctl.packet_id_base >= buffctl.pkt_count) { + return ∅ + } + + if (packetctl[seq - buffctl.packet_id_base]->data_size == 0) { + return ∅ + } + + return packetctl[seq - buffctl.packet_id_base]; +} diff --git a/code/jasmine/buffer.h b/code/jasmine/buffer.h new file mode 100755 index 00000000..e899fe62 --- /dev/null +++ b/code/jasmine/buffer.h @@ -0,0 +1,64 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_BUFFER_H +#define _MCAST_BUFFER_H + +#include +#include +#include + +#define MTU 1500 + +#define PACKET_SIZE ((MTU) - sizeof(struct iphdr) - sizeof(struct udphdr)) + +/* Exclude padding */ +#define PACKET_PAYLOAD_SIZE (((PACKET_SIZE) - sizeof(struct packet_ctl)) & ~0x3) + +#define PACKETS_PER_BUFFER 1024 + +#define DEF_PORT 18383 /* for both UDP and TCP */ + +/* Buffer header (align to 4 Byte) */ +struct buffer_ctl { + uint32_t buffer_id; + uint32_t buffer_size; + uint32_t packet_id_base; + uint32_t pkt_count; +}; + +/* Packet header (align to 4 Byte) */ +struct packet_ctl { + uint32_t seq; + uint32_t crc; + uint32_t data_size; + uint8_t data[0]; +}; + +#define CLIENT_READY 0x1 +#define CLIENT_REQ 0x2 +#define CLIENT_DONE 0x4 +#define SERVER_SENT 0x8 + +/* Retransmition Request Header (align to 4 Byte) */ +struct request_ctl { + uint32_t req_count; /* Requested packet slot count */ +}; + +extern struct buffer_ctl buffctl; +extern struct packet_ctl *packetctl[PACKETS_PER_BUFFER]; + +void buffer_init(); +long buffer_fill(int fd); +long buffer_flush(int fd); +struct packet_ctl *packet_put(struct packet_ctl *new_pkt); +struct packet_ctl *packet_get(uint32_t seq); +void packetctl_precheck(); + +#endif diff --git a/code/jasmine/build-aux/git-version-gen b/code/jasmine/build-aux/git-version-gen new file mode 100755 index 00000000..c3d53f36 --- /dev/null +++ b/code/jasmine/build-aux/git-version-gen @@ -0,0 +1,170 @@ +#!/bin/sh +# Print a version string. +scriptversion=2010-10-13.20; # UTC + +# Copyright (C) 2007-2010 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# This script is derived from GIT-VERSION-GEN from GIT: http://git.or.cz/. +# It may be run two ways: +# - from a git repository in which the "git describe" command below +# produces useful output (thus requiring at least one signed tag) +# - from a non-git-repo directory containing a .tarball-version file, which +# presumes this script is invoked like "./git-version-gen .tarball-version". + +# In order to use intra-version strings in your project, you will need two +# separate generated version string files: +# +# .tarball-version - present only in a distribution tarball, and not in +# a checked-out repository. Created with contents that were learned at +# the last time autoconf was run, and used by git-version-gen. Must not +# be present in either $(srcdir) or $(builddir) for git-version-gen to +# give accurate answers during normal development with a checked out tree, +# but must be present in a tarball when there is no version control system. +# Therefore, it cannot be used in any dependencies. GNUmakefile has +# hooks to force a reconfigure at distribution time to get the value +# correct, without penalizing normal development with extra reconfigures. +# +# .version - present in a checked-out repository and in a distribution +# tarball. Usable in dependencies, particularly for files that don't +# want to depend on config.h but do want to track version changes. +# Delete this file prior to any autoconf run where you want to rebuild +# files to pick up a version string change; and leave it stale to +# minimize rebuild time after unrelated changes to configure sources. +# +# It is probably wise to add these two files to .gitignore, so that you +# don't accidentally commit either generated file. +# +# Use the following line in your configure.ac, so that $(VERSION) will +# automatically be up-to-date each time configure is run (and note that +# since configure.ac no longer includes a version string, Makefile rules +# should not depend on configure.ac for version updates). +# +# AC_INIT([GNU project], +# m4_esyscmd([build-aux/git-version-gen .tarball-version]), +# [bug-project@example]) +# +# Then use the following lines in your Makefile.am, so that .version +# will be present for dependencies, and so that .tarball-version will +# exist in distribution tarballs. +# +# BUILT_SOURCES = $(top_srcdir)/.version +# $(top_srcdir)/.version: +# echo $(VERSION) > $@-t && mv $@-t $@ +# dist-hook: +# echo $(VERSION) > $(distdir)/.tarball-version + +case $# in + 1|2) ;; + *) echo 1>&2 "Usage: $0 \$srcdir/.tarball-version" \ + '[TAG-NORMALIZATION-SED-SCRIPT]' + exit 1;; +esac + +tarball_version_file=$1 +tag_sed_script="${2:-s/x/x/}" +nl=' +' + +# Avoid meddling by environment variable of the same name. +v= + +svn log --non-interactive -q --limit 1 > /dev/null 2>&1 +svnwork=$? + +# First see if there is a tarball-only version file. +# then try "git describe", then default. +if test -f $tarball_version_file +then + v=`cat $tarball_version_file` || exit 1 + case $v in + *$nl*) v= ;; # reject multi-line output + [0-9]*) ;; + *) v= ;; + esac + test -z "$v" \ + && echo "$0: WARNING: $tarball_version_file seems to be damaged" 1>&2 +fi + +if test -n "$v" +then + : # use $v +# Otherwise, if there is at least one git commit involving the working +# directory, and "git describe" output looks sensible, use that to +# derive a version string. +elif test "`git log -1 --pretty=format:x . 2>&1`" = x \ + && v=`git describe --abbrev=4 --match='v*' HEAD 2>/dev/null \ + || git describe --abbrev=4 HEAD 2>/dev/null` \ + && v=`printf '%s\n' "$v" | sed "$tag_sed_script"` \ + && case $v in + v[0-9]*) ;; + *) (exit 1) ;; + esac +then + # Is this a new git that lists number of commits since the last + # tag or the previous older version that did not? + # Newer: v6.10-77-g0f8faeb + # Older: v6.10-g0f8faeb + case $v in + *-*-*) : git describe is okay three part flavor ;; + *-*) + : git describe is older two part flavor + # Recreate the number of commits and rewrite such that the + # result is the same as if we were using the newer version + # of git describe. + vtag=`echo "$v" | sed 's/-.*//'` + numcommits=`git rev-list "$vtag"..HEAD | wc -l` + v=`echo "$v" | sed "s/\(.*\)-\(.*\)/\1-$numcommits-\2/"`; + ;; + esac + + # Change the first '-' to a '.', so version-comparing tools work properly. + # Remove the "g" in git describe's output string, to save a byte. + v=`echo "$v" | sed 's/-/./;s/\(.*\)-g/\1-/'`; +elif [ "$svnwork" -eq "0" ] ; +then + vtag="1.0.0" # we do not have git tag in svn, so here just hard coded it. + v=`svn log -q --limit 1 | grep "|" | awk '{print $1}'` + v=`echo "$v" |sed 's/^r//'` + v="$vtag.$v" +else + v=UNKNOWN +fi + +v=`echo "$v" |sed 's/^v//'` + +# Don't declare a version "dirty" merely because a time stamp has changed. +git update-index --refresh > /dev/null 2>&1 + +dirty=`sh -c 'git diff-index --name-only HEAD' 2>/dev/null` || dirty= +case "$dirty" in + '') ;; + *) # Append the suffix only if there isn't one already. + case $v in + *-dirty) ;; + *) v="$v-dirty" ;; + esac ;; +esac + +# Omit the trailing newline, so that m4_esyscmd can use the result directly. +echo "$v" | tr -d "$nl" + +# Local variables: +# eval: (add-hook 'write-file-hooks 'time-stamp) +# time-stamp-start: "scriptversion=" +# time-stamp-format: "%:y-%02m-%02d.%02H" +# time-stamp-time-zone: "UTC" +# time-stamp-end: "; # UTC" +# End: diff --git a/code/jasmine/client.c b/code/jasmine/client.c new file mode 100755 index 00000000..7beda75d --- /dev/null +++ b/code/jasmine/client.c @@ -0,0 +1,313 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include +#include +#include + +#include "buffer.h" +#include "udp-common.h" +#include "tcp-common.h" +#include "misc.h" + +/* Global statistics */ +long tot_dups = 0; +long tot_reps = 0; +long tot_pkts = 0; +long long tot_size = 0; + +void recv_mcinfo(int tcp_socket) +{ + int res; + + res = read_all(tcp_socket, &mcinfo, sizeof(struct mc_info)); + if (res != sizeof(struct mc_info)) { + crit("Error while reading initial data"); + } +} + +void recv_buffctl(int tcp_socket) +{ + int res; + + res = read_all(tcp_socket, &buffctl, sizeof(struct buffer_ctl)); + if (res != sizeof(struct buffer_ctl)) { + crit("Error reading buffer control"); + } +} + +void send_client_ready(int tcp_socket) +{ + uint8_t cmd; + + /* Tell TCP server we are ready to receive */ + cmd = CLIENT_READY; + write(tcp_socket, &cmd, 1); +} + +/* Read out the SERVER_SENT signal */ +void recv_server_sent(int tcp_socket) +{ + uint8_t cmd; + int res; + + res = read_all(tcp_socket, &cmd, 1); + if (res != 1) { + //if (read(tcp_socket, &cmd, 1) < 1) { + crit("Error reading SERVER_SENT"); + } + + if (cmd != SERVER_SENT) { + crit("Error reading SERVER_SENT %d", cmd); + } +} + +void send_client_request(int tcp_socket, struct request_ctl *req) +{ + uint8_t cmd; + + /* Tell TCP server we are ready to receive */ + cmd = CLIENT_REQ; + write(tcp_socket, &cmd, 1); + write(tcp_socket, req, + sizeof(struct request_ctl) + (req->req_count) * sizeof(uint32_t)); +} + +void recv_server_ack(int tcp_socket, struct packet_ctl *pkt) +{ + int res; + + res = read_all(tcp_socket, pkt, sizeof(struct packet_ctl)); + if (res != sizeof(struct packet_ctl)) { + crit("Error on tcp socket"); + } + + res = read_all(tcp_socket, + ((char *)pkt) + sizeof(struct packet_ctl), + pkt->data_size); + if (res != pkt->data_size) { + crit("Error on tcp socket received %d of %d", + res, pkt->data_size); + } +} + +void tcp_retransmition(int tcp_socket, + struct packet_ctl **curr_pkt, + struct packet_ctl **freed_pkt) +{ + struct request_ctl *reqctl; + uint32_t *reqbody; + uint8_t rqbuf[sizeof(struct request_ctl) + PACKETS_PER_BUFFER * sizeof(uint32_t)]; + uint32_t l; + + reqctl = (struct request_ctl *)rqbuf; + reqbody = (uint32_t *)(rqbuf + sizeof(struct request_ctl)); + + reqctl->req_count = 0; + for (l = 0; l < buffctl.pkt_count; l++) { + if (!packetctl[l]->data_size) { + log(6, "Requesting packet %u", l + buffctl.packet_id_base); + reqbody[reqctl->req_count] = l + buffctl.packet_id_base; + reqctl->req_count++; + } + } + + if (reqctl->req_count > 0) { + send_client_request(tcp_socket, reqctl); + + /* read retransmitted blocks via TCP */ + for (l = 0; l < reqctl->req_count; l++) { + if (*freed_pkt) { + *curr_pkt = *freed_pkt; + } + + recv_server_ack(tcp_socket, *curr_pkt); + + *freed_pkt = packet_put(*curr_pkt); + if (!(*freed_pkt)) { + crit("Malformed packet on tcp socket"); + } + if ((*freed_pkt)->data_size != 0) { + crit("Malformed free packet slot or TCP data"); + } + + log(6, "Received retran packet %u", (*curr_pkt)->seq); + } + + tot_reps += reqctl->req_count; + } +} + +/* Returns how many good packets received from UDP */ +int recv_mcast(int tcp_socket, int udp_socket, + struct packet_ctl **curr_pkt, + struct packet_ctl **freed_pkt) +{ + int rcv_pkt_count; + int got_sent; + int maxfd; + fd_set rfds; + struct timeval tv; + int res; + + maxfd = tcp_socket; + if (maxfd < udp_socket) { + maxfd = udp_socket; + } + maxfd++; + FD_ZERO(&rfds); + + rcv_pkt_count = 0; + got_sent = 0; + + if (buffctl.pkt_count != 0) { + do { + FD_SET(tcp_socket, &rfds); + FD_SET(udp_socket, &rfds); + tv.tv_sec = 5; + tv.tv_usec = 0; + + res = select(maxfd, &rfds, 0, 0, &tv); + if (res < 0) { + crit("select error"); + } + + if (res == 0) { + crit("select timed out"); + } + + /* Read multicast packet */ + if (FD_ISSET(udp_socket, &rfds)) { + log(7, "Reading multicast packet"); + if (*freed_pkt) { + *curr_pkt = *freed_pkt; + } + + res = recv(udp_socket, *curr_pkt, PACKET_SIZE, 0); + if (res <= 0) { + crit("error on multicast socket"); + } + + if (res < sizeof(struct packet_ctl) ) { + log(7, "Truncated packet received (%d bytes)", res); + } else if (res != (*curr_pkt)->data_size + sizeof(struct packet_ctl)) { + log(7, + "Truncated packet received (%d of %ld bytes)", + res, (*curr_pkt)->data_size + sizeof(struct packet_ctl)); + } else { + log(9, "Normal packet seq:%d", (*curr_pkt)->seq); + (*freed_pkt) = packet_put((*curr_pkt)); + if (!(*freed_pkt)) { + log(5, "Malformed packet"); + } else { + if ((*freed_pkt)->data_size == 0) { + rcv_pkt_count++; + } else { + log(6, "Duplicated packet"); + tot_dups++; + } + } + } + } else if (FD_ISSET(tcp_socket, &rfds)) { + /* Check TCP, only if there was no more data from UDP */ + log(6, "No more data path1"); + recv_server_sent(tcp_socket); + got_sent = 1; + break; + } + } while (rcv_pkt_count < buffctl.pkt_count); + } + + if (got_sent == 0) { + log(6, "No more data path2"); + recv_server_sent(tcp_socket); + } + + return rcv_pkt_count; +} + +void send_client_done(int tcp_socket) +{ + uint8_t cmd; + + /* Tell TCP server we are ready to receive */ + cmd = CLIENT_DONE; + write(tcp_socket, &cmd, 1); +} + +int main(int argc, char *argv[]) +{ + int ms, ts; + struct packet_ctl *alloc_pkt, *curr_pkt, *freed_pkt; + int udp_rcv_count; + u_short port = DEF_PORT; + struct in_addr local_addr; + + if (argc < 3) { + printf("Usage: %s [port]\n", argv[0]); + return 0; + } + + if (!inet_aton(argv[1], &local_addr)) { + crit("can not resolve address: %s", argv[1]); + } + + if (argc > 3) { + port = atoi(argv[3]); + if (!port) { + port = DEF_PORT; + } + } + + buffer_init(); + /* Init first time packet slot */ + alloc_pkt = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE); + memset(alloc_pkt, 0, PACKET_SIZE); + freed_pkt = curr_pkt = alloc_pkt; + + ts = init_tcp_client_socket(argv[2], port); + recv_mcinfo(ts); + ms = init_mcast_socket(&local_addr, &mcinfo.group); + /* Do we need set_nonblock(ms)??? ; */ + + /* Will do dummy run even if buffctl.pkt_count is zero at the first round */ + do { /* one buffer a round */ + packetctl_precheck(); + recv_buffctl(ts); + send_client_ready(ts); + + udp_rcv_count = recv_mcast(ts, ms, &curr_pkt,&freed_pkt); + if (udp_rcv_count == buffctl.pkt_count) { + log(6, "All packets of current buffer received from UDP"); + } else { + tcp_retransmition(ts, &curr_pkt, &freed_pkt); + } + + tot_pkts += buffctl.pkt_count; + tot_size += buffctl.buffer_size; + log(1, "\rBuffer received %lld Bytes, %ld Packets(%ld Repeats %ld Dups)", + tot_size, tot_pkts, tot_reps, tot_dups); + + if (buffctl.pkt_count) { + buffer_flush(STDOUT_FILENO); + } + + send_client_done(ts); + } while (buffctl.pkt_count != 0); + + shutdown(ts, 2); + close(ts); + close(ms); + log(1, "All buffers receive done.\n"); + return 0; +} diff --git a/code/jasmine/configure.ac b/code/jasmine/configure.ac new file mode 100755 index 00000000..8f42bab2 --- /dev/null +++ b/code/jasmine/configure.ac @@ -0,0 +1,65 @@ +# -*- Autoconf -*- +# Process this file with autoconf to produce a configure script. + +# bootstrap / init +AC_PREREQ([2.61]) + +AC_INIT([jasmine], + m4_esyscmd([build-aux/git-version-gen .tarball-version]), + [hu.zhijiang@zte.com.cn]) + +AC_USE_SYSTEM_EXTENSIONS + +AM_INIT_AUTOMAKE([-Wno-portability]) + +AC_CONFIG_HEADER([config.h]) + +AC_CONFIG_MACRO_DIR([m4]) + +AC_SUBST(WITH_LIST, [""]) + +dnl Fix default variables - "prefix" variable if not specified +if test "$prefix" = "NONE"; then + prefix="/usr" + + dnl Fix "localstatedir" variable if not specified + if test "$localstatedir" = "\${prefix}/var"; then + localstatedir="/var" + fi + dnl Fix "sysconfdir" variable if not specified + if test "$sysconfdir" = "\${prefix}/etc"; then + sysconfdir="/etc" + fi + dnl Fix "libdir" variable if not specified + if test "$libdir" = "\${exec_prefix}/lib"; then + if test -e /usr/lib64; then + libdir="/usr/lib64" + else + libdir="/usr/lib" + fi + fi +fi + +# Checks for programs. +AC_PATH_PROG([BASHPATH], [bash]) + +AC_CONFIG_FILES([Makefile]) + +PACKAGE_FEATURES="" + +ENV_CFLAGS="$CFLAGS" +OPT_CFLAGS="" +GDB_FLAGS="" +EXTRA_WARNINGS="-Wall" +CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $EXTRA_WARNINGS" + +# substitute what we need: +AC_SUBST([BASHPATH]) + +AC_OUTPUT + +AC_MSG_RESULT([ Version = ${PACKAGE_VERSION}]) +AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}]) +AC_MSG_RESULT([ Final CPPFLAGS = ${CPPFLAGS}]) +AC_MSG_RESULT([ Final LDFLAGS = ${LDFLAGS}]) +AC_MSG_RESULT([ Features = ${PACKAGE_FEATURES}]) diff --git a/code/jasmine/jasmine.spec.in b/code/jasmine/jasmine.spec.in new file mode 100755 index 00000000..73172f31 --- /dev/null +++ b/code/jasmine/jasmine.spec.in @@ -0,0 +1,33 @@ +Summary: Just A Small Multicast engINE +Name: jasmine +Version: @version@ +Release: 1%{?dist} +Vendor: ZTE +License: Apache-2.0 +Group: System Environment/Base +URL: https://wiki.opnfv.org/display/DAIS +Source: %{name}-%{version}%{?gittarver}.tar.gz + +BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX) + +%description +jasmine is used to distribute files over UDP multicast for installers + +%prep +%setup -q -n %{name}-%{version}%{?gittarver} + +%build +if [ ! -f configure ]; then + ./autogen.sh +fi + +./configure + +%install +rm -rf %{buildroot} +make install DESTDIR=%{buildroot} + +%files +%{_bindir}/jasmines +%{_bindir}/jasminec + diff --git a/code/jasmine/misc.c b/code/jasmine/misc.c new file mode 100755 index 00000000..fefdf690 --- /dev/null +++ b/code/jasmine/misc.c @@ -0,0 +1,73 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include + +#include "misc.h" + +struct sockaddr_in make_addr(char *addr, unsigned short port) +{ + struct sockaddr_in sin; + + sin.sin_family = AF_INET; + if (!addr) { + sin.sin_addr.s_addr = htonl(INADDR_ANY); + } else { + if (!inet_aton(addr, &sin.sin_addr)) + crit("cant resolve address: %s", addr); + } + sin.sin_port = htons(port); + return sin; +} + +/* Better than using a macro */ +void set_nonblock(int s) +{ + if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) { + crit("set_nonblock failed, can't set O_NONBLOCK"); + } +} + +void * wrapper_malloc(size_t size) +{ + void * res; + + if (size == 0) { + crit("wrapper_malloc: malloc 0 size not allowed"); + } + + res = malloc(size); + if (res == NULL) { + crit("wrapper_malloc: malloc failed"); + } + + return res; +} + +size_t read_all(int fd, void *buf, size_t count) +{ + size_t t = 0; + size_t r = 0; + + while (count > 0) { + r = read(fd, buf + t, count); + if (r <= 0) { + return t ? t : r; + } + + t += r; + count -= r; + } + + return t; +} diff --git a/code/jasmine/misc.h b/code/jasmine/misc.h new file mode 100755 index 00000000..e60d0b63 --- /dev/null +++ b/code/jasmine/misc.h @@ -0,0 +1,39 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_MISC_H +#define _MCAST_MISC_H + +#include +#include +#include +#include +#include +#include +#define gettid() syscall(__NR_gettid) + +#define level 6 + +#define crit(x, args...) do { \ + fprintf(stderr, "\nERROR: "); fprintf(stderr, x, ##args); \ + error(-1,errno, "\nERROR: [%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \ +} while (0); + +#define log(l, f, args...) if (l < level) { \ + fprintf(stderr, "\n[%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \ + fprintf(stderr, f, ##args); \ + fprintf(stderr, "\n"); \ +} + +struct sockaddr_in make_addr(char *addr, unsigned short port); +void * wrapper_malloc(size_t size); +void set_nonblock(int s); +size_t read_all(int fd, void *buf, size_t count); + +#endif diff --git a/code/jasmine/server-tcp.c b/code/jasmine/server-tcp.c new file mode 100755 index 00000000..f5c83b96 --- /dev/null +++ b/code/jasmine/server-tcp.c @@ -0,0 +1,550 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "buffer.h" +#include "tcp-common.h" +#include "udp-common.h" +#include "tcp-queue.h" +#include "misc.h" +#include "server.h" + +#define MAX_CLIENTS 128 /* Clients per TCP server */ +#define TCP_BUFF_SIZE 65536 + +struct cdata { + struct tcpq *tx; + struct tcpq *rx; + int await; /* non-zero if we are in the middle of receiving clients requests */ + struct sockaddr_in peer; +}; + +struct server_status_data { + int state; /* state machine */ + int sync_count; + int ccount; + struct semlink sl; + pthread_t pchild; + + struct pollfd ds[MAX_CLIENTS + 1]; + struct cdata cd[MAX_CLIENTS + 1]; + int cindex; + int last_buff_pkt_count; +}; + +void init_sdata(struct server_status_data *sdata, void *thread_args) +{ + int i; + + sdata->sl.parent = (struct semaphores *)thread_args; + sdata->sl.this = NULL; + + sdata->state = S_PREP; + sdata->last_buff_pkt_count = 1; + sdata->sync_count = 0; + + for (i = 0; i < MAX_CLIENTS + 1; i++) { + sdata->ds[i].fd = -1; + sdata->ds[i].events = POLLIN; + sdata->cd[i].tx = tcpq_queue_init(); + sdata->cd[i].rx = tcpq_queue_init(); + sdata->cd[i].await = 0; + } +} + +/* disconnect one client and compress table */ +void kill_client(struct server_status_data *sdata) +{ + struct pollfd *ds = sdata->ds; + struct cdata *cd = sdata->cd; + + close((ds + sdata->cindex)->fd); + tcpq_queue_free((cd + sdata->cindex)->rx); + tcpq_queue_free((cd + sdata->cindex)->tx); + + /* Move last slot to this free slot and release the last slot */ + *(ds + sdata->cindex) = *(ds + sdata->ccount - 1); + *(cd + sdata->cindex) = *(cd + sdata->ccount - 1); + sdata->cindex--; + sdata->ccount--; + client_count--; +} + +void send_buffctl_to_all_clients(struct server_status_data *sdata) +{ + int i; + long send_sz; + void *cpy; + + /* copy header to all queues */ + send_sz = sizeof(struct buffer_ctl); + for (i = 1; i < sdata->ccount; i++) { + cpy = wrapper_malloc(send_sz); + memcpy(cpy, &buffctl, send_sz); + tcpq_queue_tail(sdata->cd[i].tx, cpy, send_sz); + sdata->ds[i].events |= POLLOUT; + } +} + +void send_sent_to_all_clients(struct server_status_data *sdata) +{ + int i; + void *cpy; + uint8_t byte = SERVER_SENT; + + /* Push data to clients */ + for (i = 1; i < sdata->ccount; i++) { + cpy = wrapper_malloc(1); + memcpy(cpy, &byte, 1); + tcpq_queue_tail(sdata->cd[i].tx, cpy, 1); + sdata->ds[i].events |= POLLOUT; + } +} +void accept_clients(struct server_status_data *sdata) +{ + int res; + int new_fd; + socklen_t socklen; + int poll_events; + int timeout = -1; + struct pollfd *ds = sdata->ds; + struct cdata *cd = sdata->cd; + + ds->events = POLLIN; + ds->revents = 0; + sdata->ccount = 1; + + while (sdata->ccount <= MAX_CLIENTS && client_count < wait_count) { + poll_events = poll(ds, 1, timeout); + if (poll_events < 0) { + crit("poll() failed"); + } else if (poll_events == 0) { + log(2, "poll() returned with no results!"); + continue; + } + + log(9, "poll: %d events", poll_events); + + if (ds->revents) { + + /* new connections come in */ + if (ds->revents & (POLLIN|POLLPRI)) { + + do { + socklen = sizeof((cd + sdata->ccount)->peer); +retry_accept: + new_fd = accept(ds->fd, + (struct sockaddr *)&((cd + sdata->ccount)->peer), + &socklen); + if (new_fd == -1 && errno == EINTR) { + goto retry_accept; + } + + if (new_fd == -1) { + log(2, "accept() returned with error %d", errno); + break; + } + + /* Send group info, before set non block */ + res = write(new_fd, &mcinfo, sizeof(struct mc_info)); + if (res < 0) { + log(3, "Error opening connection: %s", + inet_ntoa((cd + sdata->ccount)->peer.sin_addr)); + close(new_fd); + continue; + } + + /* Can set to non block becasue we will poll it in future. */ + res = fcntl(new_fd, F_SETFL, O_NONBLOCK); + if (res == -1) { + log(2, "fcntl() returned with error %d", errno); + close(new_fd); + continue; + } + + (ds + sdata->ccount)->fd = new_fd; + + log(5, "New connection %d: %s", + sdata->ccount, inet_ntoa((cd + sdata->ccount)->peer.sin_addr)); + sdata->ccount++; + client_count++; + } while (sdata->ccount <= MAX_CLIENTS); + + } else { + log(2, "Error on tcp socket"); + } + + poll_events--; + ds->revents = 0; + } + } + + ds->events = 0; +} + +void accept_clients_may_spawn(struct server_status_data *sdata) +{ + /* Wait the parent to let us accpet clients */ + sl_wait_parent(&(sdata->sl)); + + sdata->ds[0].fd = sfd; + accept_clients(sdata); + if (client_count < wait_count) { + sdata->sl.this = spawn_thread(&sdata->pchild, tcp_server_main, 1); + + /* Let child TCP servers accept connections */ + sl_release_child(&(sdata->sl)); + /* Wait until all childern TCP servers got all their clients */ + sl_wait_child(&(sdata->sl)); + } + + /* Tell parent all client are accepted/connected */ + sl_release_parent(&(sdata->sl)); +} + +void keep_on_receiving_client_request(struct server_status_data *sdata) +{ + char buf[TCP_BUFF_SIZE]; + void *cpy; + struct request_ctl *req; + uint32_t *rqb; + struct packet_ctl *ans; + long rq_index; + long total_sz; + + long read_out; + struct pollfd *ds; + struct cdata *cd; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + read_out = read(ds->fd, buf, cd->await); + if (read_out <= 0) { + log(5, "Client disconnected (r): %s", + inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; + } + + log(7, "New data (%ld + %ld) on conn %d: %s", + read_out, tcpq_queue_dsize(cd->rx), sdata->cindex, + inet_ntoa(cd->peer.sin_addr)); + cpy = wrapper_malloc(read_out); + memcpy(cpy, buf, read_out); + tcpq_queue_tail(cd->rx, cpy, read_out); + cd->await -= read_out; + + /* Full header received */ + if (tcpq_queue_dsize(cd->rx) == sizeof(struct request_ctl)) { + cpy = tcpq_queue_flat_peek(cd->rx, &read_out); + req = (struct request_ctl *)cpy; + cd->await = req->req_count * sizeof(uint32_t); + log(6, "Client request for %u packets on %d: %s", + req->req_count, sdata->cindex, inet_ntoa(cd->peer.sin_addr)); + /* req->rqc may be zero? So do not return from here */ + } + + /* Whole request(struct request_ctl + all rq blocks) received */ + if (cd->await == 0) { + cpy = tcpq_dqueue_flat(cd->rx, &read_out); + req = (struct request_ctl *)cpy; + rqb = (uint32_t *) (cpy + sizeof(struct request_ctl)); + for (rq_index = 0; rq_index < req->req_count; rq_index++) { + ans = packet_get(*(rqb + rq_index)); + total_sz = ans->data_size + sizeof(struct packet_ctl); + log(6, "Send packet %u (%u bytes) on %d", + rqb[rq_index], ans->data_size, sdata->cindex); + cpy = wrapper_malloc(total_sz); + memcpy(cpy, ans, total_sz); + tcpq_queue_tail(cd->tx, cpy, total_sz); + } + + if (rq_index > 0) { + /* Data need to be sent out */ + ds->events |= POLLOUT; + } + } +} + +void handle_error_event(struct server_status_data *sdata) +{ + struct cdata *cd; + + cd = &(sdata->cd[sdata->cindex]); + log(5, "Closing connection %d: %s", + sdata->cindex, inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); +} + +void handle_client_ready(struct server_status_data *sdata) +{ + if (sdata->state == S_SYNC) { + sdata->sync_count++; + + log(7, "Client SYNC %d of %d", + sdata->sync_count, sdata->ccount - 1); + + if (sdata->sync_count == sdata->ccount - 1) { + /* All client SYNC done */ + sdata->state = S_SEND; + + /* Wait child ready */ + sl_wait_child(&(sdata->sl)); + /* Tell parent I am ready, release parent */ + sl_release_parent(&(sdata->sl)); + + /* Wait parent to send mcast */ + sl_wait_parent(&(sdata->sl)); + /* tell child mcast done, release child */ + sl_release_child(&(sdata->sl)); + + send_sent_to_all_clients(sdata); + } + } +} + +void handle_client_done(struct server_status_data *sdata) +{ + if (sdata->state == S_SEND) { + log(7, "Client START %d of %d", + sdata->sync_count, sdata->ccount - 1); + + sdata->sync_count--; + if (sdata->sync_count == 0) { + /* Got all client START */ + sdata->state = S_PREP; + log(7, "All received, now tell UDP Server prepare next buffer"); + /* Remeber old buffctl.pkt_count before it change */ + sdata->last_buff_pkt_count = buffctl.pkt_count; + + /* Wait child clients done */ + sl_wait_child(&(sdata->sl)); + /* Tell parent all our clients are done, release parent */ + sl_release_parent(&(sdata->sl)); + } + } +} + +void handle_client_events(struct server_status_data *sdata) +{ + struct pollfd *ds; + struct cdata *cd; + int read_out; + uint8_t msgtype; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + /* read message type */ + read_out = read(ds->fd, &msgtype, 1); + if (read_out <= 0) { + log(5, "Client disconnected due to read error %d, ret:%d (r): %s", + errno, read_out, inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; /* continue to check other clients */ + } + + switch (msgtype) { + case CLIENT_READY: + handle_client_ready(sdata); + break; + case CLIENT_DONE: + handle_client_done(sdata); + break; + case CLIENT_REQ: + /* wait a whole struct request_ctl */ + cd->await = sizeof(struct request_ctl); + break; + default: + log(4, "Wrong message type from %s", + inet_ntoa(cd->peer.sin_addr)); + break; + } +} + +void handle_pullin_event(struct server_status_data *sdata) +{ + struct cdata *cd; + + cd = &(sdata->cd[sdata->cindex]); + + /* Await is set only for repeat request */ + if (cd->await) { + keep_on_receiving_client_request(sdata); + } else { + handle_client_events(sdata); + } +} + +void handle_pullout_event(struct server_status_data *sdata) +{ + char buf[TCP_BUFF_SIZE]; + struct pollfd *ds; + struct cdata *cd; + long transmit_sz; + void *cpy; + long written_in; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + if (cd->tx->count == 0) { + ds->events = POLLIN; /* Just make sure */ + return; + } + + log(7, "handle_pullout_event servs No.%d conn", sdata->cindex); + + /* If there is some data to be sent, try to do it now */ + cpy = tcpq_dequeue_head(cd->tx, &transmit_sz); + memcpy(buf, cpy, transmit_sz); + free(cpy); + + written_in = write(ds->fd, buf, transmit_sz); + if (written_in <= 0) { + log(5, "Client disconnected (w): %s", + inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; + } + + if (written_in != transmit_sz) { + log(6, "Partial wrote: %ld out of %ld bytes sent", + written_in, transmit_sz); + cpy = wrapper_malloc(transmit_sz - written_in); + memcpy(cpy, buf + written_in, transmit_sz - written_in); + tcpq_queue_head(cd->tx, cpy, transmit_sz - written_in); + } else { + log(7, "Sent %ld bytes to %s", + written_in, inet_ntoa(cd->peer.sin_addr)); + if (cd->tx->count == 0) { + /* Do not listen pollout event anymore */ + ds->events = POLLIN; + } + } +} + +void check_clients(struct server_status_data *sdata) +{ + int poll_events; + struct pollfd *ds; + int timeout = -1; + + if (sdata->ccount == 1) { + log(5, "No more clients, start exiting. s,c,p:%d,%d,%d", + sdata->state, sdata->ccount, buffctl.pkt_count); + /* No client existed */ + switch(sdata->state) { + case S_SYNC: + /* Wait all children ready */ + sl_wait_child(&(sdata->sl)); + /* Tell parent I am ready, release parent */ + sl_release_parent(&(sdata->sl)); + + /* Wait parent to send mcast */ + sl_wait_parent(&(sdata->sl)); + /* Tell child mcast done, release child */ + sl_release_child(&(sdata->sl)); + case S_SEND: + sdata->state = S_PREP; + /* Remeber old buffctl.pkt_count before it change */ + sdata->last_buff_pkt_count = buffctl.pkt_count; + + /* Wait child clients done */ + sl_wait_child(&(sdata->sl)); + /* Tell parent all our clients are done */ + sl_release_parent(&(sdata->sl)); + } + /* return to main loop to finish the dummy run */ + return; + } + + /* poll clients only */ + poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout); + if (poll_events < 0) { + crit("poll() failed"); + } else if (poll_events == 0) { + log(2, "poll() returned with no results!"); + return; + } + + log(9, "poll clients: %d events", poll_events); + + /* check all connected clients */ + for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) { + ds = &(sdata->ds[sdata->cindex]); + if (!ds->revents) { + continue; + } + + if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) { + handle_error_event(sdata); + } else if (ds->revents & (POLLIN|POLLPRI)) { + handle_pullin_event(sdata); + } else if (ds->revents & POLLOUT) { + handle_pullout_event(sdata); + } + } +} + +void * tcp_server_main(void *args) +{ + int i; + struct server_status_data ssdata; + struct server_status_data *sdata; + + sdata = &ssdata; + init_sdata(sdata, args); + + accept_clients_may_spawn(sdata); + + while (1) { + if (sdata->state == S_PREP) { + /* Wait UDP server to prepare the buffctl of this round to send */ + sl_wait_parent(&(sdata->sl)); + log(6, "After waiting UDP server preparation. s,c,p:%d,%d,%d", + sdata->state, sdata->ccount - 1, buffctl.pkt_count); + /* Tell children buffer pareparation done, release child */ + sl_release_child(&(sdata->sl)); + send_buffctl_to_all_clients(sdata); + sdata->state = S_SYNC; + } + + check_clients(sdata); + + if (sdata->state == S_PREP && sdata->last_buff_pkt_count == 0) { + log(5, "No more output, TCP server finished"); + if (sdata->sl.this && pthread_join(sdata->pchild, 0) < 0) { + crit("pthread_join()"); + } + + /* shutdown conn to clients */ + for (i = sdata->ccount - 1; i > 0; i--) { + shutdown((sdata->ds[i]).fd, 2); + } + + if (!sdata->sl.this) { + /* leaf server */ + close(sfd); + sfd = -1; + } + return 0; + } + } +} diff --git a/code/jasmine/server-udp.c b/code/jasmine/server-udp.c new file mode 100755 index 00000000..e8a62c25 --- /dev/null +++ b/code/jasmine/server-udp.c @@ -0,0 +1,95 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include +#include +#include + +#include "buffer.h" +#include "udp-common.h" +#include "misc.h" +#include "server.h" + +void * udp_server_main(void *args) +{ + pthread_t pchild; + struct sockaddr_in maddr; + int ms; + int i; + struct semlink sl; + char adr[128]; + long long total = 0; + + sprintf(adr, "%s%s", MCAST_ADDR_BASE, MCAST_ADDR_SUFFIX); + maddr = make_addr(adr, server_port); + mcinfo.group = maddr; + ms = init_mcast_socket(&local_addr, &maddr); + + sl.this = spawn_thread(&pchild, tcp_server_main, 1); + sl.parent = NULL; + + buffer_init(); + + /* Let TCP servers accept connections */ + sl_release_child(&sl); + /* Wait until all TCP servers got all their clients */ + sl_wait_child(&sl); + + log(7, "UDP Server: All clients were accepted"); + + do { + /* one buffer round */ + + i = buffer_fill(STDIN_FILENO); + + if (!client_count) { + /* No need to send, make it the last run of main loop, also let + TCP Server to do dummy run right away(not waiting until all + data sent). */ + i = 0; + buffctl.pkt_count = 0; + } + + log(7, "Signal TCP Server to send buffctl"); + + /* Tell tcp to Send headers, release child */ + sl_release_child(&sl); + /* Wait all clients ready to start receiving */ + sl_wait_child(&sl); + + /* multicast data */ + while (i--) { + sendto(ms, packetctl[i], + packetctl[i]->data_size + sizeof(struct packet_ctl), 0, + (struct sockaddr *)&maddr, sizeof(maddr)); + } + + log(7, "send finish"); + /* Tell tcp to Send SERVER_SENT, release child */ + sl_release_child(&sl); + /* wait tcp to send SERVER_SENT */ + sl_wait_child(&sl); + + total = total + buffctl.buffer_size; + log(1, "Buffer Done: sent %lld bytes", total); + } while (buffctl.pkt_count); + + + if (pthread_join(pchild, 0) < 0) { + crit("pthread_join() TCP Server"); + } + + log(1, "All Done"); + close(ms); + return 0; +} diff --git a/code/jasmine/server.c b/code/jasmine/server.c new file mode 100755 index 00000000..cf5d398e --- /dev/null +++ b/code/jasmine/server.c @@ -0,0 +1,123 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include +#include + +#include "buffer.h" +#include "tcp-common.h" +#include "misc.h" +#include "server.h" + +/* Global variables shared among server*.c */ +int wait_count = 0; +int client_count = 0; +int server_port = DEF_PORT; /* Currently for both UDP and TCP */ +struct in_addr local_addr; +/* listen socket - global for all TCP threads */ +int sfd = -1; + +void init_semaphores(struct semaphores *sp) +{ + if (sem_init(&sp->wait_parent, 0, 0) < 0) { + crit("sem_init() wait_parent"); + } + + if (sem_init(&sp->wait_child, 0, 0) < 0) { + crit("sem_init() wait_child"); + } +} + +void sl_wait_child(struct semlink *sl) +{ + if (sl->this) { + P(sl->this->wait_child); + } +} + +void sl_release_child(struct semlink *sl) +{ + if (sl->this) { + V(sl->this->wait_parent); + } +} + +void sl_wait_parent(struct semlink *sl) +{ + P(sl->parent->wait_parent); +} + +void sl_release_parent(struct semlink *sl) +{ + V(sl->parent->wait_child); +} + +/* Wrapper for pthread_create, additionaly may initialize/pass thru + a semaphores parameter */ +struct semaphores * spawn_thread(pthread_t *pchild, + void *(*entry)(void *), + int create_sems) +{ + struct semaphores *sp = NULL; + + if (create_sems != 0) { + sp = (struct semaphores *) wrapper_malloc(sizeof(struct semaphores)); + init_semaphores(sp); + } + + if (pthread_create(pchild, 0, entry, (void *)sp) != 0) { + crit("pthread_create"); + } + return sp; +} + +int main(int argc, char *argv[]) +{ + pthread_t pchild; + + log(9, "buffer size:%ld, buffer head size:%ld, data size:%ld", + PACKET_SIZE, sizeof(struct packet_ctl), PACKET_PAYLOAD_SIZE); + + if (argc < 3) { + printf("Usage: %s local_ip num_of_clients [port]\n", argv[0]); + return -1; + } + + if (!inet_aton(argv[1], &local_addr)) { + crit("can not resolve address: %s", argv[1]); + } + + wait_count = atoi(argv[2]); + if (!wait_count) { + crit("can not serv to 0 client\n"); + } + + if (argc > 3) { + server_port = atoi(argv[3]); + if (!server_port) { + server_port = DEF_PORT; + } + } + + /* sfd is shared by all TCP threads as sdata->ds[0].fd */ + sfd = init_tcp_server_socket(0, server_port); + set_nonblock(sfd); + + /* start UDP servers thread */ + (void)spawn_thread(&pchild, udp_server_main, 0); + if (pthread_join(pchild, 0) < 0) { + crit("pthread_join() UDP Server"); + } + + return 0; +} diff --git a/code/jasmine/server.h b/code/jasmine/server.h new file mode 100755 index 00000000..1e7a6409 --- /dev/null +++ b/code/jasmine/server.h @@ -0,0 +1,63 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_SERVER_H +#define _MCAST_SERVER_H + +#include +#include + +struct semaphores { + sem_t wait_parent; + sem_t wait_child; +}; + +struct semlink { + struct semaphores *this; /* used by parent to point to the struct semaphores + which it created during spawn child. */ + struct semaphores *parent; /* used by child to point to the struct + semaphores which it created by parent */ +}; + +void sl_wait_child(struct semlink *sl); +void sl_release_child(struct semlink *sl); +void sl_wait_parent(struct semlink *sl); +void sl_release_parent(struct semlink *sl); + +/* Server state machine */ +#define S_PREP 0 +#define S_SYNC 1 +#define S_SEND 2 + +extern int wait_count; +extern int client_count; +extern int server_port; +extern struct in_addr local_addr; +extern int sfd; + +void *udp_server_main(void *args); +void *tcp_server_main(void *args); +void init_semaphores(struct semaphores *sp); +struct semaphores * spawn_thread(pthread_t *pchild, + void *(*entry)(void *), + int create_sems); + +#define V(x) do { \ + if (sem_post(&x) < 0) { \ + crit("sem_post()"); \ + } \ +} while(0) + +#define P(x) do { \ + if (sem_wait(&x) != 0) { \ + crit("sem_wait()"); \ + } \ +} while(0) + +#endif diff --git a/code/jasmine/tcp-common.c b/code/jasmine/tcp-common.c new file mode 100755 index 00000000..62e2bb26 --- /dev/null +++ b/code/jasmine/tcp-common.c @@ -0,0 +1,60 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include + +#include "buffer.h" +#include "tcp-common.h" +#include "misc.h" + +int init_tcp_server_socket(char *addr, unsigned short port) +{ + int s; + struct sockaddr_in sin; + + port = port ? port : TCP_DPORT; + sin = make_addr(addr, port); + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + crit("socket() failed"); + } + + if (bind(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) { + crit("bind() failed, error binding tcp socket"); + } + + if (listen(s, 5) < 0) { + crit("listen() failed, error listening on socket"); + } + + log(4, "init_tcp_server_socket %s:%d", inet_ntoa(sin.sin_addr), port); + return s; +} + +int init_tcp_client_socket(char *addr, unsigned short port) +{ + int s; + struct sockaddr_in sin; + + port = port ? port : TCP_DPORT; + sin = make_addr(addr, port); + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + crit("socket() failed"); + } + + if (connect(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) { + crit("connect() failed"); + } + + log(4, "init_tcp_client_socket %s:%d", inet_ntoa(sin.sin_addr), port); + return s; +} diff --git a/code/jasmine/tcp-common.h b/code/jasmine/tcp-common.h new file mode 100755 index 00000000..6ac48e1c --- /dev/null +++ b/code/jasmine/tcp-common.h @@ -0,0 +1,18 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_TCP_COMMON_H +#define _MCAST_TCP_COMMON_H + +#define TCP_DPORT DEF_PORT + +int init_tcp_server_socket(char *addr, unsigned short port); +int init_tcp_client_socket(char *addr, unsigned short port); + +#endif diff --git a/code/jasmine/tcp-queue.c b/code/jasmine/tcp-queue.c new file mode 100755 index 00000000..fc0323f9 --- /dev/null +++ b/code/jasmine/tcp-queue.c @@ -0,0 +1,178 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +/* TCP poll message single linked queue */ +#include +#include + +#include "misc.h" +#include "tcp-queue.h" + +void tcpq_queue_tail(struct tcpq *q, void *data, long size) +{ + struct qmsg *tmp; + + if (!q) { + return; + } + + tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg)); + tmp->next = NULL; + + if (!q->head) { + q->head = q->tail = tmp; + } else { + q->tail->next = tmp; + q->tail = tmp; + } + + q->tail->data = data; + q->tail->size = size; + q->size += size; + q->count++; +} + +void tcpq_queue_head(struct tcpq *q, void *data, long size) +{ + struct qmsg *tmp; + + if (!q) { + return; + } + + tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg)); + tmp->next = NULL; + + if (!q->head) { + q->head = q->tail = tmp; + } else { + tmp->next = q->head; + q->head = tmp; + } + + q->head->data = data; + q->head->size = size; + q->size += size; + q->count++; +} + +void * tcpq_dequeue_head(struct tcpq *q, long *size) +{ + void *res = NULL; + struct qmsg *tmp; + + if (q && q->head) { + res = q->head->data; + *size = q->head->size; + + tmp = q->head; + q->head = q->head->next; + if (!q->head) { + q->tail = NULL; + } + + free(tmp); + q->count--; + q->size -= *size; + } + return res; +} + +void * tcpq_queue_peek(struct tcpq *q, long *size) +{ + if (q && q->head) { + *size = q->head->size; + return q->head->data; + } + + return NULL; +} + +long tcpq_queue_dsize(struct tcpq *q) +{ + if (q) { + return q->size; + } + + return 0; +} + +void tcpq_queue_free(struct tcpq *q) +{ + struct qmsg *tmp; + + if (!q) { + return; + } + + while (q->head) { + tmp = q->head; + q->head = tmp->next; + free(tmp->data); + free(tmp); + } + + q->count = 0; + q->size = 0; + q->head = q->tail = NULL; +} + +struct tcpq * tcpq_queue_init(void) +{ + struct tcpq *q = wrapper_malloc(sizeof(struct tcpq)); + + q->count = 0; + q->size = 0; + q->head = q->tail = NULL; + return q; +} + +void * tcpq_dqueue_flat(struct tcpq *q, long *size) +{ + void *res; + struct qmsg *tmp; + long offs = 0; + + if (!q || q->count == 0) { + return NULL; + } + + res = wrapper_malloc(q->size); + *size = q->size; + + while (q->head) { + memcpy(res + offs, q->head->data, q->head->size); + offs += q->head->size; + + tmp = q->head; + q->head = tmp->next; + free(tmp->data); + free(tmp); + } + tcpq_queue_free(q); + return res; +} + +void * tcpq_queue_flat_peek(struct tcpq *q, long *size) +{ + void *cpy; + + if (!q) { + return NULL; + } + + if (q->count > 1) { + cpy = tcpq_dqueue_flat(q, size); + tcpq_queue_tail(q, cpy, *size); /* use tcpq_queue_head is also OK */ + } else { + cpy = tcpq_queue_peek(q, size); + } + + return cpy; +} diff --git a/code/jasmine/tcp-queue.h b/code/jasmine/tcp-queue.h new file mode 100755 index 00000000..4f096f85 --- /dev/null +++ b/code/jasmine/tcp-queue.h @@ -0,0 +1,35 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_TCP_QUEUE_H +#define _MCAST_TCP_QUEUE_H + +struct tcpq { + struct qmsg *head, *tail; + long count; /* message count in a queue */ + long size; /* Total data size of a queue */ +}; + +struct qmsg { + struct qmsg *next; + void *data; + long size; +}; + +struct tcpq * tcpq_queue_init(void); +void tcpq_queue_free(struct tcpq *q); +long tcpq_queue_dsize(struct tcpq *q); +void tcpq_queue_tail(struct tcpq *q, void *data, long size); +void tcpq_queue_head(struct tcpq *q, void *data, long size); +void * tcpq_dequeue_head(struct tcpq *q, long *size); +void * tcpq_queue_peek(struct tcpq *q, long *size); +void * tcpq_dqueue_flat(struct tcpq *q, long *size); +void * tcpq_queue_flat_peek(struct tcpq *q, long *size); + +#endif diff --git a/code/jasmine/udp-common.c b/code/jasmine/udp-common.c new file mode 100755 index 00000000..317b816b --- /dev/null +++ b/code/jasmine/udp-common.c @@ -0,0 +1,97 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include +#include +#include +#include +#include +#include + +#include "buffer.h" +#include "udp-common.h" +#include "misc.h" + +struct mc_info mcinfo; + +int init_mcast_socket(struct in_addr *local_addr, + struct sockaddr_in *maddr) +{ + struct ip_mreqn mreqn; /* multicast request new */ + int ms; /* multicast socket */ + u_short msock_port; + struct sockaddr_in bind_addr; + + int msock_reuse = MCAST_REUSE; + u_char msock_loop = MCAST_LOOP; + u_char msock_ttl = MCAST_TTL; + + if ((ms = socket(PF_INET, SOCK_DGRAM, 0)) < 0) { + crit("socket() failed, error allocating multicast socket"); + } + + // In order to let client and server on same host to use same port + if (setsockopt(ms, SOL_SOCKET, SO_REUSEADDR, + &msock_reuse, sizeof(msock_reuse)) < 0) { + crit("setsockopt() failed, can't set reuse flag"); + } + + if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_TTL, + &msock_ttl,sizeof(msock_ttl)) < 0) { + crit("setsockopt() failed, can't set ttl value"); + } + + if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_LOOP, + &msock_loop, sizeof(msock_loop)) < 0) { + crit("setsockopt() failed, can't set multicast packet looping"); + } + + /* TODO: Do we neet non-block? */ + + log(4, "Using multicast address: %s:%d", + inet_ntoa(maddr->sin_addr), ntohs(maddr->sin_port)); + + mreqn.imr_multiaddr = maddr->sin_addr; + mreqn.imr_address = *local_addr; + mreqn.imr_ifindex = 0; + + /* Interface and remote mcast address choosing */ + + /* This tell interface which has mreqn.imr_address to join + mreqn.imr_multiaddr group, it has nothing to do with socket, port, and + mreqn.imr_address. */ + if (setsockopt(ms, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreqn, sizeof(mreqn)) < 0) { + crit("setsockopt() failed, %s can't join multicast group", + inet_ntoa(*local_addr)); + } + + /* local address choosing */ + + /* This tell kernel to use interface which has mreqn.imr_address as device + and mreqn.imr_address as source addr when sending any multicast through + socket. */ + if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_IF, + &mreqn, sizeof(mreqn)) < 0) { + crit("setsockopt() failed to IP_MULTICAST_IF.\n"); + } + + /* Local port choosing, remote port is choosed when calling sendto()! */ + + /* This let us uses specific udp port number as source port when sending + any multicast datagrams. So ip in maddr is useless here and should be + INADDR_ANY, otherwise, bind will return Invalid argument. */ + msock_port = ntohs(maddr->sin_port); + bind_addr = make_addr(0, msock_port); + if (bind(ms, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) { + crit("bind() error, can't bind mcast port"); + } + + return ms; +} diff --git a/code/jasmine/udp-common.h b/code/jasmine/udp-common.h new file mode 100755 index 00000000..861545b4 --- /dev/null +++ b/code/jasmine/udp-common.h @@ -0,0 +1,42 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_UDP_COMMON_H +#define _MCAST_UDP_COMMON_H + +#include +#include +#include + +#define MCAST_DPORT DEF_PORT + +#define MCAST_TTL 8 +/* + * Force reuse + */ +#define MCAST_REUSE 1 + +/* + * Loop to let server host itself can receive data too. + */ +#define MCAST_LOOP 1 +#define MCAST_ADDR_BASE "224.238.0." +#define MCAST_ADDR_SUFFIX "31" + +// Initial data +struct mc_info { + struct sockaddr_in group; +}; + +extern struct mc_info mcinfo; + +// How to capture udp packets: tcpdump -i ethx udp -vv -n + +int init_mcast_socket(struct in_addr *local_addr, struct sockaddr_in *maddr); +#endif -- cgit 1.2.3-korg