summaryrefslogtreecommitdiffstats
path: root/code/jasmine
diff options
context:
space:
mode:
Diffstat (limited to 'code/jasmine')
-rwxr-xr-xcode/jasmine/Makefile.am84
-rw-r--r--code/jasmine/README17
-rwxr-xr-xcode/jasmine/autogen.sh5
-rwxr-xr-xcode/jasmine/buffer.c152
-rwxr-xr-xcode/jasmine/buffer.h64
-rwxr-xr-xcode/jasmine/build-aux/git-version-gen170
-rwxr-xr-xcode/jasmine/client.c313
-rwxr-xr-xcode/jasmine/configure.ac65
-rwxr-xr-xcode/jasmine/jasmine.spec.in33
-rwxr-xr-xcode/jasmine/misc.c73
-rwxr-xr-xcode/jasmine/misc.h39
-rwxr-xr-xcode/jasmine/server-tcp.c550
-rwxr-xr-xcode/jasmine/server-udp.c95
-rwxr-xr-xcode/jasmine/server.c123
-rwxr-xr-xcode/jasmine/server.h63
-rwxr-xr-xcode/jasmine/tcp-common.c60
-rwxr-xr-xcode/jasmine/tcp-common.h18
-rwxr-xr-xcode/jasmine/tcp-queue.c178
-rwxr-xr-xcode/jasmine/tcp-queue.h35
-rwxr-xr-xcode/jasmine/udp-common.c97
-rwxr-xr-xcode/jasmine/udp-common.h42
21 files changed, 2276 insertions, 0 deletions
diff --git a/code/jasmine/Makefile.am b/code/jasmine/Makefile.am
new file mode 100755
index 00000000..dc95a03e
--- /dev/null
+++ b/code/jasmine/Makefile.am
@@ -0,0 +1,84 @@
+# Copyright (c) 2015 ZTE, Inc.
+
+SPEC = $(PACKAGE_NAME).spec
+
+TARFILE = $(PACKAGE_NAME)-$(VERSION).tar.gz
+
+EXTRA_DIST = autogen.sh $(SPEC).in \
+ build-aux/git-version-gen \
+ .version
+
+AUTOMAKE_OPTIONS = foreign
+
+ACLOCAL_AMFLAGS = -I m4
+
+MAINTAINERCLEANFILES = Makefile.in aclocal.m4 configure depcomp \
+ config.guess config.sub missing install-sh \
+ autoheader automake autoconf \
+ autoscan.log configure.scan ltmain.sh test-driver config.h.in
+
+noinst_HEADERS = buffer.h misc.h server.h tcp-common.h tcp-queue.h udp-common.h
+
+dist-clean-local:
+ rm -f autoconf automake autoheader
+
+clean-generic:
+ rm -rf $(SPEC) $(TARFILE)
+
+## make rpm/srpm section.
+
+$(SPEC): $(SPEC).in
+ rm -f $@-t $@
+ ver="$(VERSION)" && \
+ sed \
+ -e "s#@version@#$$ver#g" \
+ $< > $@-t; \
+ chmod a-w $@-t
+ mv $@-t $@
+
+$(TARFILE):
+ $(MAKE) dist
+
+RPMBUILDOPTS = --define "_sourcedir $(abs_builddir)" \
+ --define "_specdir $(abs_builddir)" \
+ --define "_builddir $(abs_builddir)" \
+ --define "_srcrpmdir $(abs_builddir)" \
+ --define "_rpmdir $(abs_builddir)"
+
+srpm: clean
+ $(MAKE) $(SPEC) $(TARFILE)
+ rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) --nodeps -bs $(SPEC)
+
+rpm: clean _version
+ $(MAKE) $(SPEC) $(TARFILE)
+ rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) -ba $(SPEC)
+
+# release/versioning
+BUILT_SOURCES = .version
+.version:
+ echo $(VERSION) > $@-t && mv $@-t $@
+
+dist-hook:
+ echo $(VERSION) > $(distdir)/.tarball-version
+
+.PHONY: _version
+
+_version:
+ cd $(srcdir) && rm -rf autom4te.cache .version && autoreconf -i
+ $(MAKE) $(AM_MAKEFLAGS) Makefile
+
+maintainer-clean-local:
+ rm -rf m4
+
+
+###
+
+bin_PROGRAMS = jasmines jasminec
+
+DEFAULT_INCLUDES = -I. -I/usr/include
+
+jasmines_SOURCES = udp-common.c tcp-common.c buffer.c misc.c server.c server-udp.c server-tcp.c tcp-queue.c
+jasminec_SOURCES = udp-common.c tcp-common.c buffer.c misc.c client.c
+
+jasmines_LDADD = -lpthread
+jasminec_LDADD = -lpthread
diff --git a/code/jasmine/README b/code/jasmine/README
new file mode 100644
index 00000000..221caea0
--- /dev/null
+++ b/code/jasmine/README
@@ -0,0 +1,17 @@
+jasmine: Just A Small Multicast engINE
+
+Installation
+------------
+
+./autogen.sh
+./configure
+make && make install
+
+Usage
+-----
+jasmine server:
+Usage: jasmines local_ip num_of_clients [port]
+
+jasmine client:
+Usage: jasminec <local_ip> <server_ip> [port]
+
diff --git a/code/jasmine/autogen.sh b/code/jasmine/autogen.sh
new file mode 100755
index 00000000..5bf25ecc
--- /dev/null
+++ b/code/jasmine/autogen.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+# Run this to generate all the initial makefiles, etc.
+mkdir -p m4
+echo Building configuration system...
+autoreconf -i
diff --git a/code/jasmine/buffer.c b/code/jasmine/buffer.c
new file mode 100755
index 00000000..0901d302
--- /dev/null
+++ b/code/jasmine/buffer.c
@@ -0,0 +1,152 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "buffer.h"
+#include "misc.h"
+
+struct buffer_ctl buffctl;
+struct packet_ctl *packetctl[PACKETS_PER_BUFFER];
+struct packet_ctl empty;
+
+void buffer_init()
+{
+ int i;
+
+ for (i = 0; i < PACKETS_PER_BUFFER; i++) {
+ packetctl[i] = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE);
+ memset(packetctl[i], 0, PACKET_SIZE);
+ packetctl[i]->data_size = 0;
+ packetctl[i]->seq = 0;
+ }
+
+ buffctl.buffer_id = 0;
+ buffctl.packet_id_base = 0;
+ buffctl.pkt_count = 0;
+ empty.data_size = 0;
+}
+
+void packetctl_precheck()
+{
+ int i;
+
+ for (i = 0; i < PACKETS_PER_BUFFER; i++) {
+ if (packetctl[i]->data_size != 0) {
+ crit("Error precheck packet slot %d,%d", i, packetctl[i]->data_size);
+ }
+ }
+}
+
+// Calculate packet checksum
+uint32_t packet_csum(uint8_t *data, int len)
+{
+ uint32_t *item, sum = 0;
+ int i = 0;
+
+ while (len % 4) {
+ data[len] = 0; len++;
+ }
+
+ while (i < len) {
+ item = (uint32_t *)((char *)data + i);
+ sum += *item;
+ i += 4;
+ }
+
+ return sum;
+}
+
+// Fill buffers from file descriptor
+long buffer_fill(int fd)
+{
+ int s = 0;
+ int r = PACKET_PAYLOAD_SIZE;
+
+ buffctl.buffer_id++;
+ buffctl.packet_id_base += buffctl.pkt_count;
+ buffctl.buffer_size = 0;
+ while (r > 0 && s < PACKETS_PER_BUFFER) {
+ if ((r = read(fd, packetctl[s]->data, PACKET_PAYLOAD_SIZE)) < 0) {
+ crit("Error reading data from stdin");
+ }
+
+ // r == 0 means EOF
+
+ if (r > 0) {
+ buffctl.buffer_size += r;
+ packetctl[s]->data_size = r;
+ packetctl[s]->seq = buffctl.packet_id_base + s;
+ packetctl[s]->crc = packet_csum(packetctl[s]->data, r);
+ s++;
+ }
+ }
+
+ log(6, "input %d bytes of data in %d packets", buffctl.buffer_size, s);
+ buffctl.pkt_count = s;
+ return s;
+}
+
+long buffer_flush(int fd)
+{
+ int s = 0;
+ while (buffctl.pkt_count--) {
+ write(fd, packetctl[s]->data, packetctl[s]->data_size);
+ buffctl.buffer_size -= packetctl[s]->data_size;
+ packetctl[s]->data_size = 0;
+ packetctl[s]->seq = 0;
+ packetctl[s]->crc = 0;
+ s++;
+ }
+
+ return s;
+}
+
+/* Caller should use new_pkt as packet buffer again if this returns NULL */
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt)
+{
+ struct packet_ctl *freed_pkt;
+ uint32_t i;
+
+ if (new_pkt->seq < buffctl.packet_id_base ||
+ new_pkt->seq - buffctl.packet_id_base >= buffctl.pkt_count) {
+ return NULL;
+ }
+
+ if (new_pkt->data_size == 0) {
+ return NULL;
+ }
+
+ i = packet_csum(new_pkt->data, new_pkt->data_size);
+ if (new_pkt->crc != i) {
+ return NULL;
+ }
+
+ i = new_pkt->seq - buffctl.packet_id_base;
+ freed_pkt = packetctl[i];
+ packetctl[i] = new_pkt;
+ return freed_pkt;
+}
+
+struct packet_ctl *packet_get(uint32_t seq)
+{
+ empty.seq = seq;
+
+ if (seq < buffctl.packet_id_base ||
+ seq - buffctl.packet_id_base >= buffctl.pkt_count) {
+ return &empty;
+ }
+
+ if (packetctl[seq - buffctl.packet_id_base]->data_size == 0) {
+ return &empty;
+ }
+
+ return packetctl[seq - buffctl.packet_id_base];
+}
diff --git a/code/jasmine/buffer.h b/code/jasmine/buffer.h
new file mode 100755
index 00000000..e899fe62
--- /dev/null
+++ b/code/jasmine/buffer.h
@@ -0,0 +1,64 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_BUFFER_H
+#define _MCAST_BUFFER_H
+
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/udp.h>
+
+#define MTU 1500
+
+#define PACKET_SIZE ((MTU) - sizeof(struct iphdr) - sizeof(struct udphdr))
+
+/* Exclude padding */
+#define PACKET_PAYLOAD_SIZE (((PACKET_SIZE) - sizeof(struct packet_ctl)) & ~0x3)
+
+#define PACKETS_PER_BUFFER 1024
+
+#define DEF_PORT 18383 /* for both UDP and TCP */
+
+/* Buffer header (align to 4 Byte) */
+struct buffer_ctl {
+ uint32_t buffer_id;
+ uint32_t buffer_size;
+ uint32_t packet_id_base;
+ uint32_t pkt_count;
+};
+
+/* Packet header (align to 4 Byte) */
+struct packet_ctl {
+ uint32_t seq;
+ uint32_t crc;
+ uint32_t data_size;
+ uint8_t data[0];
+};
+
+#define CLIENT_READY 0x1
+#define CLIENT_REQ 0x2
+#define CLIENT_DONE 0x4
+#define SERVER_SENT 0x8
+
+/* Retransmition Request Header (align to 4 Byte) */
+struct request_ctl {
+ uint32_t req_count; /* Requested packet slot count */
+};
+
+extern struct buffer_ctl buffctl;
+extern struct packet_ctl *packetctl[PACKETS_PER_BUFFER];
+
+void buffer_init();
+long buffer_fill(int fd);
+long buffer_flush(int fd);
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt);
+struct packet_ctl *packet_get(uint32_t seq);
+void packetctl_precheck();
+
+#endif
diff --git a/code/jasmine/build-aux/git-version-gen b/code/jasmine/build-aux/git-version-gen
new file mode 100755
index 00000000..c3d53f36
--- /dev/null
+++ b/code/jasmine/build-aux/git-version-gen
@@ -0,0 +1,170 @@
+#!/bin/sh
+# Print a version string.
+scriptversion=2010-10-13.20; # UTC
+
+# Copyright (C) 2007-2010 Free Software Foundation, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# This script is derived from GIT-VERSION-GEN from GIT: http://git.or.cz/.
+# It may be run two ways:
+# - from a git repository in which the "git describe" command below
+# produces useful output (thus requiring at least one signed tag)
+# - from a non-git-repo directory containing a .tarball-version file, which
+# presumes this script is invoked like "./git-version-gen .tarball-version".
+
+# In order to use intra-version strings in your project, you will need two
+# separate generated version string files:
+#
+# .tarball-version - present only in a distribution tarball, and not in
+# a checked-out repository. Created with contents that were learned at
+# the last time autoconf was run, and used by git-version-gen. Must not
+# be present in either $(srcdir) or $(builddir) for git-version-gen to
+# give accurate answers during normal development with a checked out tree,
+# but must be present in a tarball when there is no version control system.
+# Therefore, it cannot be used in any dependencies. GNUmakefile has
+# hooks to force a reconfigure at distribution time to get the value
+# correct, without penalizing normal development with extra reconfigures.
+#
+# .version - present in a checked-out repository and in a distribution
+# tarball. Usable in dependencies, particularly for files that don't
+# want to depend on config.h but do want to track version changes.
+# Delete this file prior to any autoconf run where you want to rebuild
+# files to pick up a version string change; and leave it stale to
+# minimize rebuild time after unrelated changes to configure sources.
+#
+# It is probably wise to add these two files to .gitignore, so that you
+# don't accidentally commit either generated file.
+#
+# Use the following line in your configure.ac, so that $(VERSION) will
+# automatically be up-to-date each time configure is run (and note that
+# since configure.ac no longer includes a version string, Makefile rules
+# should not depend on configure.ac for version updates).
+#
+# AC_INIT([GNU project],
+# m4_esyscmd([build-aux/git-version-gen .tarball-version]),
+# [bug-project@example])
+#
+# Then use the following lines in your Makefile.am, so that .version
+# will be present for dependencies, and so that .tarball-version will
+# exist in distribution tarballs.
+#
+# BUILT_SOURCES = $(top_srcdir)/.version
+# $(top_srcdir)/.version:
+# echo $(VERSION) > $@-t && mv $@-t $@
+# dist-hook:
+# echo $(VERSION) > $(distdir)/.tarball-version
+
+case $# in
+ 1|2) ;;
+ *) echo 1>&2 "Usage: $0 \$srcdir/.tarball-version" \
+ '[TAG-NORMALIZATION-SED-SCRIPT]'
+ exit 1;;
+esac
+
+tarball_version_file=$1
+tag_sed_script="${2:-s/x/x/}"
+nl='
+'
+
+# Avoid meddling by environment variable of the same name.
+v=
+
+svn log --non-interactive -q --limit 1 > /dev/null 2>&1
+svnwork=$?
+
+# First see if there is a tarball-only version file.
+# then try "git describe", then default.
+if test -f $tarball_version_file
+then
+ v=`cat $tarball_version_file` || exit 1
+ case $v in
+ *$nl*) v= ;; # reject multi-line output
+ [0-9]*) ;;
+ *) v= ;;
+ esac
+ test -z "$v" \
+ && echo "$0: WARNING: $tarball_version_file seems to be damaged" 1>&2
+fi
+
+if test -n "$v"
+then
+ : # use $v
+# Otherwise, if there is at least one git commit involving the working
+# directory, and "git describe" output looks sensible, use that to
+# derive a version string.
+elif test "`git log -1 --pretty=format:x . 2>&1`" = x \
+ && v=`git describe --abbrev=4 --match='v*' HEAD 2>/dev/null \
+ || git describe --abbrev=4 HEAD 2>/dev/null` \
+ && v=`printf '%s\n' "$v" | sed "$tag_sed_script"` \
+ && case $v in
+ v[0-9]*) ;;
+ *) (exit 1) ;;
+ esac
+then
+ # Is this a new git that lists number of commits since the last
+ # tag or the previous older version that did not?
+ # Newer: v6.10-77-g0f8faeb
+ # Older: v6.10-g0f8faeb
+ case $v in
+ *-*-*) : git describe is okay three part flavor ;;
+ *-*)
+ : git describe is older two part flavor
+ # Recreate the number of commits and rewrite such that the
+ # result is the same as if we were using the newer version
+ # of git describe.
+ vtag=`echo "$v" | sed 's/-.*//'`
+ numcommits=`git rev-list "$vtag"..HEAD | wc -l`
+ v=`echo "$v" | sed "s/\(.*\)-\(.*\)/\1-$numcommits-\2/"`;
+ ;;
+ esac
+
+ # Change the first '-' to a '.', so version-comparing tools work properly.
+ # Remove the "g" in git describe's output string, to save a byte.
+ v=`echo "$v" | sed 's/-/./;s/\(.*\)-g/\1-/'`;
+elif [ "$svnwork" -eq "0" ] ;
+then
+ vtag="1.0.0" # we do not have git tag in svn, so here just hard coded it.
+ v=`svn log -q --limit 1 | grep "|" | awk '{print $1}'`
+ v=`echo "$v" |sed 's/^r//'`
+ v="$vtag.$v"
+else
+ v=UNKNOWN
+fi
+
+v=`echo "$v" |sed 's/^v//'`
+
+# Don't declare a version "dirty" merely because a time stamp has changed.
+git update-index --refresh > /dev/null 2>&1
+
+dirty=`sh -c 'git diff-index --name-only HEAD' 2>/dev/null` || dirty=
+case "$dirty" in
+ '') ;;
+ *) # Append the suffix only if there isn't one already.
+ case $v in
+ *-dirty) ;;
+ *) v="$v-dirty" ;;
+ esac ;;
+esac
+
+# Omit the trailing newline, so that m4_esyscmd can use the result directly.
+echo "$v" | tr -d "$nl"
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-time-zone: "UTC"
+# time-stamp-end: "; # UTC"
+# End:
diff --git a/code/jasmine/client.c b/code/jasmine/client.c
new file mode 100755
index 00000000..7beda75d
--- /dev/null
+++ b/code/jasmine/client.c
@@ -0,0 +1,313 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "tcp-common.h"
+#include "misc.h"
+
+/* Global statistics */
+long tot_dups = 0;
+long tot_reps = 0;
+long tot_pkts = 0;
+long long tot_size = 0;
+
+void recv_mcinfo(int tcp_socket)
+{
+ int res;
+
+ res = read_all(tcp_socket, &mcinfo, sizeof(struct mc_info));
+ if (res != sizeof(struct mc_info)) {
+ crit("Error while reading initial data");
+ }
+}
+
+void recv_buffctl(int tcp_socket)
+{
+ int res;
+
+ res = read_all(tcp_socket, &buffctl, sizeof(struct buffer_ctl));
+ if (res != sizeof(struct buffer_ctl)) {
+ crit("Error reading buffer control");
+ }
+}
+
+void send_client_ready(int tcp_socket)
+{
+ uint8_t cmd;
+
+ /* Tell TCP server we are ready to receive */
+ cmd = CLIENT_READY;
+ write(tcp_socket, &cmd, 1);
+}
+
+/* Read out the SERVER_SENT signal */
+void recv_server_sent(int tcp_socket)
+{
+ uint8_t cmd;
+ int res;
+
+ res = read_all(tcp_socket, &cmd, 1);
+ if (res != 1) {
+ //if (read(tcp_socket, &cmd, 1) < 1) {
+ crit("Error reading SERVER_SENT");
+ }
+
+ if (cmd != SERVER_SENT) {
+ crit("Error reading SERVER_SENT %d", cmd);
+ }
+}
+
+void send_client_request(int tcp_socket, struct request_ctl *req)
+{
+ uint8_t cmd;
+
+ /* Tell TCP server we are ready to receive */
+ cmd = CLIENT_REQ;
+ write(tcp_socket, &cmd, 1);
+ write(tcp_socket, req,
+ sizeof(struct request_ctl) + (req->req_count) * sizeof(uint32_t));
+}
+
+void recv_server_ack(int tcp_socket, struct packet_ctl *pkt)
+{
+ int res;
+
+ res = read_all(tcp_socket, pkt, sizeof(struct packet_ctl));
+ if (res != sizeof(struct packet_ctl)) {
+ crit("Error on tcp socket");
+ }
+
+ res = read_all(tcp_socket,
+ ((char *)pkt) + sizeof(struct packet_ctl),
+ pkt->data_size);
+ if (res != pkt->data_size) {
+ crit("Error on tcp socket received %d of %d",
+ res, pkt->data_size);
+ }
+}
+
+void tcp_retransmition(int tcp_socket,
+ struct packet_ctl **curr_pkt,
+ struct packet_ctl **freed_pkt)
+{
+ struct request_ctl *reqctl;
+ uint32_t *reqbody;
+ uint8_t rqbuf[sizeof(struct request_ctl) + PACKETS_PER_BUFFER * sizeof(uint32_t)];
+ uint32_t l;
+
+ reqctl = (struct request_ctl *)rqbuf;
+ reqbody = (uint32_t *)(rqbuf + sizeof(struct request_ctl));
+
+ reqctl->req_count = 0;
+ for (l = 0; l < buffctl.pkt_count; l++) {
+ if (!packetctl[l]->data_size) {
+ log(6, "Requesting packet %u", l + buffctl.packet_id_base);
+ reqbody[reqctl->req_count] = l + buffctl.packet_id_base;
+ reqctl->req_count++;
+ }
+ }
+
+ if (reqctl->req_count > 0) {
+ send_client_request(tcp_socket, reqctl);
+
+ /* read retransmitted blocks via TCP */
+ for (l = 0; l < reqctl->req_count; l++) {
+ if (*freed_pkt) {
+ *curr_pkt = *freed_pkt;
+ }
+
+ recv_server_ack(tcp_socket, *curr_pkt);
+
+ *freed_pkt = packet_put(*curr_pkt);
+ if (!(*freed_pkt)) {
+ crit("Malformed packet on tcp socket");
+ }
+ if ((*freed_pkt)->data_size != 0) {
+ crit("Malformed free packet slot or TCP data");
+ }
+
+ log(6, "Received retran packet %u", (*curr_pkt)->seq);
+ }
+
+ tot_reps += reqctl->req_count;
+ }
+}
+
+/* Returns how many good packets received from UDP */
+int recv_mcast(int tcp_socket, int udp_socket,
+ struct packet_ctl **curr_pkt,
+ struct packet_ctl **freed_pkt)
+{
+ int rcv_pkt_count;
+ int got_sent;
+ int maxfd;
+ fd_set rfds;
+ struct timeval tv;
+ int res;
+
+ maxfd = tcp_socket;
+ if (maxfd < udp_socket) {
+ maxfd = udp_socket;
+ }
+ maxfd++;
+ FD_ZERO(&rfds);
+
+ rcv_pkt_count = 0;
+ got_sent = 0;
+
+ if (buffctl.pkt_count != 0) {
+ do {
+ FD_SET(tcp_socket, &rfds);
+ FD_SET(udp_socket, &rfds);
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+
+ res = select(maxfd, &rfds, 0, 0, &tv);
+ if (res < 0) {
+ crit("select error");
+ }
+
+ if (res == 0) {
+ crit("select timed out");
+ }
+
+ /* Read multicast packet */
+ if (FD_ISSET(udp_socket, &rfds)) {
+ log(7, "Reading multicast packet");
+ if (*freed_pkt) {
+ *curr_pkt = *freed_pkt;
+ }
+
+ res = recv(udp_socket, *curr_pkt, PACKET_SIZE, 0);
+ if (res <= 0) {
+ crit("error on multicast socket");
+ }
+
+ if (res < sizeof(struct packet_ctl) ) {
+ log(7, "Truncated packet received (%d bytes)", res);
+ } else if (res != (*curr_pkt)->data_size + sizeof(struct packet_ctl)) {
+ log(7,
+ "Truncated packet received (%d of %ld bytes)",
+ res, (*curr_pkt)->data_size + sizeof(struct packet_ctl));
+ } else {
+ log(9, "Normal packet seq:%d", (*curr_pkt)->seq);
+ (*freed_pkt) = packet_put((*curr_pkt));
+ if (!(*freed_pkt)) {
+ log(5, "Malformed packet");
+ } else {
+ if ((*freed_pkt)->data_size == 0) {
+ rcv_pkt_count++;
+ } else {
+ log(6, "Duplicated packet");
+ tot_dups++;
+ }
+ }
+ }
+ } else if (FD_ISSET(tcp_socket, &rfds)) {
+ /* Check TCP, only if there was no more data from UDP */
+ log(6, "No more data path1");
+ recv_server_sent(tcp_socket);
+ got_sent = 1;
+ break;
+ }
+ } while (rcv_pkt_count < buffctl.pkt_count);
+ }
+
+ if (got_sent == 0) {
+ log(6, "No more data path2");
+ recv_server_sent(tcp_socket);
+ }
+
+ return rcv_pkt_count;
+}
+
+void send_client_done(int tcp_socket)
+{
+ uint8_t cmd;
+
+ /* Tell TCP server we are ready to receive */
+ cmd = CLIENT_DONE;
+ write(tcp_socket, &cmd, 1);
+}
+
+int main(int argc, char *argv[])
+{
+ int ms, ts;
+ struct packet_ctl *alloc_pkt, *curr_pkt, *freed_pkt;
+ int udp_rcv_count;
+ u_short port = DEF_PORT;
+ struct in_addr local_addr;
+
+ if (argc < 3) {
+ printf("Usage: %s <local_ip> <server_ip> [port]\n", argv[0]);
+ return 0;
+ }
+
+ if (!inet_aton(argv[1], &local_addr)) {
+ crit("can not resolve address: %s", argv[1]);
+ }
+
+ if (argc > 3) {
+ port = atoi(argv[3]);
+ if (!port) {
+ port = DEF_PORT;
+ }
+ }
+
+ buffer_init();
+ /* Init first time packet slot */
+ alloc_pkt = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE);
+ memset(alloc_pkt, 0, PACKET_SIZE);
+ freed_pkt = curr_pkt = alloc_pkt;
+
+ ts = init_tcp_client_socket(argv[2], port);
+ recv_mcinfo(ts);
+ ms = init_mcast_socket(&local_addr, &mcinfo.group);
+ /* Do we need set_nonblock(ms)??? ; */
+
+ /* Will do dummy run even if buffctl.pkt_count is zero at the first round */
+ do { /* one buffer a round */
+ packetctl_precheck();
+ recv_buffctl(ts);
+ send_client_ready(ts);
+
+ udp_rcv_count = recv_mcast(ts, ms, &curr_pkt,&freed_pkt);
+ if (udp_rcv_count == buffctl.pkt_count) {
+ log(6, "All packets of current buffer received from UDP");
+ } else {
+ tcp_retransmition(ts, &curr_pkt, &freed_pkt);
+ }
+
+ tot_pkts += buffctl.pkt_count;
+ tot_size += buffctl.buffer_size;
+ log(1, "\rBuffer received %lld Bytes, %ld Packets(%ld Repeats %ld Dups)",
+ tot_size, tot_pkts, tot_reps, tot_dups);
+
+ if (buffctl.pkt_count) {
+ buffer_flush(STDOUT_FILENO);
+ }
+
+ send_client_done(ts);
+ } while (buffctl.pkt_count != 0);
+
+ shutdown(ts, 2);
+ close(ts);
+ close(ms);
+ log(1, "All buffers receive done.\n");
+ return 0;
+}
diff --git a/code/jasmine/configure.ac b/code/jasmine/configure.ac
new file mode 100755
index 00000000..8f42bab2
--- /dev/null
+++ b/code/jasmine/configure.ac
@@ -0,0 +1,65 @@
+# -*- Autoconf -*-
+# Process this file with autoconf to produce a configure script.
+
+# bootstrap / init
+AC_PREREQ([2.61])
+
+AC_INIT([jasmine],
+ m4_esyscmd([build-aux/git-version-gen .tarball-version]),
+ [hu.zhijiang@zte.com.cn])
+
+AC_USE_SYSTEM_EXTENSIONS
+
+AM_INIT_AUTOMAKE([-Wno-portability])
+
+AC_CONFIG_HEADER([config.h])
+
+AC_CONFIG_MACRO_DIR([m4])
+
+AC_SUBST(WITH_LIST, [""])
+
+dnl Fix default variables - "prefix" variable if not specified
+if test "$prefix" = "NONE"; then
+ prefix="/usr"
+
+ dnl Fix "localstatedir" variable if not specified
+ if test "$localstatedir" = "\${prefix}/var"; then
+ localstatedir="/var"
+ fi
+ dnl Fix "sysconfdir" variable if not specified
+ if test "$sysconfdir" = "\${prefix}/etc"; then
+ sysconfdir="/etc"
+ fi
+ dnl Fix "libdir" variable if not specified
+ if test "$libdir" = "\${exec_prefix}/lib"; then
+ if test -e /usr/lib64; then
+ libdir="/usr/lib64"
+ else
+ libdir="/usr/lib"
+ fi
+ fi
+fi
+
+# Checks for programs.
+AC_PATH_PROG([BASHPATH], [bash])
+
+AC_CONFIG_FILES([Makefile])
+
+PACKAGE_FEATURES=""
+
+ENV_CFLAGS="$CFLAGS"
+OPT_CFLAGS=""
+GDB_FLAGS=""
+EXTRA_WARNINGS="-Wall"
+CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $EXTRA_WARNINGS"
+
+# substitute what we need:
+AC_SUBST([BASHPATH])
+
+AC_OUTPUT
+
+AC_MSG_RESULT([ Version = ${PACKAGE_VERSION}])
+AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}])
+AC_MSG_RESULT([ Final CPPFLAGS = ${CPPFLAGS}])
+AC_MSG_RESULT([ Final LDFLAGS = ${LDFLAGS}])
+AC_MSG_RESULT([ Features = ${PACKAGE_FEATURES}])
diff --git a/code/jasmine/jasmine.spec.in b/code/jasmine/jasmine.spec.in
new file mode 100755
index 00000000..73172f31
--- /dev/null
+++ b/code/jasmine/jasmine.spec.in
@@ -0,0 +1,33 @@
+Summary: Just A Small Multicast engINE
+Name: jasmine
+Version: @version@
+Release: 1%{?dist}
+Vendor: ZTE
+License: Apache-2.0
+Group: System Environment/Base
+URL: https://wiki.opnfv.org/display/DAIS
+Source: %{name}-%{version}%{?gittarver}.tar.gz
+
+BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
+
+%description
+jasmine is used to distribute files over UDP multicast for installers
+
+%prep
+%setup -q -n %{name}-%{version}%{?gittarver}
+
+%build
+if [ ! -f configure ]; then
+ ./autogen.sh
+fi
+
+./configure
+
+%install
+rm -rf %{buildroot}
+make install DESTDIR=%{buildroot}
+
+%files
+%{_bindir}/jasmines
+%{_bindir}/jasminec
+
diff --git a/code/jasmine/misc.c b/code/jasmine/misc.c
new file mode 100755
index 00000000..fefdf690
--- /dev/null
+++ b/code/jasmine/misc.c
@@ -0,0 +1,73 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <fcntl.h>
+
+#include "misc.h"
+
+struct sockaddr_in make_addr(char *addr, unsigned short port)
+{
+ struct sockaddr_in sin;
+
+ sin.sin_family = AF_INET;
+ if (!addr) {
+ sin.sin_addr.s_addr = htonl(INADDR_ANY);
+ } else {
+ if (!inet_aton(addr, &sin.sin_addr))
+ crit("cant resolve address: %s", addr);
+ }
+ sin.sin_port = htons(port);
+ return sin;
+}
+
+/* Better than using a macro */
+void set_nonblock(int s)
+{
+ if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) {
+ crit("set_nonblock failed, can't set O_NONBLOCK");
+ }
+}
+
+void * wrapper_malloc(size_t size)
+{
+ void * res;
+
+ if (size == 0) {
+ crit("wrapper_malloc: malloc 0 size not allowed");
+ }
+
+ res = malloc(size);
+ if (res == NULL) {
+ crit("wrapper_malloc: malloc failed");
+ }
+
+ return res;
+}
+
+size_t read_all(int fd, void *buf, size_t count)
+{
+ size_t t = 0;
+ size_t r = 0;
+
+ while (count > 0) {
+ r = read(fd, buf + t, count);
+ if (r <= 0) {
+ return t ? t : r;
+ }
+
+ t += r;
+ count -= r;
+ }
+
+ return t;
+}
diff --git a/code/jasmine/misc.h b/code/jasmine/misc.h
new file mode 100755
index 00000000..e60d0b63
--- /dev/null
+++ b/code/jasmine/misc.h
@@ -0,0 +1,39 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_MISC_H
+#define _MCAST_MISC_H
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <error.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#define gettid() syscall(__NR_gettid)
+
+#define level 6
+
+#define crit(x, args...) do { \
+ fprintf(stderr, "\nERROR: "); fprintf(stderr, x, ##args); \
+ error(-1,errno, "\nERROR: [%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \
+} while (0);
+
+#define log(l, f, args...) if (l < level) { \
+ fprintf(stderr, "\n[%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \
+ fprintf(stderr, f, ##args); \
+ fprintf(stderr, "\n"); \
+}
+
+struct sockaddr_in make_addr(char *addr, unsigned short port);
+void * wrapper_malloc(size_t size);
+void set_nonblock(int s);
+size_t read_all(int fd, void *buf, size_t count);
+
+#endif
diff --git a/code/jasmine/server-tcp.c b/code/jasmine/server-tcp.c
new file mode 100755
index 00000000..f5c83b96
--- /dev/null
+++ b/code/jasmine/server-tcp.c
@@ -0,0 +1,550 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <sys/poll.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "udp-common.h"
+#include "tcp-queue.h"
+#include "misc.h"
+#include "server.h"
+
+#define MAX_CLIENTS 128 /* Clients per TCP server */
+#define TCP_BUFF_SIZE 65536
+
+struct cdata {
+ struct tcpq *tx;
+ struct tcpq *rx;
+ int await; /* non-zero if we are in the middle of receiving clients requests */
+ struct sockaddr_in peer;
+};
+
+struct server_status_data {
+ int state; /* state machine */
+ int sync_count;
+ int ccount;
+ struct semlink sl;
+ pthread_t pchild;
+
+ struct pollfd ds[MAX_CLIENTS + 1];
+ struct cdata cd[MAX_CLIENTS + 1];
+ int cindex;
+ int last_buff_pkt_count;
+};
+
+void init_sdata(struct server_status_data *sdata, void *thread_args)
+{
+ int i;
+
+ sdata->sl.parent = (struct semaphores *)thread_args;
+ sdata->sl.this = NULL;
+
+ sdata->state = S_PREP;
+ sdata->last_buff_pkt_count = 1;
+ sdata->sync_count = 0;
+
+ for (i = 0; i < MAX_CLIENTS + 1; i++) {
+ sdata->ds[i].fd = -1;
+ sdata->ds[i].events = POLLIN;
+ sdata->cd[i].tx = tcpq_queue_init();
+ sdata->cd[i].rx = tcpq_queue_init();
+ sdata->cd[i].await = 0;
+ }
+}
+
+/* disconnect one client and compress table */
+void kill_client(struct server_status_data *sdata)
+{
+ struct pollfd *ds = sdata->ds;
+ struct cdata *cd = sdata->cd;
+
+ close((ds + sdata->cindex)->fd);
+ tcpq_queue_free((cd + sdata->cindex)->rx);
+ tcpq_queue_free((cd + sdata->cindex)->tx);
+
+ /* Move last slot to this free slot and release the last slot */
+ *(ds + sdata->cindex) = *(ds + sdata->ccount - 1);
+ *(cd + sdata->cindex) = *(cd + sdata->ccount - 1);
+ sdata->cindex--;
+ sdata->ccount--;
+ client_count--;
+}
+
+void send_buffctl_to_all_clients(struct server_status_data *sdata)
+{
+ int i;
+ long send_sz;
+ void *cpy;
+
+ /* copy header to all queues */
+ send_sz = sizeof(struct buffer_ctl);
+ for (i = 1; i < sdata->ccount; i++) {
+ cpy = wrapper_malloc(send_sz);
+ memcpy(cpy, &buffctl, send_sz);
+ tcpq_queue_tail(sdata->cd[i].tx, cpy, send_sz);
+ sdata->ds[i].events |= POLLOUT;
+ }
+}
+
+void send_sent_to_all_clients(struct server_status_data *sdata)
+{
+ int i;
+ void *cpy;
+ uint8_t byte = SERVER_SENT;
+
+ /* Push data to clients */
+ for (i = 1; i < sdata->ccount; i++) {
+ cpy = wrapper_malloc(1);
+ memcpy(cpy, &byte, 1);
+ tcpq_queue_tail(sdata->cd[i].tx, cpy, 1);
+ sdata->ds[i].events |= POLLOUT;
+ }
+}
+void accept_clients(struct server_status_data *sdata)
+{
+ int res;
+ int new_fd;
+ socklen_t socklen;
+ int poll_events;
+ int timeout = -1;
+ struct pollfd *ds = sdata->ds;
+ struct cdata *cd = sdata->cd;
+
+ ds->events = POLLIN;
+ ds->revents = 0;
+ sdata->ccount = 1;
+
+ while (sdata->ccount <= MAX_CLIENTS && client_count < wait_count) {
+ poll_events = poll(ds, 1, timeout);
+ if (poll_events < 0) {
+ crit("poll() failed");
+ } else if (poll_events == 0) {
+ log(2, "poll() returned with no results!");
+ continue;
+ }
+
+ log(9, "poll: %d events", poll_events);
+
+ if (ds->revents) {
+
+ /* new connections come in */
+ if (ds->revents & (POLLIN|POLLPRI)) {
+
+ do {
+ socklen = sizeof((cd + sdata->ccount)->peer);
+retry_accept:
+ new_fd = accept(ds->fd,
+ (struct sockaddr *)&((cd + sdata->ccount)->peer),
+ &socklen);
+ if (new_fd == -1 && errno == EINTR) {
+ goto retry_accept;
+ }
+
+ if (new_fd == -1) {
+ log(2, "accept() returned with error %d", errno);
+ break;
+ }
+
+ /* Send group info, before set non block */
+ res = write(new_fd, &mcinfo, sizeof(struct mc_info));
+ if (res < 0) {
+ log(3, "Error opening connection: %s",
+ inet_ntoa((cd + sdata->ccount)->peer.sin_addr));
+ close(new_fd);
+ continue;
+ }
+
+ /* Can set to non block becasue we will poll it in future. */
+ res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
+ if (res == -1) {
+ log(2, "fcntl() returned with error %d", errno);
+ close(new_fd);
+ continue;
+ }
+
+ (ds + sdata->ccount)->fd = new_fd;
+
+ log(5, "New connection %d: %s",
+ sdata->ccount, inet_ntoa((cd + sdata->ccount)->peer.sin_addr));
+ sdata->ccount++;
+ client_count++;
+ } while (sdata->ccount <= MAX_CLIENTS);
+
+ } else {
+ log(2, "Error on tcp socket");
+ }
+
+ poll_events--;
+ ds->revents = 0;
+ }
+ }
+
+ ds->events = 0;
+}
+
+void accept_clients_may_spawn(struct server_status_data *sdata)
+{
+ /* Wait the parent to let us accpet clients */
+ sl_wait_parent(&(sdata->sl));
+
+ sdata->ds[0].fd = sfd;
+ accept_clients(sdata);
+ if (client_count < wait_count) {
+ sdata->sl.this = spawn_thread(&sdata->pchild, tcp_server_main, 1);
+
+ /* Let child TCP servers accept connections */
+ sl_release_child(&(sdata->sl));
+ /* Wait until all childern TCP servers got all their clients */
+ sl_wait_child(&(sdata->sl));
+ }
+
+ /* Tell parent all client are accepted/connected */
+ sl_release_parent(&(sdata->sl));
+}
+
+void keep_on_receiving_client_request(struct server_status_data *sdata)
+{
+ char buf[TCP_BUFF_SIZE];
+ void *cpy;
+ struct request_ctl *req;
+ uint32_t *rqb;
+ struct packet_ctl *ans;
+ long rq_index;
+ long total_sz;
+
+ long read_out;
+ struct pollfd *ds;
+ struct cdata *cd;
+
+ ds = &(sdata->ds[sdata->cindex]);
+ cd = &(sdata->cd[sdata->cindex]);
+
+ read_out = read(ds->fd, buf, cd->await);
+ if (read_out <= 0) {
+ log(5, "Client disconnected (r): %s",
+ inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+ return;
+ }
+
+ log(7, "New data (%ld + %ld) on conn %d: %s",
+ read_out, tcpq_queue_dsize(cd->rx), sdata->cindex,
+ inet_ntoa(cd->peer.sin_addr));
+ cpy = wrapper_malloc(read_out);
+ memcpy(cpy, buf, read_out);
+ tcpq_queue_tail(cd->rx, cpy, read_out);
+ cd->await -= read_out;
+
+ /* Full header received */
+ if (tcpq_queue_dsize(cd->rx) == sizeof(struct request_ctl)) {
+ cpy = tcpq_queue_flat_peek(cd->rx, &read_out);
+ req = (struct request_ctl *)cpy;
+ cd->await = req->req_count * sizeof(uint32_t);
+ log(6, "Client request for %u packets on %d: %s",
+ req->req_count, sdata->cindex, inet_ntoa(cd->peer.sin_addr));
+ /* req->rqc may be zero? So do not return from here */
+ }
+
+ /* Whole request(struct request_ctl + all rq blocks) received */
+ if (cd->await == 0) {
+ cpy = tcpq_dqueue_flat(cd->rx, &read_out);
+ req = (struct request_ctl *)cpy;
+ rqb = (uint32_t *) (cpy + sizeof(struct request_ctl));
+ for (rq_index = 0; rq_index < req->req_count; rq_index++) {
+ ans = packet_get(*(rqb + rq_index));
+ total_sz = ans->data_size + sizeof(struct packet_ctl);
+ log(6, "Send packet %u (%u bytes) on %d",
+ rqb[rq_index], ans->data_size, sdata->cindex);
+ cpy = wrapper_malloc(total_sz);
+ memcpy(cpy, ans, total_sz);
+ tcpq_queue_tail(cd->tx, cpy, total_sz);
+ }
+
+ if (rq_index > 0) {
+ /* Data need to be sent out */
+ ds->events |= POLLOUT;
+ }
+ }
+}
+
+void handle_error_event(struct server_status_data *sdata)
+{
+ struct cdata *cd;
+
+ cd = &(sdata->cd[sdata->cindex]);
+ log(5, "Closing connection %d: %s",
+ sdata->cindex, inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+}
+
+void handle_client_ready(struct server_status_data *sdata)
+{
+ if (sdata->state == S_SYNC) {
+ sdata->sync_count++;
+
+ log(7, "Client SYNC %d of %d",
+ sdata->sync_count, sdata->ccount - 1);
+
+ if (sdata->sync_count == sdata->ccount - 1) {
+ /* All client SYNC done */
+ sdata->state = S_SEND;
+
+ /* Wait child ready */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent I am ready, release parent */
+ sl_release_parent(&(sdata->sl));
+
+ /* Wait parent to send mcast */
+ sl_wait_parent(&(sdata->sl));
+ /* tell child mcast done, release child */
+ sl_release_child(&(sdata->sl));
+
+ send_sent_to_all_clients(sdata);
+ }
+ }
+}
+
+void handle_client_done(struct server_status_data *sdata)
+{
+ if (sdata->state == S_SEND) {
+ log(7, "Client START %d of %d",
+ sdata->sync_count, sdata->ccount - 1);
+
+ sdata->sync_count--;
+ if (sdata->sync_count == 0) {
+ /* Got all client START */
+ sdata->state = S_PREP;
+ log(7, "All received, now tell UDP Server prepare next buffer");
+ /* Remeber old buffctl.pkt_count before it change */
+ sdata->last_buff_pkt_count = buffctl.pkt_count;
+
+ /* Wait child clients done */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent all our clients are done, release parent */
+ sl_release_parent(&(sdata->sl));
+ }
+ }
+}
+
+void handle_client_events(struct server_status_data *sdata)
+{
+ struct pollfd *ds;
+ struct cdata *cd;
+ int read_out;
+ uint8_t msgtype;
+
+ ds = &(sdata->ds[sdata->cindex]);
+ cd = &(sdata->cd[sdata->cindex]);
+
+ /* read message type */
+ read_out = read(ds->fd, &msgtype, 1);
+ if (read_out <= 0) {
+ log(5, "Client disconnected due to read error %d, ret:%d (r): %s",
+ errno, read_out, inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+ return; /* continue to check other clients */
+ }
+
+ switch (msgtype) {
+ case CLIENT_READY:
+ handle_client_ready(sdata);
+ break;
+ case CLIENT_DONE:
+ handle_client_done(sdata);
+ break;
+ case CLIENT_REQ:
+ /* wait a whole struct request_ctl */
+ cd->await = sizeof(struct request_ctl);
+ break;
+ default:
+ log(4, "Wrong message type from %s",
+ inet_ntoa(cd->peer.sin_addr));
+ break;
+ }
+}
+
+void handle_pullin_event(struct server_status_data *sdata)
+{
+ struct cdata *cd;
+
+ cd = &(sdata->cd[sdata->cindex]);
+
+ /* Await is set only for repeat request */
+ if (cd->await) {
+ keep_on_receiving_client_request(sdata);
+ } else {
+ handle_client_events(sdata);
+ }
+}
+
+void handle_pullout_event(struct server_status_data *sdata)
+{
+ char buf[TCP_BUFF_SIZE];
+ struct pollfd *ds;
+ struct cdata *cd;
+ long transmit_sz;
+ void *cpy;
+ long written_in;
+
+ ds = &(sdata->ds[sdata->cindex]);
+ cd = &(sdata->cd[sdata->cindex]);
+
+ if (cd->tx->count == 0) {
+ ds->events = POLLIN; /* Just make sure */
+ return;
+ }
+
+ log(7, "handle_pullout_event servs No.%d conn", sdata->cindex);
+
+ /* If there is some data to be sent, try to do it now */
+ cpy = tcpq_dequeue_head(cd->tx, &transmit_sz);
+ memcpy(buf, cpy, transmit_sz);
+ free(cpy);
+
+ written_in = write(ds->fd, buf, transmit_sz);
+ if (written_in <= 0) {
+ log(5, "Client disconnected (w): %s",
+ inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+ return;
+ }
+
+ if (written_in != transmit_sz) {
+ log(6, "Partial wrote: %ld out of %ld bytes sent",
+ written_in, transmit_sz);
+ cpy = wrapper_malloc(transmit_sz - written_in);
+ memcpy(cpy, buf + written_in, transmit_sz - written_in);
+ tcpq_queue_head(cd->tx, cpy, transmit_sz - written_in);
+ } else {
+ log(7, "Sent %ld bytes to %s",
+ written_in, inet_ntoa(cd->peer.sin_addr));
+ if (cd->tx->count == 0) {
+ /* Do not listen pollout event anymore */
+ ds->events = POLLIN;
+ }
+ }
+}
+
+void check_clients(struct server_status_data *sdata)
+{
+ int poll_events;
+ struct pollfd *ds;
+ int timeout = -1;
+
+ if (sdata->ccount == 1) {
+ log(5, "No more clients, start exiting. s,c,p:%d,%d,%d",
+ sdata->state, sdata->ccount, buffctl.pkt_count);
+ /* No client existed */
+ switch(sdata->state) {
+ case S_SYNC:
+ /* Wait all children ready */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent I am ready, release parent */
+ sl_release_parent(&(sdata->sl));
+
+ /* Wait parent to send mcast */
+ sl_wait_parent(&(sdata->sl));
+ /* Tell child mcast done, release child */
+ sl_release_child(&(sdata->sl));
+ case S_SEND:
+ sdata->state = S_PREP;
+ /* Remeber old buffctl.pkt_count before it change */
+ sdata->last_buff_pkt_count = buffctl.pkt_count;
+
+ /* Wait child clients done */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent all our clients are done */
+ sl_release_parent(&(sdata->sl));
+ }
+ /* return to main loop to finish the dummy run */
+ return;
+ }
+
+ /* poll clients only */
+ poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout);
+ if (poll_events < 0) {
+ crit("poll() failed");
+ } else if (poll_events == 0) {
+ log(2, "poll() returned with no results!");
+ return;
+ }
+
+ log(9, "poll clients: %d events", poll_events);
+
+ /* check all connected clients */
+ for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) {
+ ds = &(sdata->ds[sdata->cindex]);
+ if (!ds->revents) {
+ continue;
+ }
+
+ if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) {
+ handle_error_event(sdata);
+ } else if (ds->revents & (POLLIN|POLLPRI)) {
+ handle_pullin_event(sdata);
+ } else if (ds->revents & POLLOUT) {
+ handle_pullout_event(sdata);
+ }
+ }
+}
+
+void * tcp_server_main(void *args)
+{
+ int i;
+ struct server_status_data ssdata;
+ struct server_status_data *sdata;
+
+ sdata = &ssdata;
+ init_sdata(sdata, args);
+
+ accept_clients_may_spawn(sdata);
+
+ while (1) {
+ if (sdata->state == S_PREP) {
+ /* Wait UDP server to prepare the buffctl of this round to send */
+ sl_wait_parent(&(sdata->sl));
+ log(6, "After waiting UDP server preparation. s,c,p:%d,%d,%d",
+ sdata->state, sdata->ccount - 1, buffctl.pkt_count);
+ /* Tell children buffer pareparation done, release child */
+ sl_release_child(&(sdata->sl));
+ send_buffctl_to_all_clients(sdata);
+ sdata->state = S_SYNC;
+ }
+
+ check_clients(sdata);
+
+ if (sdata->state == S_PREP && sdata->last_buff_pkt_count == 0) {
+ log(5, "No more output, TCP server finished");
+ if (sdata->sl.this && pthread_join(sdata->pchild, 0) < 0) {
+ crit("pthread_join()");
+ }
+
+ /* shutdown conn to clients */
+ for (i = sdata->ccount - 1; i > 0; i--) {
+ shutdown((sdata->ds[i]).fd, 2);
+ }
+
+ if (!sdata->sl.this) {
+ /* leaf server */
+ close(sfd);
+ sfd = -1;
+ }
+ return 0;
+ }
+ }
+}
diff --git a/code/jasmine/server-udp.c b/code/jasmine/server-udp.c
new file mode 100755
index 00000000..e8a62c25
--- /dev/null
+++ b/code/jasmine/server-udp.c
@@ -0,0 +1,95 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+#include <signal.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "misc.h"
+#include "server.h"
+
+void * udp_server_main(void *args)
+{
+ pthread_t pchild;
+ struct sockaddr_in maddr;
+ int ms;
+ int i;
+ struct semlink sl;
+ char adr[128];
+ long long total = 0;
+
+ sprintf(adr, "%s%s", MCAST_ADDR_BASE, MCAST_ADDR_SUFFIX);
+ maddr = make_addr(adr, server_port);
+ mcinfo.group = maddr;
+ ms = init_mcast_socket(&local_addr, &maddr);
+
+ sl.this = spawn_thread(&pchild, tcp_server_main, 1);
+ sl.parent = NULL;
+
+ buffer_init();
+
+ /* Let TCP servers accept connections */
+ sl_release_child(&sl);
+ /* Wait until all TCP servers got all their clients */
+ sl_wait_child(&sl);
+
+ log(7, "UDP Server: All clients were accepted");
+
+ do {
+ /* one buffer round */
+
+ i = buffer_fill(STDIN_FILENO);
+
+ if (!client_count) {
+ /* No need to send, make it the last run of main loop, also let
+ TCP Server to do dummy run right away(not waiting until all
+ data sent). */
+ i = 0;
+ buffctl.pkt_count = 0;
+ }
+
+ log(7, "Signal TCP Server to send buffctl");
+
+ /* Tell tcp to Send headers, release child */
+ sl_release_child(&sl);
+ /* Wait all clients ready to start receiving */
+ sl_wait_child(&sl);
+
+ /* multicast data */
+ while (i--) {
+ sendto(ms, packetctl[i],
+ packetctl[i]->data_size + sizeof(struct packet_ctl), 0,
+ (struct sockaddr *)&maddr, sizeof(maddr));
+ }
+
+ log(7, "send finish");
+ /* Tell tcp to Send SERVER_SENT, release child */
+ sl_release_child(&sl);
+ /* wait tcp to send SERVER_SENT */
+ sl_wait_child(&sl);
+
+ total = total + buffctl.buffer_size;
+ log(1, "Buffer Done: sent %lld bytes", total);
+ } while (buffctl.pkt_count);
+
+
+ if (pthread_join(pchild, 0) < 0) {
+ crit("pthread_join() TCP Server");
+ }
+
+ log(1, "All Done");
+ close(ms);
+ return 0;
+}
diff --git a/code/jasmine/server.c b/code/jasmine/server.c
new file mode 100755
index 00000000..cf5d398e
--- /dev/null
+++ b/code/jasmine/server.c
@@ -0,0 +1,123 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "misc.h"
+#include "server.h"
+
+/* Global variables shared among server*.c */
+int wait_count = 0;
+int client_count = 0;
+int server_port = DEF_PORT; /* Currently for both UDP and TCP */
+struct in_addr local_addr;
+/* listen socket - global for all TCP threads */
+int sfd = -1;
+
+void init_semaphores(struct semaphores *sp)
+{
+ if (sem_init(&sp->wait_parent, 0, 0) < 0) {
+ crit("sem_init() wait_parent");
+ }
+
+ if (sem_init(&sp->wait_child, 0, 0) < 0) {
+ crit("sem_init() wait_child");
+ }
+}
+
+void sl_wait_child(struct semlink *sl)
+{
+ if (sl->this) {
+ P(sl->this->wait_child);
+ }
+}
+
+void sl_release_child(struct semlink *sl)
+{
+ if (sl->this) {
+ V(sl->this->wait_parent);
+ }
+}
+
+void sl_wait_parent(struct semlink *sl)
+{
+ P(sl->parent->wait_parent);
+}
+
+void sl_release_parent(struct semlink *sl)
+{
+ V(sl->parent->wait_child);
+}
+
+/* Wrapper for pthread_create, additionaly may initialize/pass thru
+ a semaphores parameter */
+struct semaphores * spawn_thread(pthread_t *pchild,
+ void *(*entry)(void *),
+ int create_sems)
+{
+ struct semaphores *sp = NULL;
+
+ if (create_sems != 0) {
+ sp = (struct semaphores *) wrapper_malloc(sizeof(struct semaphores));
+ init_semaphores(sp);
+ }
+
+ if (pthread_create(pchild, 0, entry, (void *)sp) != 0) {
+ crit("pthread_create");
+ }
+ return sp;
+}
+
+int main(int argc, char *argv[])
+{
+ pthread_t pchild;
+
+ log(9, "buffer size:%ld, buffer head size:%ld, data size:%ld",
+ PACKET_SIZE, sizeof(struct packet_ctl), PACKET_PAYLOAD_SIZE);
+
+ if (argc < 3) {
+ printf("Usage: %s local_ip num_of_clients [port]\n", argv[0]);
+ return -1;
+ }
+
+ if (!inet_aton(argv[1], &local_addr)) {
+ crit("can not resolve address: %s", argv[1]);
+ }
+
+ wait_count = atoi(argv[2]);
+ if (!wait_count) {
+ crit("can not serv to 0 client\n");
+ }
+
+ if (argc > 3) {
+ server_port = atoi(argv[3]);
+ if (!server_port) {
+ server_port = DEF_PORT;
+ }
+ }
+
+ /* sfd is shared by all TCP threads as sdata->ds[0].fd */
+ sfd = init_tcp_server_socket(0, server_port);
+ set_nonblock(sfd);
+
+ /* start UDP servers thread */
+ (void)spawn_thread(&pchild, udp_server_main, 0);
+ if (pthread_join(pchild, 0) < 0) {
+ crit("pthread_join() UDP Server");
+ }
+
+ return 0;
+}
diff --git a/code/jasmine/server.h b/code/jasmine/server.h
new file mode 100755
index 00000000..1e7a6409
--- /dev/null
+++ b/code/jasmine/server.h
@@ -0,0 +1,63 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_SERVER_H
+#define _MCAST_SERVER_H
+
+#include <pthread.h>
+#include <semaphore.h>
+
+struct semaphores {
+ sem_t wait_parent;
+ sem_t wait_child;
+};
+
+struct semlink {
+ struct semaphores *this; /* used by parent to point to the struct semaphores
+ which it created during spawn child. */
+ struct semaphores *parent; /* used by child to point to the struct
+ semaphores which it created by parent */
+};
+
+void sl_wait_child(struct semlink *sl);
+void sl_release_child(struct semlink *sl);
+void sl_wait_parent(struct semlink *sl);
+void sl_release_parent(struct semlink *sl);
+
+/* Server state machine */
+#define S_PREP 0
+#define S_SYNC 1
+#define S_SEND 2
+
+extern int wait_count;
+extern int client_count;
+extern int server_port;
+extern struct in_addr local_addr;
+extern int sfd;
+
+void *udp_server_main(void *args);
+void *tcp_server_main(void *args);
+void init_semaphores(struct semaphores *sp);
+struct semaphores * spawn_thread(pthread_t *pchild,
+ void *(*entry)(void *),
+ int create_sems);
+
+#define V(x) do { \
+ if (sem_post(&x) < 0) { \
+ crit("sem_post()"); \
+ } \
+} while(0)
+
+#define P(x) do { \
+ if (sem_wait(&x) != 0) { \
+ crit("sem_wait()"); \
+ } \
+} while(0)
+
+#endif
diff --git a/code/jasmine/tcp-common.c b/code/jasmine/tcp-common.c
new file mode 100755
index 00000000..62e2bb26
--- /dev/null
+++ b/code/jasmine/tcp-common.c
@@ -0,0 +1,60 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "misc.h"
+
+int init_tcp_server_socket(char *addr, unsigned short port)
+{
+ int s;
+ struct sockaddr_in sin;
+
+ port = port ? port : TCP_DPORT;
+ sin = make_addr(addr, port);
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ crit("socket() failed");
+ }
+
+ if (bind(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
+ crit("bind() failed, error binding tcp socket");
+ }
+
+ if (listen(s, 5) < 0) {
+ crit("listen() failed, error listening on socket");
+ }
+
+ log(4, "init_tcp_server_socket %s:%d", inet_ntoa(sin.sin_addr), port);
+ return s;
+}
+
+int init_tcp_client_socket(char *addr, unsigned short port)
+{
+ int s;
+ struct sockaddr_in sin;
+
+ port = port ? port : TCP_DPORT;
+ sin = make_addr(addr, port);
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ crit("socket() failed");
+ }
+
+ if (connect(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
+ crit("connect() failed");
+ }
+
+ log(4, "init_tcp_client_socket %s:%d", inet_ntoa(sin.sin_addr), port);
+ return s;
+}
diff --git a/code/jasmine/tcp-common.h b/code/jasmine/tcp-common.h
new file mode 100755
index 00000000..6ac48e1c
--- /dev/null
+++ b/code/jasmine/tcp-common.h
@@ -0,0 +1,18 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_TCP_COMMON_H
+#define _MCAST_TCP_COMMON_H
+
+#define TCP_DPORT DEF_PORT
+
+int init_tcp_server_socket(char *addr, unsigned short port);
+int init_tcp_client_socket(char *addr, unsigned short port);
+
+#endif
diff --git a/code/jasmine/tcp-queue.c b/code/jasmine/tcp-queue.c
new file mode 100755
index 00000000..fc0323f9
--- /dev/null
+++ b/code/jasmine/tcp-queue.c
@@ -0,0 +1,178 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+/* TCP poll message single linked queue */
+#include <stdlib.h>
+#include <string.h>
+
+#include "misc.h"
+#include "tcp-queue.h"
+
+void tcpq_queue_tail(struct tcpq *q, void *data, long size)
+{
+ struct qmsg *tmp;
+
+ if (!q) {
+ return;
+ }
+
+ tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg));
+ tmp->next = NULL;
+
+ if (!q->head) {
+ q->head = q->tail = tmp;
+ } else {
+ q->tail->next = tmp;
+ q->tail = tmp;
+ }
+
+ q->tail->data = data;
+ q->tail->size = size;
+ q->size += size;
+ q->count++;
+}
+
+void tcpq_queue_head(struct tcpq *q, void *data, long size)
+{
+ struct qmsg *tmp;
+
+ if (!q) {
+ return;
+ }
+
+ tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg));
+ tmp->next = NULL;
+
+ if (!q->head) {
+ q->head = q->tail = tmp;
+ } else {
+ tmp->next = q->head;
+ q->head = tmp;
+ }
+
+ q->head->data = data;
+ q->head->size = size;
+ q->size += size;
+ q->count++;
+}
+
+void * tcpq_dequeue_head(struct tcpq *q, long *size)
+{
+ void *res = NULL;
+ struct qmsg *tmp;
+
+ if (q && q->head) {
+ res = q->head->data;
+ *size = q->head->size;
+
+ tmp = q->head;
+ q->head = q->head->next;
+ if (!q->head) {
+ q->tail = NULL;
+ }
+
+ free(tmp);
+ q->count--;
+ q->size -= *size;
+ }
+ return res;
+}
+
+void * tcpq_queue_peek(struct tcpq *q, long *size)
+{
+ if (q && q->head) {
+ *size = q->head->size;
+ return q->head->data;
+ }
+
+ return NULL;
+}
+
+long tcpq_queue_dsize(struct tcpq *q)
+{
+ if (q) {
+ return q->size;
+ }
+
+ return 0;
+}
+
+void tcpq_queue_free(struct tcpq *q)
+{
+ struct qmsg *tmp;
+
+ if (!q) {
+ return;
+ }
+
+ while (q->head) {
+ tmp = q->head;
+ q->head = tmp->next;
+ free(tmp->data);
+ free(tmp);
+ }
+
+ q->count = 0;
+ q->size = 0;
+ q->head = q->tail = NULL;
+}
+
+struct tcpq * tcpq_queue_init(void)
+{
+ struct tcpq *q = wrapper_malloc(sizeof(struct tcpq));
+
+ q->count = 0;
+ q->size = 0;
+ q->head = q->tail = NULL;
+ return q;
+}
+
+void * tcpq_dqueue_flat(struct tcpq *q, long *size)
+{
+ void *res;
+ struct qmsg *tmp;
+ long offs = 0;
+
+ if (!q || q->count == 0) {
+ return NULL;
+ }
+
+ res = wrapper_malloc(q->size);
+ *size = q->size;
+
+ while (q->head) {
+ memcpy(res + offs, q->head->data, q->head->size);
+ offs += q->head->size;
+
+ tmp = q->head;
+ q->head = tmp->next;
+ free(tmp->data);
+ free(tmp);
+ }
+ tcpq_queue_free(q);
+ return res;
+}
+
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size)
+{
+ void *cpy;
+
+ if (!q) {
+ return NULL;
+ }
+
+ if (q->count > 1) {
+ cpy = tcpq_dqueue_flat(q, size);
+ tcpq_queue_tail(q, cpy, *size); /* use tcpq_queue_head is also OK */
+ } else {
+ cpy = tcpq_queue_peek(q, size);
+ }
+
+ return cpy;
+}
diff --git a/code/jasmine/tcp-queue.h b/code/jasmine/tcp-queue.h
new file mode 100755
index 00000000..4f096f85
--- /dev/null
+++ b/code/jasmine/tcp-queue.h
@@ -0,0 +1,35 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_TCP_QUEUE_H
+#define _MCAST_TCP_QUEUE_H
+
+struct tcpq {
+ struct qmsg *head, *tail;
+ long count; /* message count in a queue */
+ long size; /* Total data size of a queue */
+};
+
+struct qmsg {
+ struct qmsg *next;
+ void *data;
+ long size;
+};
+
+struct tcpq * tcpq_queue_init(void);
+void tcpq_queue_free(struct tcpq *q);
+long tcpq_queue_dsize(struct tcpq *q);
+void tcpq_queue_tail(struct tcpq *q, void *data, long size);
+void tcpq_queue_head(struct tcpq *q, void *data, long size);
+void * tcpq_dequeue_head(struct tcpq *q, long *size);
+void * tcpq_queue_peek(struct tcpq *q, long *size);
+void * tcpq_dqueue_flat(struct tcpq *q, long *size);
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size);
+
+#endif
diff --git a/code/jasmine/udp-common.c b/code/jasmine/udp-common.c
new file mode 100755
index 00000000..317b816b
--- /dev/null
+++ b/code/jasmine/udp-common.c
@@ -0,0 +1,97 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <fcntl.h>
+#include <net/if.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "misc.h"
+
+struct mc_info mcinfo;
+
+int init_mcast_socket(struct in_addr *local_addr,
+ struct sockaddr_in *maddr)
+{
+ struct ip_mreqn mreqn; /* multicast request new */
+ int ms; /* multicast socket */
+ u_short msock_port;
+ struct sockaddr_in bind_addr;
+
+ int msock_reuse = MCAST_REUSE;
+ u_char msock_loop = MCAST_LOOP;
+ u_char msock_ttl = MCAST_TTL;
+
+ if ((ms = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
+ crit("socket() failed, error allocating multicast socket");
+ }
+
+ // In order to let client and server on same host to use same port
+ if (setsockopt(ms, SOL_SOCKET, SO_REUSEADDR,
+ &msock_reuse, sizeof(msock_reuse)) < 0) {
+ crit("setsockopt() failed, can't set reuse flag");
+ }
+
+ if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_TTL,
+ &msock_ttl,sizeof(msock_ttl)) < 0) {
+ crit("setsockopt() failed, can't set ttl value");
+ }
+
+ if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_LOOP,
+ &msock_loop, sizeof(msock_loop)) < 0) {
+ crit("setsockopt() failed, can't set multicast packet looping");
+ }
+
+ /* TODO: Do we neet non-block? */
+
+ log(4, "Using multicast address: %s:%d",
+ inet_ntoa(maddr->sin_addr), ntohs(maddr->sin_port));
+
+ mreqn.imr_multiaddr = maddr->sin_addr;
+ mreqn.imr_address = *local_addr;
+ mreqn.imr_ifindex = 0;
+
+ /* Interface and remote mcast address choosing */
+
+ /* This tell interface which has mreqn.imr_address to join
+ mreqn.imr_multiaddr group, it has nothing to do with socket, port, and
+ mreqn.imr_address. */
+ if (setsockopt(ms, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ &mreqn, sizeof(mreqn)) < 0) {
+ crit("setsockopt() failed, %s can't join multicast group",
+ inet_ntoa(*local_addr));
+ }
+
+ /* local address choosing */
+
+ /* This tell kernel to use interface which has mreqn.imr_address as device
+ and mreqn.imr_address as source addr when sending any multicast through
+ socket. */
+ if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_IF,
+ &mreqn, sizeof(mreqn)) < 0) {
+ crit("setsockopt() failed to IP_MULTICAST_IF.\n");
+ }
+
+ /* Local port choosing, remote port is choosed when calling sendto()! */
+
+ /* This let us uses specific udp port number as source port when sending
+ any multicast datagrams. So ip in maddr is useless here and should be
+ INADDR_ANY, otherwise, bind will return Invalid argument. */
+ msock_port = ntohs(maddr->sin_port);
+ bind_addr = make_addr(0, msock_port);
+ if (bind(ms, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) {
+ crit("bind() error, can't bind mcast port");
+ }
+
+ return ms;
+}
diff --git a/code/jasmine/udp-common.h b/code/jasmine/udp-common.h
new file mode 100755
index 00000000..861545b4
--- /dev/null
+++ b/code/jasmine/udp-common.h
@@ -0,0 +1,42 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_UDP_COMMON_H
+#define _MCAST_UDP_COMMON_H
+
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/udp.h>
+
+#define MCAST_DPORT DEF_PORT
+
+#define MCAST_TTL 8
+/*
+ * Force reuse
+ */
+#define MCAST_REUSE 1
+
+/*
+ * Loop to let server host itself can receive data too.
+ */
+#define MCAST_LOOP 1
+#define MCAST_ADDR_BASE "224.238.0."
+#define MCAST_ADDR_SUFFIX "31"
+
+// Initial data
+struct mc_info {
+ struct sockaddr_in group;
+};
+
+extern struct mc_info mcinfo;
+
+// How to capture udp packets: tcpdump -i ethx udp -vv -n
+
+int init_mcast_socket(struct in_addr *local_addr, struct sockaddr_in *maddr);
+#endif