summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.coveragerc28
-rw-r--r--.gitignore1
-rwxr-xr-xci/deploy/deploy.sh40
-rw-r--r--code/jasmine/.tarball-version1
-rwxr-xr-xcode/jasmine/Makefile.am83
-rw-r--r--code/jasmine/README24
-rwxr-xr-xcode/jasmine/autogen.sh5
-rwxr-xr-xcode/jasmine/buffer.c152
-rwxr-xr-xcode/jasmine/buffer.h64
-rwxr-xr-xcode/jasmine/build-aux/git-version-gen170
-rwxr-xr-xcode/jasmine/client.c313
-rwxr-xr-xcode/jasmine/configure.ac65
-rwxr-xr-xcode/jasmine/jasmine.spec.in33
-rwxr-xr-xcode/jasmine/misc.c73
-rwxr-xr-xcode/jasmine/misc.h39
-rwxr-xr-xcode/jasmine/server-tcp.c550
-rwxr-xr-xcode/jasmine/server-udp.c95
-rwxr-xr-xcode/jasmine/server.c123
-rwxr-xr-xcode/jasmine/server.h63
-rwxr-xr-xcode/jasmine/tcp-common.c60
-rwxr-xr-xcode/jasmine/tcp-common.h18
-rwxr-xr-xcode/jasmine/tcp-queue.c178
-rwxr-xr-xcode/jasmine/tcp-queue.h35
-rwxr-xr-xcode/jasmine/udp-common.c97
-rwxr-xr-xcode/jasmine/udp-common.h42
-rwxr-xr-xdeploy/check_openstack_progress.sh8
-rw-r--r--deploy/config/bm_environment/zte-baremetal1/deploy.yml1
-rw-r--r--deploy/config/vm_environment/zte-virtual1/deploy.yml2
-rwxr-xr-xdeploy/get_conf.py3
-rw-r--r--deploy/tempest.py34
-rw-r--r--docker/Dockerfile4
-rw-r--r--docs/developer/design/multicast.rst278
-rw-r--r--docs/developer/spec/multicast.rst190
-rw-r--r--requirements.txt0
-rw-r--r--setup.py9
-rw-r--r--test-requirements.txt5
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/unit/__init__.py0
-rw-r--r--tests/unit/test_placeholder.py2
-rw-r--r--tox.ini42
40 files changed, 2902 insertions, 28 deletions
diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 00000000..d58e2766
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,28 @@
+# .coveragerc to control coverage.py
+
+[run]
+branch = True
+source =
+ deploy
+ tests
+
+[report]
+# Regexes for lines to exclude from consideration
+exclude_lines =
+ # Have to re-enable the standard pragma
+ pragma: no cover
+
+ # Don't complain about missing debug-only code:
+ def __repr__
+ if self\.debug
+
+ # Don't complain if tests don't hit defensive assertion code:
+ raise AssertionError
+ raise NotImplementedError
+
+ # Don't complain if non-runnable code isn't run:
+ if 0:
+ if __name__ == .__main__.:
+
+ignore_errors = True
+
diff --git a/.gitignore b/.gitignore
index 078f1c9b..b636538f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,6 +42,7 @@ pip-delete-this-directory.txt
.tox/
.coverage
.cache
+coverage.xml
# Log files:
*.log
diff --git a/ci/deploy/deploy.sh b/ci/deploy/deploy.sh
index b2802c5d..4ec2c2f7 100755
--- a/ci/deploy/deploy.sh
+++ b/ci/deploy/deploy.sh
@@ -20,7 +20,7 @@
############################################################################
# BEGIN of usage description
#
-usage ()
+function usage
{
cat << EOF
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
@@ -144,18 +144,18 @@ WORKDIR=${WORKDIR:-/tmp/workdir}
# set extra ssh paramters
SSH_PARAS="-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no"
-deploy_path=$WORKSPACE/deploy
+DEPLOY_PATH=$WORKSPACE/deploy
-create_qcow2_path=$WORKSPACE/tools
+CREATE_QCOW2_PATH=$WORKSPACE/tools
-daisy_server_net=$WORKSPACE/templates/virtual_environment/networks/daisy.xml
-target_node_net=$WORKSPACE/templates/virtual_environment/networks/os-all_in_one.xml
-vmdeploy_daisy_server_vm=$WORKSPACE/templates/virtual_environment/vms/daisy.xml
-vmdeploy_target_node_vm=$WORKSPACE/templates/virtual_environment/vms/all_in_one.xml
+VMDELOY_DAISY_SERVER_NET=$WORKSPACE/templates/virtual_environment/networks/daisy.xml
+VMDEPLOY_TARGET_NODE_NET=$WORKSPACE/templates/virtual_environment/networks/os-all_in_one.xml
+VMDEPLOY_DAISY_SERVER_VM=$WORKSPACE/templates/virtual_environment/vms/daisy.xml
+VMDEPLOY_TARGET_NODE_VM=$WORKSPACE/templates/virtual_environment/vms/all_in_one.xml
-bmdeploy_daisy_server_net=$WORKSPACE/templates/physical_environment/networks/daisy.xml
-bmdeploy_daisy_server_vm=$WORKSPACE/templates/physical_environment/vms/daisy.xml
+BMDEPLOY_DAISY_SERVER_NET=$WORKSPACE/templates/physical_environment/networks/daisy.xml
+BMDEPLOY_DAISY_SERVER_VM=$WORKSPACE/templates/physical_environment/vms/daisy.xml
PARAS_FROM_DEPLOY=`python $WORKSPACE/deploy/get_para_from_deploy.py --dha $DHA_CONF`
TARGET_HOSTS_NUM=`echo $PARAS_FROM_DEPLOY | cut -d " " -f 1`
@@ -173,10 +173,10 @@ if [ $DRY_RUN -eq 1 ]; then
DAISY_IP: $DAISY_IP
DAISY_PASSWD: $DAISY_PASSWD
PARAS_IMAGE: $PARAS_IMAGE
- daisy_server_net: $daisy_server_net
- target_node_net: $target_node_net
- vmdeploy_daisy_server_vm: $vmdeploy_daisy_server_vm
- vmdeploy_target_node_vm: $vmdeploy_target_node_vm
+ VMDELOY_DAISY_SERVER_NET: $VMDELOY_DAISY_SERVER_NET
+ VMDEPLOY_TARGET_NODE_NET: $VMDEPLOY_TARGET_NODE_NET
+ VMDEPLOY_DAISY_SERVER_VM: $VMDEPLOY_DAISY_SERVER_VM
+ VMDEPLOY_TARGET_NODE_VM: $VMDEPLOY_TARGET_NODE_VM
"""
exit 1
fi
@@ -197,6 +197,7 @@ function create_node
local net_name=$2
local vms_template=$3
local vms_name=$4
+
virsh net-define $net_template
virsh net-autostart $net_name
virsh net-start $net_name
@@ -242,6 +243,7 @@ function clean_up
{
local vm_name=$1
local network_name=$2
+
virsh destroy $vm_name
virsh undefine $vm_name
virsh net-destroy $network_name
@@ -262,17 +264,17 @@ fi
echo "=======create daisy node================"
if [ $IS_BARE == 0 ];then
- $create_qcow2_path/daisy-img-modify.sh -c $create_qcow2_path/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE
- create_node $daisy_server_net daisy1 $vmdeploy_daisy_server_vm daisy
+ $CREATE_QCOW2_PATH/daisy-img-modify.sh -c $CREATE_QCOW2_PATH/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE
+ create_node $VMDELOY_DAISY_SERVER_NET daisy1 $VMDEPLOY_DAISY_SERVER_VM daisy
else
- $create_qcow2_path/daisy-img-modify.sh -c $create_qcow2_path/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE
- virsh define $bmdeploy_daisy_server_vm
+ $CREATE_QCOW2_PATH/daisy-img-modify.sh -c $CREATE_QCOW2_PATH/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE
+ virsh define $BMDEPLOY_DAISY_SERVER_VM
virsh start daisy
fi
sleep 20
echo "====== install daisy==========="
-$deploy_path/trustme.sh $DAISY_IP $DAISY_PASSWD
+$DEPLOY_PATH/trustme.sh $DAISY_IP $DAISY_PASSWD
ssh $SSH_PARAS $DAISY_IP "if [[ -f ${REMOTE_SPACE} || -d ${REMOTE_SPACE} ]]; then rm -fr ${REMOTE_SPACE}; fi"
scp -r $WORKSPACE root@$DAISY_IP:${REMOTE_SPACE}
ssh $SSH_PARAS $DAISY_IP "mkdir -p /home/daisy_install"
@@ -304,7 +306,7 @@ ssh $SSH_PARAS $DAISY_IP "python ${REMOTE_SPACE}/deploy/tempest.py --dha $DHA --
echo "=====create and find node======"
if [ $IS_BARE == 0 ];then
qemu-img create -f qcow2 ${VM_STORAGE}/all_in_one.qcow2 200G
- create_node $target_node_net daisy2 $vmdeploy_target_node_vm all_in_one
+ create_node $VMDEPLOY_TARGET_NODE_NET daisy2 $VMDEPLOY_TARGET_NODE_VM all_in_one
sleep 20
else
for i in $(seq 106 110); do
diff --git a/code/jasmine/.tarball-version b/code/jasmine/.tarball-version
new file mode 100644
index 00000000..21e8796a
--- /dev/null
+++ b/code/jasmine/.tarball-version
@@ -0,0 +1 @@
+1.0.3
diff --git a/code/jasmine/Makefile.am b/code/jasmine/Makefile.am
new file mode 100755
index 00000000..41acfd62
--- /dev/null
+++ b/code/jasmine/Makefile.am
@@ -0,0 +1,83 @@
+# Copyright (c) 2015 ZTE, Inc.
+
+SPEC = $(PACKAGE_NAME).spec
+
+TARFILE = $(PACKAGE_NAME)-$(VERSION).tar.gz
+
+EXTRA_DIST = autogen.sh $(SPEC).in \
+ build-aux/git-version-gen \
+ .version
+
+AUTOMAKE_OPTIONS = foreign
+
+ACLOCAL_AMFLAGS = -I m4
+
+MAINTAINERCLEANFILES = Makefile.in aclocal.m4 configure depcomp \
+ config.guess config.sub missing install-sh \
+ autoheader automake autoconf \
+ autoscan.log configure.scan ltmain.sh test-driver config.h.in
+
+noinst_HEADERS = buffer.h misc.h server.h tcp-common.h tcp-queue.h udp-common.h
+
+dist-clean-local:
+ rm -f autoconf automake autoheader
+
+clean-generic:
+ rm -rf $(SPEC) $(TARFILE)
+
+## make rpm/srpm section.
+
+$(SPEC): $(SPEC).in
+ rm -f $@-t $@
+ ver="$(VERSION)" && \
+ sed \
+ -e "s#@version@#$$ver#g" \
+ $< > $@-t; \
+ chmod a-w $@-t
+ mv $@-t $@
+
+$(TARFILE):
+ $(MAKE) dist
+
+RPMBUILDOPTS = --define "_sourcedir $(abs_builddir)" \
+ --define "_specdir $(abs_builddir)" \
+ --define "_builddir $(abs_builddir)" \
+ --define "_srcrpmdir $(abs_builddir)" \
+ --define "_rpmdir $(abs_builddir)"
+
+srpm: clean
+ $(MAKE) $(SPEC) $(TARFILE)
+ rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) --nodeps -bs $(SPEC)
+
+rpm: clean _version
+ $(MAKE) $(SPEC) $(TARFILE)
+ rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) -ba $(SPEC)
+
+# release/versioning
+BUILT_SOURCES = .version
+.version:
+ echo $(VERSION) > $@-t && mv $@-t $@
+
+dist-hook:
+ echo $(VERSION) > $(distdir)/.tarball-version
+
+.PHONY: _version
+
+_version:
+ cd $(srcdir) && rm -rf autom4te.cache .version && autoreconf -i
+ $(MAKE) $(AM_MAKEFLAGS) Makefile
+
+maintainer-clean-local:
+ rm -rf m4
+
+###
+
+bin_PROGRAMS = jasmines jasminec
+
+DEFAULT_INCLUDES = -I. -I/usr/include
+
+jasmines_SOURCES = udp-common.c tcp-common.c buffer.c misc.c server.c server-udp.c server-tcp.c tcp-queue.c
+jasminec_SOURCES = udp-common.c tcp-common.c buffer.c misc.c client.c
+
+jasmines_LDADD = -lpthread
+jasminec_LDADD = -lpthread
diff --git a/code/jasmine/README b/code/jasmine/README
new file mode 100644
index 00000000..48f169ff
--- /dev/null
+++ b/code/jasmine/README
@@ -0,0 +1,24 @@
+jasmine: Just A Small Multicast engINE
+
+Installation
+------------
+
+./autogen.sh
+./configure
+make && make install
+
+RPM Based Installation
+----------------------
+./autogen.sh
+./configure
+make rpm
+rpm -ivh ./x86_64/jasmine-1.0.3-1.rpm
+
+Usage
+-----
+jasmine server:
+Usage: jasmines local_ip num_of_clients [port]
+
+jasmine client:
+Usage: jasminec <local_ip> <server_ip> [port]
+
diff --git a/code/jasmine/autogen.sh b/code/jasmine/autogen.sh
new file mode 100755
index 00000000..5bf25ecc
--- /dev/null
+++ b/code/jasmine/autogen.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+# Run this to generate all the initial makefiles, etc.
+mkdir -p m4
+echo Building configuration system...
+autoreconf -i
diff --git a/code/jasmine/buffer.c b/code/jasmine/buffer.c
new file mode 100755
index 00000000..0567026b
--- /dev/null
+++ b/code/jasmine/buffer.c
@@ -0,0 +1,152 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "buffer.h"
+#include "misc.h"
+
+struct buffer_ctl buffctl;
+struct packet_ctl *packetctl[PACKETS_PER_BUFFER];
+struct packet_ctl empty;
+
+void buffer_init()
+{
+ int i;
+
+ for (i = 0; i < PACKETS_PER_BUFFER; i++) {
+ packetctl[i] = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE);
+ memset(packetctl[i], 0, PACKET_SIZE);
+ packetctl[i]->data_size = 0;
+ packetctl[i]->seq = 0;
+ }
+
+ buffctl.buffer_id = 0;
+ buffctl.packet_id_base = 0;
+ buffctl.pkt_count = 0;
+ empty.data_size = 0;
+}
+
+void packetctl_precheck()
+{
+ int i;
+
+ for (i = 0; i < PACKETS_PER_BUFFER; i++) {
+ if (packetctl[i]->data_size != 0) {
+ crit("Error precheck packet slot %d,%d", i, packetctl[i]->data_size);
+ }
+ }
+}
+
+// Calculate packet checksum
+uint32_t packet_csum(uint8_t *data, int len)
+{
+ uint32_t *item, sum = 0;
+ int i = 0;
+
+ while (len % 4) {
+ data[len] = 0; len++;
+ }
+
+ while (i < len) {
+ item = (uint32_t *)((char *)data + i);
+ sum += *item;
+ i += 4;
+ }
+
+ return sum;
+}
+
+// Fill buffers from file descriptor
+long buffer_fill(int fd)
+{
+ int s = 0;
+ int r = PACKET_PAYLOAD_SIZE;
+
+ buffctl.buffer_id++;
+ buffctl.packet_id_base += buffctl.pkt_count;
+ buffctl.buffer_size = 0;
+ while (r > 0 && s < PACKETS_PER_BUFFER) {
+ if ((r = read(fd, packetctl[s]->data, PACKET_PAYLOAD_SIZE)) < 0) {
+ crit("Error reading data from stdin");
+ }
+
+ // r == 0 means EOF
+
+ if (r > 0) {
+ buffctl.buffer_size += r;
+ packetctl[s]->data_size = r;
+ packetctl[s]->seq = buffctl.packet_id_base + s;
+ packetctl[s]->crc = packet_csum(packetctl[s]->data, r);
+ s++;
+ }
+ }
+
+ log(6, "input %d bytes of data in %d packets", buffctl.buffer_size, s);
+ buffctl.pkt_count = s;
+ return s;
+}
+
+long buffer_flush(int fd)
+{
+ int s = 0;
+ while (buffctl.pkt_count--) {
+ write(fd, packetctl[s]->data, packetctl[s]->data_size);
+ buffctl.buffer_size -= packetctl[s]->data_size;
+ packetctl[s]->data_size = 0;
+ packetctl[s]->seq = 0;
+ packetctl[s]->crc = 0;
+ s++;
+ }
+
+ return s;
+}
+
+/* Caller should use new_pkt as packet buffer again if this returns NULL */
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt)
+{
+ struct packet_ctl *freed_pkt;
+ uint32_t i;
+
+ if (new_pkt->seq < buffctl.packet_id_base ||
+ new_pkt->seq - buffctl.packet_id_base >= buffctl.pkt_count) {
+ return NULL;
+ }
+
+ if (new_pkt->data_size == 0) {
+ return NULL;
+ }
+
+ i = packet_csum(new_pkt->data, new_pkt->data_size);
+ if (new_pkt->crc != i) {
+ return NULL;
+ }
+
+ i = new_pkt->seq - buffctl.packet_id_base;
+ freed_pkt = packetctl[i];
+ packetctl[i] = new_pkt;
+ return freed_pkt;
+}
+
+struct packet_ctl *packet_get(uint32_t seq)
+{
+ empty.seq = seq;
+
+ if (seq < buffctl.packet_id_base ||
+ seq - buffctl.packet_id_base >= buffctl.pkt_count) {
+ return &empty;
+ }
+
+ if (packetctl[seq - buffctl.packet_id_base]->data_size == 0) {
+ return &empty;
+ }
+
+ return packetctl[seq - buffctl.packet_id_base];
+}
diff --git a/code/jasmine/buffer.h b/code/jasmine/buffer.h
new file mode 100755
index 00000000..e899fe62
--- /dev/null
+++ b/code/jasmine/buffer.h
@@ -0,0 +1,64 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_BUFFER_H
+#define _MCAST_BUFFER_H
+
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/udp.h>
+
+#define MTU 1500
+
+#define PACKET_SIZE ((MTU) - sizeof(struct iphdr) - sizeof(struct udphdr))
+
+/* Exclude padding */
+#define PACKET_PAYLOAD_SIZE (((PACKET_SIZE) - sizeof(struct packet_ctl)) & ~0x3)
+
+#define PACKETS_PER_BUFFER 1024
+
+#define DEF_PORT 18383 /* for both UDP and TCP */
+
+/* Buffer header (align to 4 Byte) */
+struct buffer_ctl {
+ uint32_t buffer_id;
+ uint32_t buffer_size;
+ uint32_t packet_id_base;
+ uint32_t pkt_count;
+};
+
+/* Packet header (align to 4 Byte) */
+struct packet_ctl {
+ uint32_t seq;
+ uint32_t crc;
+ uint32_t data_size;
+ uint8_t data[0];
+};
+
+#define CLIENT_READY 0x1
+#define CLIENT_REQ 0x2
+#define CLIENT_DONE 0x4
+#define SERVER_SENT 0x8
+
+/* Retransmition Request Header (align to 4 Byte) */
+struct request_ctl {
+ uint32_t req_count; /* Requested packet slot count */
+};
+
+extern struct buffer_ctl buffctl;
+extern struct packet_ctl *packetctl[PACKETS_PER_BUFFER];
+
+void buffer_init();
+long buffer_fill(int fd);
+long buffer_flush(int fd);
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt);
+struct packet_ctl *packet_get(uint32_t seq);
+void packetctl_precheck();
+
+#endif
diff --git a/code/jasmine/build-aux/git-version-gen b/code/jasmine/build-aux/git-version-gen
new file mode 100755
index 00000000..c3d53f36
--- /dev/null
+++ b/code/jasmine/build-aux/git-version-gen
@@ -0,0 +1,170 @@
+#!/bin/sh
+# Print a version string.
+scriptversion=2010-10-13.20; # UTC
+
+# Copyright (C) 2007-2010 Free Software Foundation, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# This script is derived from GIT-VERSION-GEN from GIT: http://git.or.cz/.
+# It may be run two ways:
+# - from a git repository in which the "git describe" command below
+# produces useful output (thus requiring at least one signed tag)
+# - from a non-git-repo directory containing a .tarball-version file, which
+# presumes this script is invoked like "./git-version-gen .tarball-version".
+
+# In order to use intra-version strings in your project, you will need two
+# separate generated version string files:
+#
+# .tarball-version - present only in a distribution tarball, and not in
+# a checked-out repository. Created with contents that were learned at
+# the last time autoconf was run, and used by git-version-gen. Must not
+# be present in either $(srcdir) or $(builddir) for git-version-gen to
+# give accurate answers during normal development with a checked out tree,
+# but must be present in a tarball when there is no version control system.
+# Therefore, it cannot be used in any dependencies. GNUmakefile has
+# hooks to force a reconfigure at distribution time to get the value
+# correct, without penalizing normal development with extra reconfigures.
+#
+# .version - present in a checked-out repository and in a distribution
+# tarball. Usable in dependencies, particularly for files that don't
+# want to depend on config.h but do want to track version changes.
+# Delete this file prior to any autoconf run where you want to rebuild
+# files to pick up a version string change; and leave it stale to
+# minimize rebuild time after unrelated changes to configure sources.
+#
+# It is probably wise to add these two files to .gitignore, so that you
+# don't accidentally commit either generated file.
+#
+# Use the following line in your configure.ac, so that $(VERSION) will
+# automatically be up-to-date each time configure is run (and note that
+# since configure.ac no longer includes a version string, Makefile rules
+# should not depend on configure.ac for version updates).
+#
+# AC_INIT([GNU project],
+# m4_esyscmd([build-aux/git-version-gen .tarball-version]),
+# [bug-project@example])
+#
+# Then use the following lines in your Makefile.am, so that .version
+# will be present for dependencies, and so that .tarball-version will
+# exist in distribution tarballs.
+#
+# BUILT_SOURCES = $(top_srcdir)/.version
+# $(top_srcdir)/.version:
+# echo $(VERSION) > $@-t && mv $@-t $@
+# dist-hook:
+# echo $(VERSION) > $(distdir)/.tarball-version
+
+case $# in
+ 1|2) ;;
+ *) echo 1>&2 "Usage: $0 \$srcdir/.tarball-version" \
+ '[TAG-NORMALIZATION-SED-SCRIPT]'
+ exit 1;;
+esac
+
+tarball_version_file=$1
+tag_sed_script="${2:-s/x/x/}"
+nl='
+'
+
+# Avoid meddling by environment variable of the same name.
+v=
+
+svn log --non-interactive -q --limit 1 > /dev/null 2>&1
+svnwork=$?
+
+# First see if there is a tarball-only version file.
+# then try "git describe", then default.
+if test -f $tarball_version_file
+then
+ v=`cat $tarball_version_file` || exit 1
+ case $v in
+ *$nl*) v= ;; # reject multi-line output
+ [0-9]*) ;;
+ *) v= ;;
+ esac
+ test -z "$v" \
+ && echo "$0: WARNING: $tarball_version_file seems to be damaged" 1>&2
+fi
+
+if test -n "$v"
+then
+ : # use $v
+# Otherwise, if there is at least one git commit involving the working
+# directory, and "git describe" output looks sensible, use that to
+# derive a version string.
+elif test "`git log -1 --pretty=format:x . 2>&1`" = x \
+ && v=`git describe --abbrev=4 --match='v*' HEAD 2>/dev/null \
+ || git describe --abbrev=4 HEAD 2>/dev/null` \
+ && v=`printf '%s\n' "$v" | sed "$tag_sed_script"` \
+ && case $v in
+ v[0-9]*) ;;
+ *) (exit 1) ;;
+ esac
+then
+ # Is this a new git that lists number of commits since the last
+ # tag or the previous older version that did not?
+ # Newer: v6.10-77-g0f8faeb
+ # Older: v6.10-g0f8faeb
+ case $v in
+ *-*-*) : git describe is okay three part flavor ;;
+ *-*)
+ : git describe is older two part flavor
+ # Recreate the number of commits and rewrite such that the
+ # result is the same as if we were using the newer version
+ # of git describe.
+ vtag=`echo "$v" | sed 's/-.*//'`
+ numcommits=`git rev-list "$vtag"..HEAD | wc -l`
+ v=`echo "$v" | sed "s/\(.*\)-\(.*\)/\1-$numcommits-\2/"`;
+ ;;
+ esac
+
+ # Change the first '-' to a '.', so version-comparing tools work properly.
+ # Remove the "g" in git describe's output string, to save a byte.
+ v=`echo "$v" | sed 's/-/./;s/\(.*\)-g/\1-/'`;
+elif [ "$svnwork" -eq "0" ] ;
+then
+ vtag="1.0.0" # we do not have git tag in svn, so here just hard coded it.
+ v=`svn log -q --limit 1 | grep "|" | awk '{print $1}'`
+ v=`echo "$v" |sed 's/^r//'`
+ v="$vtag.$v"
+else
+ v=UNKNOWN
+fi
+
+v=`echo "$v" |sed 's/^v//'`
+
+# Don't declare a version "dirty" merely because a time stamp has changed.
+git update-index --refresh > /dev/null 2>&1
+
+dirty=`sh -c 'git diff-index --name-only HEAD' 2>/dev/null` || dirty=
+case "$dirty" in
+ '') ;;
+ *) # Append the suffix only if there isn't one already.
+ case $v in
+ *-dirty) ;;
+ *) v="$v-dirty" ;;
+ esac ;;
+esac
+
+# Omit the trailing newline, so that m4_esyscmd can use the result directly.
+echo "$v" | tr -d "$nl"
+
+# Local variables:
+# eval: (add-hook 'write-file-hooks 'time-stamp)
+# time-stamp-start: "scriptversion="
+# time-stamp-format: "%:y-%02m-%02d.%02H"
+# time-stamp-time-zone: "UTC"
+# time-stamp-end: "; # UTC"
+# End:
diff --git a/code/jasmine/client.c b/code/jasmine/client.c
new file mode 100755
index 00000000..42d996af
--- /dev/null
+++ b/code/jasmine/client.c
@@ -0,0 +1,313 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "tcp-common.h"
+#include "misc.h"
+
+/* Global statistics */
+long tot_dups = 0;
+long tot_reps = 0;
+long tot_pkts = 0;
+long long tot_size = 0;
+
+void recv_mcinfo(int tcp_socket)
+{
+ int res;
+
+ res = read_all(tcp_socket, &mcinfo, sizeof(struct mc_info));
+ if (res != sizeof(struct mc_info)) {
+ crit("Error while reading initial data");
+ }
+}
+
+void recv_buffctl(int tcp_socket)
+{
+ int res;
+
+ res = read_all(tcp_socket, &buffctl, sizeof(struct buffer_ctl));
+ if (res != sizeof(struct buffer_ctl)) {
+ crit("Error reading buffer control");
+ }
+}
+
+void send_client_ready(int tcp_socket)
+{
+ uint8_t cmd;
+
+ /* Tell TCP server we are ready to receive */
+ cmd = CLIENT_READY;
+ write(tcp_socket, &cmd, 1);
+}
+
+/* Read out the SERVER_SENT signal */
+void recv_server_sent(int tcp_socket)
+{
+ uint8_t cmd;
+ int res;
+
+ res = read_all(tcp_socket, &cmd, 1);
+ if (res != 1) {
+ //if (read(tcp_socket, &cmd, 1) < 1) {
+ crit("Error reading SERVER_SENT");
+ }
+
+ if (cmd != SERVER_SENT) {
+ crit("Error reading SERVER_SENT %d", cmd);
+ }
+}
+
+void send_client_request(int tcp_socket, struct request_ctl *req)
+{
+ uint8_t cmd;
+
+ /* Tell TCP server we are ready to receive */
+ cmd = CLIENT_REQ;
+ write(tcp_socket, &cmd, 1);
+ write(tcp_socket, req,
+ sizeof(struct request_ctl) + (req->req_count) * sizeof(uint32_t));
+}
+
+void recv_server_ack(int tcp_socket, struct packet_ctl *pkt)
+{
+ int res;
+
+ res = read_all(tcp_socket, pkt, sizeof(struct packet_ctl));
+ if (res != sizeof(struct packet_ctl)) {
+ crit("Error on tcp socket");
+ }
+
+ res = read_all(tcp_socket,
+ ((char *)pkt) + sizeof(struct packet_ctl),
+ pkt->data_size);
+ if (res != pkt->data_size) {
+ crit("Error on tcp socket received %d of %d",
+ res, pkt->data_size);
+ }
+}
+
+void tcp_retransmition(int tcp_socket,
+ struct packet_ctl **curr_pkt,
+ struct packet_ctl **freed_pkt)
+{
+ struct request_ctl *reqctl;
+ uint32_t *reqbody;
+ uint8_t rqbuf[sizeof(struct request_ctl) + PACKETS_PER_BUFFER * sizeof(uint32_t)];
+ uint32_t l;
+
+ reqctl = (struct request_ctl *)rqbuf;
+ reqbody = (uint32_t *)(rqbuf + sizeof(struct request_ctl));
+
+ reqctl->req_count = 0;
+ for (l = 0; l < buffctl.pkt_count; l++) {
+ if (!packetctl[l]->data_size) {
+ log(6, "Requesting packet %u", l + buffctl.packet_id_base);
+ reqbody[reqctl->req_count] = l + buffctl.packet_id_base;
+ reqctl->req_count++;
+ }
+ }
+
+ if (reqctl->req_count > 0) {
+ send_client_request(tcp_socket, reqctl);
+
+ /* read retransmitted blocks via TCP */
+ for (l = 0; l < reqctl->req_count; l++) {
+ if (*freed_pkt) {
+ *curr_pkt = *freed_pkt;
+ }
+
+ recv_server_ack(tcp_socket, *curr_pkt);
+
+ *freed_pkt = packet_put(*curr_pkt);
+ if (!(*freed_pkt)) {
+ crit("Malformed packet on tcp socket");
+ }
+ if ((*freed_pkt)->data_size != 0) {
+ crit("Malformed free packet slot or TCP data");
+ }
+
+ log(6, "Received retran packet %u", (*curr_pkt)->seq);
+ }
+
+ tot_reps += reqctl->req_count;
+ }
+}
+
+/* Returns how many good packets received from UDP */
+int recv_mcast(int tcp_socket, int udp_socket,
+ struct packet_ctl **curr_pkt,
+ struct packet_ctl **freed_pkt)
+{
+ int rcv_pkt_count;
+ int got_sent;
+ int maxfd;
+ fd_set rfds;
+ struct timeval tv;
+ int res;
+
+ maxfd = tcp_socket;
+ if (maxfd < udp_socket) {
+ maxfd = udp_socket;
+ }
+ maxfd++;
+ FD_ZERO(&rfds);
+
+ rcv_pkt_count = 0;
+ got_sent = 0;
+
+ if (buffctl.pkt_count != 0) {
+ do {
+ FD_SET(tcp_socket, &rfds);
+ FD_SET(udp_socket, &rfds);
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+
+ res = select(maxfd, &rfds, 0, 0, &tv);
+ if (res < 0) {
+ crit("select error");
+ }
+
+ if (res == 0) {
+ crit("select timed out");
+ }
+
+ /* Read multicast packet */
+ if (FD_ISSET(udp_socket, &rfds)) {
+ log(7, "Reading multicast packet");
+ if (*freed_pkt) {
+ *curr_pkt = *freed_pkt;
+ }
+
+ res = recv(udp_socket, *curr_pkt, PACKET_SIZE, 0);
+ if (res <= 0) {
+ crit("error on multicast socket");
+ }
+
+ if (res < sizeof(struct packet_ctl) ) {
+ log(7, "Truncated packet received (%d bytes)", res);
+ } else if (res != (*curr_pkt)->data_size + sizeof(struct packet_ctl)) {
+ log(7,
+ "Truncated packet received (%d of %ld bytes)",
+ res, (*curr_pkt)->data_size + sizeof(struct packet_ctl));
+ } else {
+ log(9, "Normal packet seq:%d", (*curr_pkt)->seq);
+ (*freed_pkt) = packet_put((*curr_pkt));
+ if (!(*freed_pkt)) {
+ log(5, "Malformed packet");
+ } else {
+ if ((*freed_pkt)->data_size == 0) {
+ rcv_pkt_count++;
+ } else {
+ log(6, "Duplicated packet");
+ tot_dups++;
+ }
+ }
+ }
+ } else if (FD_ISSET(tcp_socket, &rfds)) {
+ /* Check TCP, only if there was no more data from UDP */
+ log(6, "No more data path1");
+ recv_server_sent(tcp_socket);
+ got_sent = 1;
+ break;
+ }
+ } while (rcv_pkt_count < buffctl.pkt_count);
+ }
+
+ if (got_sent == 0) {
+ log(6, "No more data path2");
+ recv_server_sent(tcp_socket);
+ }
+
+ return rcv_pkt_count;
+}
+
+void send_client_done(int tcp_socket)
+{
+ uint8_t cmd;
+
+ /* Tell TCP server we are ready to receive */
+ cmd = CLIENT_DONE;
+ write(tcp_socket, &cmd, 1);
+}
+
+int main(int argc, char *argv[])
+{
+ int ms, ts;
+ struct packet_ctl *alloc_pkt, *curr_pkt, *freed_pkt;
+ int udp_rcv_count;
+ u_short port = DEF_PORT;
+ struct in_addr local_addr;
+
+ if (argc < 3) {
+ printf("Usage: %s <local_ip> <server_ip> [port]\n", argv[0]);
+ return 0;
+ }
+
+ if (!inet_aton(argv[1], &local_addr)) {
+ crit("can not resolve address: %s", argv[1]);
+ }
+
+ if (argc > 3) {
+ port = atoi(argv[3]);
+ if (!port) {
+ port = DEF_PORT;
+ }
+ }
+
+ buffer_init();
+ /* Init first time packet slot */
+ alloc_pkt = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE);
+ memset(alloc_pkt, 0, PACKET_SIZE);
+ freed_pkt = curr_pkt = alloc_pkt;
+
+ ts = init_tcp_client_socket(argv[2], port);
+ recv_mcinfo(ts);
+ ms = init_mcast_socket(&local_addr, &mcinfo.group);
+ /* Do we need set_nonblock(ms)??? ; */
+
+ /* Will do dummy run even if buffctl.pkt_count is zero at the first round */
+ do { /* one buffer a round */
+ packetctl_precheck();
+ recv_buffctl(ts);
+ send_client_ready(ts);
+
+ udp_rcv_count = recv_mcast(ts, ms, &curr_pkt,&freed_pkt);
+ if (udp_rcv_count == buffctl.pkt_count) {
+ log(6, "All packets of current buffer received from UDP");
+ } else {
+ tcp_retransmition(ts, &curr_pkt, &freed_pkt);
+ }
+
+ tot_pkts += buffctl.pkt_count;
+ tot_size += buffctl.buffer_size;
+ log(1, "\rBuffer received %lld Bytes, %ld Packets(%ld Repeats %ld Dups)",
+ tot_size, tot_pkts, tot_reps, tot_dups);
+
+ if (buffctl.pkt_count) {
+ buffer_flush(STDOUT_FILENO);
+ }
+
+ send_client_done(ts);
+ } while (buffctl.pkt_count != 0);
+
+ shutdown(ts, 2);
+ close(ts);
+ close(ms);
+ log(1, "All buffers receive done.\n");
+ return 0;
+}
diff --git a/code/jasmine/configure.ac b/code/jasmine/configure.ac
new file mode 100755
index 00000000..77870c95
--- /dev/null
+++ b/code/jasmine/configure.ac
@@ -0,0 +1,65 @@
+# -*- Autoconf -*-
+# Process this file with autoconf to produce a configure script.
+
+# bootstrap / init
+AC_PREREQ([2.61])
+
+AC_INIT([jasmine],
+ m4_esyscmd([build-aux/git-version-gen .tarball-version]),
+ [hu.zhijiang@zte.com.cn])
+
+AC_USE_SYSTEM_EXTENSIONS
+
+AM_INIT_AUTOMAKE([-Wno-portability])
+
+AC_CONFIG_HEADER([config.h])
+
+AC_CONFIG_MACRO_DIR([m4])
+
+AC_SUBST(WITH_LIST, [""])
+
+dnl Fix default variables - "prefix" variable if not specified
+if test "$prefix" = "NONE"; then
+ prefix="/usr"
+
+ dnl Fix "localstatedir" variable if not specified
+ if test "$localstatedir" = "\${prefix}/var"; then
+ localstatedir="/var"
+ fi
+ dnl Fix "sysconfdir" variable if not specified
+ if test "$sysconfdir" = "\${prefix}/etc"; then
+ sysconfdir="/etc"
+ fi
+ dnl Fix "libdir" variable if not specified
+ if test "$libdir" = "\${exec_prefix}/lib"; then
+ if test -e /usr/lib64; then
+ libdir="/usr/lib64"
+ else
+ libdir="/usr/lib"
+ fi
+ fi
+fi
+
+# Checks for programs.
+AC_PATH_PROG([BASHPATH], [bash])
+
+AC_CONFIG_FILES([Makefile])
+
+PACKAGE_FEATURES=""
+
+ENV_CFLAGS="$CFLAGS"
+OPT_CFLAGS=""
+GDB_FLAGS=""
+EXTRA_WARNINGS="-Wall"
+CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $EXTRA_WARNINGS"
+
+# substitute what we need:
+AC_SUBST([BASHPATH])
+
+AC_OUTPUT
+
+AC_MSG_RESULT([ Version = ${PACKAGE_VERSION}])
+AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}])
+AC_MSG_RESULT([ Final CPPFLAGS = ${CPPFLAGS}])
+AC_MSG_RESULT([ Final LDFLAGS = ${LDFLAGS}])
+AC_MSG_RESULT([ Features = ${PACKAGE_FEATURES}])
diff --git a/code/jasmine/jasmine.spec.in b/code/jasmine/jasmine.spec.in
new file mode 100755
index 00000000..73172f31
--- /dev/null
+++ b/code/jasmine/jasmine.spec.in
@@ -0,0 +1,33 @@
+Summary: Just A Small Multicast engINE
+Name: jasmine
+Version: @version@
+Release: 1%{?dist}
+Vendor: ZTE
+License: Apache-2.0
+Group: System Environment/Base
+URL: https://wiki.opnfv.org/display/DAIS
+Source: %{name}-%{version}%{?gittarver}.tar.gz
+
+BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
+
+%description
+jasmine is used to distribute files over UDP multicast for installers
+
+%prep
+%setup -q -n %{name}-%{version}%{?gittarver}
+
+%build
+if [ ! -f configure ]; then
+ ./autogen.sh
+fi
+
+./configure
+
+%install
+rm -rf %{buildroot}
+make install DESTDIR=%{buildroot}
+
+%files
+%{_bindir}/jasmines
+%{_bindir}/jasminec
+
diff --git a/code/jasmine/misc.c b/code/jasmine/misc.c
new file mode 100755
index 00000000..eab367e1
--- /dev/null
+++ b/code/jasmine/misc.c
@@ -0,0 +1,73 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <fcntl.h>
+
+#include "misc.h"
+
+struct sockaddr_in make_addr(char *addr, unsigned short port)
+{
+ struct sockaddr_in sin;
+
+ sin.sin_family = AF_INET;
+ if (!addr) {
+ sin.sin_addr.s_addr = htonl(INADDR_ANY);
+ } else {
+ if (!inet_aton(addr, &sin.sin_addr))
+ crit("cant resolve address: %s", addr);
+ }
+ sin.sin_port = htons(port);
+ return sin;
+}
+
+/* Better than using a macro */
+void set_nonblock(int s)
+{
+ if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) {
+ crit("set_nonblock failed, can't set O_NONBLOCK");
+ }
+}
+
+void * wrapper_malloc(size_t size)
+{
+ void * res;
+
+ if (size == 0) {
+ crit("wrapper_malloc: malloc 0 size not allowed");
+ }
+
+ res = malloc(size);
+ if (res == NULL) {
+ crit("wrapper_malloc: malloc failed");
+ }
+
+ return res;
+}
+
+size_t read_all(int fd, void *buf, size_t count)
+{
+ size_t t = 0;
+ size_t r = 0;
+
+ while (count > 0) {
+ r = read(fd, buf + t, count);
+ if (r <= 0) {
+ return t ? t : r;
+ }
+
+ t += r;
+ count -= r;
+ }
+
+ return t;
+}
diff --git a/code/jasmine/misc.h b/code/jasmine/misc.h
new file mode 100755
index 00000000..e60d0b63
--- /dev/null
+++ b/code/jasmine/misc.h
@@ -0,0 +1,39 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_MISC_H
+#define _MCAST_MISC_H
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <error.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#define gettid() syscall(__NR_gettid)
+
+#define level 6
+
+#define crit(x, args...) do { \
+ fprintf(stderr, "\nERROR: "); fprintf(stderr, x, ##args); \
+ error(-1,errno, "\nERROR: [%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \
+} while (0);
+
+#define log(l, f, args...) if (l < level) { \
+ fprintf(stderr, "\n[%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \
+ fprintf(stderr, f, ##args); \
+ fprintf(stderr, "\n"); \
+}
+
+struct sockaddr_in make_addr(char *addr, unsigned short port);
+void * wrapper_malloc(size_t size);
+void set_nonblock(int s);
+size_t read_all(int fd, void *buf, size_t count);
+
+#endif
diff --git a/code/jasmine/server-tcp.c b/code/jasmine/server-tcp.c
new file mode 100755
index 00000000..c48de77d
--- /dev/null
+++ b/code/jasmine/server-tcp.c
@@ -0,0 +1,550 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <sys/poll.h>
+#include <string.h>
+#include <pthread.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "udp-common.h"
+#include "tcp-queue.h"
+#include "misc.h"
+#include "server.h"
+
+#define MAX_CLIENTS 128 /* Clients per TCP server */
+#define TCP_BUFF_SIZE 65536
+
+struct cdata {
+ struct tcpq *tx;
+ struct tcpq *rx;
+ int await; /* non-zero if we are in the middle of receiving clients requests */
+ struct sockaddr_in peer;
+};
+
+struct server_status_data {
+ int state; /* state machine */
+ int sync_count;
+ int ccount;
+ struct semlink sl;
+ pthread_t pchild;
+
+ struct pollfd ds[MAX_CLIENTS + 1];
+ struct cdata cd[MAX_CLIENTS + 1];
+ int cindex;
+ int last_buff_pkt_count;
+};
+
+void init_sdata(struct server_status_data *sdata, void *thread_args)
+{
+ int i;
+
+ sdata->sl.parent = (struct semaphores *)thread_args;
+ sdata->sl.this = NULL;
+
+ sdata->state = S_PREP;
+ sdata->last_buff_pkt_count = 1;
+ sdata->sync_count = 0;
+
+ for (i = 0; i < MAX_CLIENTS + 1; i++) {
+ sdata->ds[i].fd = -1;
+ sdata->ds[i].events = POLLIN;
+ sdata->cd[i].tx = tcpq_queue_init();
+ sdata->cd[i].rx = tcpq_queue_init();
+ sdata->cd[i].await = 0;
+ }
+}
+
+/* disconnect one client and compress table */
+void kill_client(struct server_status_data *sdata)
+{
+ struct pollfd *ds = sdata->ds;
+ struct cdata *cd = sdata->cd;
+
+ close((ds + sdata->cindex)->fd);
+ tcpq_queue_free((cd + sdata->cindex)->rx);
+ tcpq_queue_free((cd + sdata->cindex)->tx);
+
+ /* Move last slot to this free slot and release the last slot */
+ *(ds + sdata->cindex) = *(ds + sdata->ccount - 1);
+ *(cd + sdata->cindex) = *(cd + sdata->ccount - 1);
+ sdata->cindex--;
+ sdata->ccount--;
+ client_count--;
+}
+
+void send_buffctl_to_all_clients(struct server_status_data *sdata)
+{
+ int i;
+ long send_sz;
+ void *cpy;
+
+ /* copy header to all queues */
+ send_sz = sizeof(struct buffer_ctl);
+ for (i = 1; i < sdata->ccount; i++) {
+ cpy = wrapper_malloc(send_sz);
+ memcpy(cpy, &buffctl, send_sz);
+ tcpq_queue_tail(sdata->cd[i].tx, cpy, send_sz);
+ sdata->ds[i].events |= POLLOUT;
+ }
+}
+
+void send_sent_to_all_clients(struct server_status_data *sdata)
+{
+ int i;
+ void *cpy;
+ uint8_t byte = SERVER_SENT;
+
+ /* Push data to clients */
+ for (i = 1; i < sdata->ccount; i++) {
+ cpy = wrapper_malloc(1);
+ memcpy(cpy, &byte, 1);
+ tcpq_queue_tail(sdata->cd[i].tx, cpy, 1);
+ sdata->ds[i].events |= POLLOUT;
+ }
+}
+void accept_clients(struct server_status_data *sdata)
+{
+ int res;
+ int new_fd;
+ socklen_t socklen;
+ int poll_events;
+ int timeout = -1;
+ struct pollfd *ds = sdata->ds;
+ struct cdata *cd = sdata->cd;
+
+ ds->events = POLLIN;
+ ds->revents = 0;
+ sdata->ccount = 1;
+
+ while (sdata->ccount <= MAX_CLIENTS && client_count < wait_count) {
+ poll_events = poll(ds, 1, timeout);
+ if (poll_events < 0) {
+ crit("poll() failed");
+ } else if (poll_events == 0) {
+ log(2, "poll() returned with no results!");
+ continue;
+ }
+
+ log(9, "poll: %d events", poll_events);
+
+ if (ds->revents) {
+
+ /* new connections come in */
+ if (ds->revents & (POLLIN|POLLPRI)) {
+
+ do {
+ socklen = sizeof((cd + sdata->ccount)->peer);
+retry_accept:
+ new_fd = accept(ds->fd,
+ (struct sockaddr *)&((cd + sdata->ccount)->peer),
+ &socklen);
+ if (new_fd == -1 && errno == EINTR) {
+ goto retry_accept;
+ }
+
+ if (new_fd == -1) {
+ log(2, "accept() returned with error %d", errno);
+ break;
+ }
+
+ /* Send group info, before set non block */
+ res = write(new_fd, &mcinfo, sizeof(struct mc_info));
+ if (res < 0) {
+ log(3, "Error opening connection: %s",
+ inet_ntoa((cd + sdata->ccount)->peer.sin_addr));
+ close(new_fd);
+ continue;
+ }
+
+ /* Can set to non block becasue we will poll it in future. */
+ res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
+ if (res == -1) {
+ log(2, "fcntl() returned with error %d", errno);
+ close(new_fd);
+ continue;
+ }
+
+ (ds + sdata->ccount)->fd = new_fd;
+
+ log(5, "New connection %d: %s",
+ sdata->ccount, inet_ntoa((cd + sdata->ccount)->peer.sin_addr));
+ sdata->ccount++;
+ client_count++;
+ } while (sdata->ccount <= MAX_CLIENTS);
+
+ } else {
+ log(2, "Error on tcp socket");
+ }
+
+ poll_events--;
+ ds->revents = 0;
+ }
+ }
+
+ ds->events = 0;
+}
+
+void accept_clients_may_spawn(struct server_status_data *sdata)
+{
+ /* Wait the parent to let us accpet clients */
+ sl_wait_parent(&(sdata->sl));
+
+ sdata->ds[0].fd = sfd;
+ accept_clients(sdata);
+ if (client_count < wait_count) {
+ sdata->sl.this = spawn_thread(&sdata->pchild, tcp_server_main, 1);
+
+ /* Let child TCP servers accept connections */
+ sl_release_child(&(sdata->sl));
+ /* Wait until all childern TCP servers got all their clients */
+ sl_wait_child(&(sdata->sl));
+ }
+
+ /* Tell parent all client are accepted/connected */
+ sl_release_parent(&(sdata->sl));
+}
+
+void keep_on_receiving_client_request(struct server_status_data *sdata)
+{
+ char buf[TCP_BUFF_SIZE];
+ void *cpy;
+ struct request_ctl *req;
+ uint32_t *rqb;
+ struct packet_ctl *ans;
+ long rq_index;
+ long total_sz;
+
+ long read_out;
+ struct pollfd *ds;
+ struct cdata *cd;
+
+ ds = &(sdata->ds[sdata->cindex]);
+ cd = &(sdata->cd[sdata->cindex]);
+
+ read_out = read(ds->fd, buf, cd->await);
+ if (read_out <= 0) {
+ log(5, "Client disconnected (r): %s",
+ inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+ return;
+ }
+
+ log(7, "New data (%ld + %ld) on conn %d: %s",
+ read_out, tcpq_queue_dsize(cd->rx), sdata->cindex,
+ inet_ntoa(cd->peer.sin_addr));
+ cpy = wrapper_malloc(read_out);
+ memcpy(cpy, buf, read_out);
+ tcpq_queue_tail(cd->rx, cpy, read_out);
+ cd->await -= read_out;
+
+ /* Full header received */
+ if (tcpq_queue_dsize(cd->rx) == sizeof(struct request_ctl)) {
+ cpy = tcpq_queue_flat_peek(cd->rx, &read_out);
+ req = (struct request_ctl *)cpy;
+ cd->await = req->req_count * sizeof(uint32_t);
+ log(6, "Client request for %u packets on %d: %s",
+ req->req_count, sdata->cindex, inet_ntoa(cd->peer.sin_addr));
+ /* req->rqc may be zero? So do not return from here */
+ }
+
+ /* Whole request(struct request_ctl + all rq blocks) received */
+ if (cd->await == 0) {
+ cpy = tcpq_dqueue_flat(cd->rx, &read_out);
+ req = (struct request_ctl *)cpy;
+ rqb = (uint32_t *) (cpy + sizeof(struct request_ctl));
+ for (rq_index = 0; rq_index < req->req_count; rq_index++) {
+ ans = packet_get(*(rqb + rq_index));
+ total_sz = ans->data_size + sizeof(struct packet_ctl);
+ log(6, "Send packet %u (%u bytes) on %d",
+ rqb[rq_index], ans->data_size, sdata->cindex);
+ cpy = wrapper_malloc(total_sz);
+ memcpy(cpy, ans, total_sz);
+ tcpq_queue_tail(cd->tx, cpy, total_sz);
+ }
+
+ if (rq_index > 0) {
+ /* Data need to be sent out */
+ ds->events |= POLLOUT;
+ }
+ }
+}
+
+void handle_error_event(struct server_status_data *sdata)
+{
+ struct cdata *cd;
+
+ cd = &(sdata->cd[sdata->cindex]);
+ log(5, "Closing connection %d: %s",
+ sdata->cindex, inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+}
+
+void handle_client_ready(struct server_status_data *sdata)
+{
+ if (sdata->state == S_SYNC) {
+ sdata->sync_count++;
+
+ log(7, "Client SYNC %d of %d",
+ sdata->sync_count, sdata->ccount - 1);
+
+ if (sdata->sync_count == sdata->ccount - 1) {
+ /* All client SYNC done */
+ sdata->state = S_SEND;
+
+ /* Wait child ready */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent I am ready, release parent */
+ sl_release_parent(&(sdata->sl));
+
+ /* Wait parent to send mcast */
+ sl_wait_parent(&(sdata->sl));
+ /* tell child mcast done, release child */
+ sl_release_child(&(sdata->sl));
+
+ send_sent_to_all_clients(sdata);
+ }
+ }
+}
+
+void handle_client_done(struct server_status_data *sdata)
+{
+ if (sdata->state == S_SEND) {
+ log(7, "Client START %d of %d",
+ sdata->sync_count, sdata->ccount - 1);
+
+ sdata->sync_count--;
+ if (sdata->sync_count == 0) {
+ /* Got all client START */
+ sdata->state = S_PREP;
+ log(7, "All received, now tell UDP Server prepare next buffer");
+ /* Remeber old buffctl.pkt_count before it change */
+ sdata->last_buff_pkt_count = buffctl.pkt_count;
+
+ /* Wait child clients done */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent all our clients are done, release parent */
+ sl_release_parent(&(sdata->sl));
+ }
+ }
+}
+
+void handle_client_events(struct server_status_data *sdata)
+{
+ struct pollfd *ds;
+ struct cdata *cd;
+ int read_out;
+ uint8_t msgtype;
+
+ ds = &(sdata->ds[sdata->cindex]);
+ cd = &(sdata->cd[sdata->cindex]);
+
+ /* read message type */
+ read_out = read(ds->fd, &msgtype, 1);
+ if (read_out <= 0) {
+ log(5, "Client disconnected due to read error %d, ret:%d (r): %s",
+ errno, read_out, inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+ return; /* continue to check other clients */
+ }
+
+ switch (msgtype) {
+ case CLIENT_READY:
+ handle_client_ready(sdata);
+ break;
+ case CLIENT_DONE:
+ handle_client_done(sdata);
+ break;
+ case CLIENT_REQ:
+ /* wait a whole struct request_ctl */
+ cd->await = sizeof(struct request_ctl);
+ break;
+ default:
+ log(4, "Wrong message type from %s",
+ inet_ntoa(cd->peer.sin_addr));
+ break;
+ }
+}
+
+void handle_pullin_event(struct server_status_data *sdata)
+{
+ struct cdata *cd;
+
+ cd = &(sdata->cd[sdata->cindex]);
+
+ /* Await is set only for repeat request */
+ if (cd->await) {
+ keep_on_receiving_client_request(sdata);
+ } else {
+ handle_client_events(sdata);
+ }
+}
+
+void handle_pullout_event(struct server_status_data *sdata)
+{
+ char buf[TCP_BUFF_SIZE];
+ struct pollfd *ds;
+ struct cdata *cd;
+ long transmit_sz;
+ void *cpy;
+ long written_in;
+
+ ds = &(sdata->ds[sdata->cindex]);
+ cd = &(sdata->cd[sdata->cindex]);
+
+ if (cd->tx->count == 0) {
+ ds->events = POLLIN; /* Just make sure */
+ return;
+ }
+
+ log(7, "handle_pullout_event servs No.%d conn", sdata->cindex);
+
+ /* If there is some data to be sent, try to do it now */
+ cpy = tcpq_dequeue_head(cd->tx, &transmit_sz);
+ memcpy(buf, cpy, transmit_sz);
+ free(cpy);
+
+ written_in = write(ds->fd, buf, transmit_sz);
+ if (written_in <= 0) {
+ log(5, "Client disconnected (w): %s",
+ inet_ntoa(cd->peer.sin_addr));
+ kill_client(sdata);
+ return;
+ }
+
+ if (written_in != transmit_sz) {
+ log(6, "Partial wrote: %ld out of %ld bytes sent",
+ written_in, transmit_sz);
+ cpy = wrapper_malloc(transmit_sz - written_in);
+ memcpy(cpy, buf + written_in, transmit_sz - written_in);
+ tcpq_queue_head(cd->tx, cpy, transmit_sz - written_in);
+ } else {
+ log(7, "Sent %ld bytes to %s",
+ written_in, inet_ntoa(cd->peer.sin_addr));
+ if (cd->tx->count == 0) {
+ /* Do not listen pollout event anymore */
+ ds->events = POLLIN;
+ }
+ }
+}
+
+void check_clients(struct server_status_data *sdata)
+{
+ int poll_events;
+ struct pollfd *ds;
+ int timeout = -1;
+
+ if (sdata->ccount == 1) {
+ log(5, "No more clients, start exiting. s,c,p:%d,%d,%d",
+ sdata->state, sdata->ccount, buffctl.pkt_count);
+ /* No client existed */
+ switch(sdata->state) {
+ case S_SYNC:
+ /* Wait all children ready */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent I am ready, release parent */
+ sl_release_parent(&(sdata->sl));
+
+ /* Wait parent to send mcast */
+ sl_wait_parent(&(sdata->sl));
+ /* Tell child mcast done, release child */
+ sl_release_child(&(sdata->sl));
+ case S_SEND:
+ sdata->state = S_PREP;
+ /* Remeber old buffctl.pkt_count before it change */
+ sdata->last_buff_pkt_count = buffctl.pkt_count;
+
+ /* Wait child clients done */
+ sl_wait_child(&(sdata->sl));
+ /* Tell parent all our clients are done */
+ sl_release_parent(&(sdata->sl));
+ }
+ /* return to main loop to finish the dummy run */
+ return;
+ }
+
+ /* poll clients only */
+ poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout);
+ if (poll_events < 0) {
+ crit("poll() failed");
+ } else if (poll_events == 0) {
+ log(2, "poll() returned with no results!");
+ return;
+ }
+
+ log(9, "poll clients: %d events", poll_events);
+
+ /* check all connected clients */
+ for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) {
+ ds = &(sdata->ds[sdata->cindex]);
+ if (!ds->revents) {
+ continue;
+ }
+
+ if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) {
+ handle_error_event(sdata);
+ } else if (ds->revents & (POLLIN|POLLPRI)) {
+ handle_pullin_event(sdata);
+ } else if (ds->revents & POLLOUT) {
+ handle_pullout_event(sdata);
+ }
+ }
+}
+
+void * tcp_server_main(void *args)
+{
+ int i;
+ struct server_status_data ssdata;
+ struct server_status_data *sdata;
+
+ sdata = &ssdata;
+ init_sdata(sdata, args);
+
+ accept_clients_may_spawn(sdata);
+
+ while (1) {
+ if (sdata->state == S_PREP) {
+ /* Wait UDP server to prepare the buffctl of this round to send */
+ sl_wait_parent(&(sdata->sl));
+ log(6, "After waiting UDP server preparation. s,c,p:%d,%d,%d",
+ sdata->state, sdata->ccount - 1, buffctl.pkt_count);
+ /* Tell children buffer pareparation done, release child */
+ sl_release_child(&(sdata->sl));
+ send_buffctl_to_all_clients(sdata);
+ sdata->state = S_SYNC;
+ }
+
+ check_clients(sdata);
+
+ if (sdata->state == S_PREP && sdata->last_buff_pkt_count == 0) {
+ log(5, "No more output, TCP server finished");
+ if (sdata->sl.this && pthread_join(sdata->pchild, 0) < 0) {
+ crit("pthread_join()");
+ }
+
+ /* shutdown conn to clients */
+ for (i = sdata->ccount - 1; i > 0; i--) {
+ shutdown((sdata->ds[i]).fd, 2);
+ }
+
+ if (!sdata->sl.this) {
+ /* leaf server */
+ close(sfd);
+ sfd = -1;
+ }
+ return 0;
+ }
+ }
+}
diff --git a/code/jasmine/server-udp.c b/code/jasmine/server-udp.c
new file mode 100755
index 00000000..c641289b
--- /dev/null
+++ b/code/jasmine/server-udp.c
@@ -0,0 +1,95 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <pthread.h>
+#include <signal.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "misc.h"
+#include "server.h"
+
+void * udp_server_main(void *args)
+{
+ pthread_t pchild;
+ struct sockaddr_in maddr;
+ int ms;
+ int i;
+ struct semlink sl;
+ char adr[128];
+ long long total = 0;
+
+ sprintf(adr, "%s%s", MCAST_ADDR_BASE, MCAST_ADDR_SUFFIX);
+ maddr = make_addr(adr, server_port);
+ mcinfo.group = maddr;
+ ms = init_mcast_socket(&local_addr, &maddr);
+
+ sl.this = spawn_thread(&pchild, tcp_server_main, 1);
+ sl.parent = NULL;
+
+ buffer_init();
+
+ /* Let TCP servers accept connections */
+ sl_release_child(&sl);
+ /* Wait until all TCP servers got all their clients */
+ sl_wait_child(&sl);
+
+ log(7, "UDP Server: All clients were accepted");
+
+ do {
+ /* one buffer round */
+
+ i = buffer_fill(STDIN_FILENO);
+
+ if (!client_count) {
+ /* No need to send, make it the last run of main loop, also let
+ TCP Server to do dummy run right away(not waiting until all
+ data sent). */
+ i = 0;
+ buffctl.pkt_count = 0;
+ }
+
+ log(7, "Signal TCP Server to send buffctl");
+
+ /* Tell tcp to Send headers, release child */
+ sl_release_child(&sl);
+ /* Wait all clients ready to start receiving */
+ sl_wait_child(&sl);
+
+ /* multicast data */
+ while (i--) {
+ sendto(ms, packetctl[i],
+ packetctl[i]->data_size + sizeof(struct packet_ctl), 0,
+ (struct sockaddr *)&maddr, sizeof(maddr));
+ }
+
+ log(7, "send finish");
+ /* Tell tcp to Send SERVER_SENT, release child */
+ sl_release_child(&sl);
+ /* wait tcp to send SERVER_SENT */
+ sl_wait_child(&sl);
+
+ total = total + buffctl.buffer_size;
+ log(1, "Buffer Done: sent %lld bytes", total);
+ } while (buffctl.pkt_count);
+
+
+ if (pthread_join(pchild, 0) < 0) {
+ crit("pthread_join() TCP Server");
+ }
+
+ log(1, "All Done");
+ close(ms);
+ return 0;
+}
diff --git a/code/jasmine/server.c b/code/jasmine/server.c
new file mode 100755
index 00000000..4a9a3279
--- /dev/null
+++ b/code/jasmine/server.c
@@ -0,0 +1,123 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "misc.h"
+#include "server.h"
+
+/* Global variables shared among server*.c */
+int wait_count = 0;
+int client_count = 0;
+int server_port = DEF_PORT; /* Currently for both UDP and TCP */
+struct in_addr local_addr;
+/* listen socket - global for all TCP threads */
+int sfd = -1;
+
+void init_semaphores(struct semaphores *sp)
+{
+ if (sem_init(&sp->wait_parent, 0, 0) < 0) {
+ crit("sem_init() wait_parent");
+ }
+
+ if (sem_init(&sp->wait_child, 0, 0) < 0) {
+ crit("sem_init() wait_child");
+ }
+}
+
+void sl_wait_child(struct semlink *sl)
+{
+ if (sl->this) {
+ P(sl->this->wait_child);
+ }
+}
+
+void sl_release_child(struct semlink *sl)
+{
+ if (sl->this) {
+ V(sl->this->wait_parent);
+ }
+}
+
+void sl_wait_parent(struct semlink *sl)
+{
+ P(sl->parent->wait_parent);
+}
+
+void sl_release_parent(struct semlink *sl)
+{
+ V(sl->parent->wait_child);
+}
+
+/* Wrapper for pthread_create, additionaly may initialize/pass thru
+ a semaphores parameter */
+struct semaphores * spawn_thread(pthread_t *pchild,
+ void *(*entry)(void *),
+ int create_sems)
+{
+ struct semaphores *sp = NULL;
+
+ if (create_sems != 0) {
+ sp = (struct semaphores *) wrapper_malloc(sizeof(struct semaphores));
+ init_semaphores(sp);
+ }
+
+ if (pthread_create(pchild, 0, entry, (void *)sp) != 0) {
+ crit("pthread_create");
+ }
+ return sp;
+}
+
+int main(int argc, char *argv[])
+{
+ pthread_t pchild;
+
+ log(9, "buffer size:%ld, buffer head size:%ld, data size:%ld",
+ PACKET_SIZE, sizeof(struct packet_ctl), PACKET_PAYLOAD_SIZE);
+
+ if (argc < 3) {
+ printf("Usage: %s local_ip num_of_clients [port]\n", argv[0]);
+ return -1;
+ }
+
+ if (!inet_aton(argv[1], &local_addr)) {
+ crit("can not resolve address: %s", argv[1]);
+ }
+
+ wait_count = atoi(argv[2]);
+ if (!wait_count) {
+ crit("can not serv to 0 client\n");
+ }
+
+ if (argc > 3) {
+ server_port = atoi(argv[3]);
+ if (!server_port) {
+ server_port = DEF_PORT;
+ }
+ }
+
+ /* sfd is shared by all TCP threads as sdata->ds[0].fd */
+ sfd = init_tcp_server_socket(0, server_port);
+ set_nonblock(sfd);
+
+ /* start UDP servers thread */
+ (void)spawn_thread(&pchild, udp_server_main, 0);
+ if (pthread_join(pchild, 0) < 0) {
+ crit("pthread_join() UDP Server");
+ }
+
+ return 0;
+}
diff --git a/code/jasmine/server.h b/code/jasmine/server.h
new file mode 100755
index 00000000..23dbed54
--- /dev/null
+++ b/code/jasmine/server.h
@@ -0,0 +1,63 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_SERVER_H
+#define _MCAST_SERVER_H
+
+#include <pthread.h>
+#include <semaphore.h>
+
+struct semaphores {
+ sem_t wait_parent;
+ sem_t wait_child;
+};
+
+struct semlink {
+ struct semaphores *this; /* used by parent to point to the struct semaphores
+ which it created during spawn child. */
+ struct semaphores *parent; /* used by child to point to the struct
+ semaphores which it created by parent */
+};
+
+void sl_wait_child(struct semlink *sl);
+void sl_release_child(struct semlink *sl);
+void sl_wait_parent(struct semlink *sl);
+void sl_release_parent(struct semlink *sl);
+
+/* Server state machine */
+#define S_PREP 0
+#define S_SYNC 1
+#define S_SEND 2
+
+extern int wait_count;
+extern int client_count;
+extern int server_port;
+extern struct in_addr local_addr;
+extern int sfd;
+
+void *udp_server_main(void *args);
+void *tcp_server_main(void *args);
+void init_semaphores(struct semaphores *sp);
+struct semaphores * spawn_thread(pthread_t *pchild,
+ void *(*entry)(void *),
+ int create_sems);
+
+#define V(x) do { \
+ if (sem_post(&x) < 0) { \
+ crit("sem_post()"); \
+ } \
+} while(0)
+
+#define P(x) do { \
+ if (sem_wait(&x) != 0) { \
+ crit("sem_wait()"); \
+ } \
+} while(0)
+
+#endif
diff --git a/code/jasmine/tcp-common.c b/code/jasmine/tcp-common.c
new file mode 100755
index 00000000..69a57ba0
--- /dev/null
+++ b/code/jasmine/tcp-common.c
@@ -0,0 +1,60 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "tcp-common.h"
+#include "misc.h"
+
+int init_tcp_server_socket(char *addr, unsigned short port)
+{
+ int s;
+ struct sockaddr_in sin;
+
+ port = port ? port : TCP_DPORT;
+ sin = make_addr(addr, port);
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ crit("socket() failed");
+ }
+
+ if (bind(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
+ crit("bind() failed, error binding tcp socket");
+ }
+
+ if (listen(s, 5) < 0) {
+ crit("listen() failed, error listening on socket");
+ }
+
+ log(4, "init_tcp_server_socket %s:%d", inet_ntoa(sin.sin_addr), port);
+ return s;
+}
+
+int init_tcp_client_socket(char *addr, unsigned short port)
+{
+ int s;
+ struct sockaddr_in sin;
+
+ port = port ? port : TCP_DPORT;
+ sin = make_addr(addr, port);
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ crit("socket() failed");
+ }
+
+ if (connect(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
+ crit("connect() failed");
+ }
+
+ log(4, "init_tcp_client_socket %s:%d", inet_ntoa(sin.sin_addr), port);
+ return s;
+}
diff --git a/code/jasmine/tcp-common.h b/code/jasmine/tcp-common.h
new file mode 100755
index 00000000..6ac48e1c
--- /dev/null
+++ b/code/jasmine/tcp-common.h
@@ -0,0 +1,18 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_TCP_COMMON_H
+#define _MCAST_TCP_COMMON_H
+
+#define TCP_DPORT DEF_PORT
+
+int init_tcp_server_socket(char *addr, unsigned short port);
+int init_tcp_client_socket(char *addr, unsigned short port);
+
+#endif
diff --git a/code/jasmine/tcp-queue.c b/code/jasmine/tcp-queue.c
new file mode 100755
index 00000000..63a57f2e
--- /dev/null
+++ b/code/jasmine/tcp-queue.c
@@ -0,0 +1,178 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+/* TCP poll message single linked queue */
+#include <stdlib.h>
+#include <string.h>
+
+#include "misc.h"
+#include "tcp-queue.h"
+
+void tcpq_queue_tail(struct tcpq *q, void *data, long size)
+{
+ struct qmsg *tmp;
+
+ if (!q) {
+ return;
+ }
+
+ tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg));
+ tmp->next = NULL;
+
+ if (!q->head) {
+ q->head = q->tail = tmp;
+ } else {
+ q->tail->next = tmp;
+ q->tail = tmp;
+ }
+
+ q->tail->data = data;
+ q->tail->size = size;
+ q->size += size;
+ q->count++;
+}
+
+void tcpq_queue_head(struct tcpq *q, void *data, long size)
+{
+ struct qmsg *tmp;
+
+ if (!q) {
+ return;
+ }
+
+ tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg));
+ tmp->next = NULL;
+
+ if (!q->head) {
+ q->head = q->tail = tmp;
+ } else {
+ tmp->next = q->head;
+ q->head = tmp;
+ }
+
+ q->head->data = data;
+ q->head->size = size;
+ q->size += size;
+ q->count++;
+}
+
+void * tcpq_dequeue_head(struct tcpq *q, long *size)
+{
+ void *res = NULL;
+ struct qmsg *tmp;
+
+ if (q && q->head) {
+ res = q->head->data;
+ *size = q->head->size;
+
+ tmp = q->head;
+ q->head = q->head->next;
+ if (!q->head) {
+ q->tail = NULL;
+ }
+
+ free(tmp);
+ q->count--;
+ q->size -= *size;
+ }
+ return res;
+}
+
+void * tcpq_queue_peek(struct tcpq *q, long *size)
+{
+ if (q && q->head) {
+ *size = q->head->size;
+ return q->head->data;
+ }
+
+ return NULL;
+}
+
+long tcpq_queue_dsize(struct tcpq *q)
+{
+ if (q) {
+ return q->size;
+ }
+
+ return 0;
+}
+
+void tcpq_queue_free(struct tcpq *q)
+{
+ struct qmsg *tmp;
+
+ if (!q) {
+ return;
+ }
+
+ while (q->head) {
+ tmp = q->head;
+ q->head = tmp->next;
+ free(tmp->data);
+ free(tmp);
+ }
+
+ q->count = 0;
+ q->size = 0;
+ q->head = q->tail = NULL;
+}
+
+struct tcpq * tcpq_queue_init(void)
+{
+ struct tcpq *q = wrapper_malloc(sizeof(struct tcpq));
+
+ q->count = 0;
+ q->size = 0;
+ q->head = q->tail = NULL;
+ return q;
+}
+
+void * tcpq_dqueue_flat(struct tcpq *q, long *size)
+{
+ void *res;
+ struct qmsg *tmp;
+ long offs = 0;
+
+ if (!q || q->count == 0) {
+ return NULL;
+ }
+
+ res = wrapper_malloc(q->size);
+ *size = q->size;
+
+ while (q->head) {
+ memcpy(res + offs, q->head->data, q->head->size);
+ offs += q->head->size;
+
+ tmp = q->head;
+ q->head = tmp->next;
+ free(tmp->data);
+ free(tmp);
+ }
+ tcpq_queue_free(q);
+ return res;
+}
+
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size)
+{
+ void *cpy;
+
+ if (!q) {
+ return NULL;
+ }
+
+ if (q->count > 1) {
+ cpy = tcpq_dqueue_flat(q, size);
+ tcpq_queue_tail(q, cpy, *size); /* use tcpq_queue_head is also OK */
+ } else {
+ cpy = tcpq_queue_peek(q, size);
+ }
+
+ return cpy;
+}
diff --git a/code/jasmine/tcp-queue.h b/code/jasmine/tcp-queue.h
new file mode 100755
index 00000000..4f096f85
--- /dev/null
+++ b/code/jasmine/tcp-queue.h
@@ -0,0 +1,35 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_TCP_QUEUE_H
+#define _MCAST_TCP_QUEUE_H
+
+struct tcpq {
+ struct qmsg *head, *tail;
+ long count; /* message count in a queue */
+ long size; /* Total data size of a queue */
+};
+
+struct qmsg {
+ struct qmsg *next;
+ void *data;
+ long size;
+};
+
+struct tcpq * tcpq_queue_init(void);
+void tcpq_queue_free(struct tcpq *q);
+long tcpq_queue_dsize(struct tcpq *q);
+void tcpq_queue_tail(struct tcpq *q, void *data, long size);
+void tcpq_queue_head(struct tcpq *q, void *data, long size);
+void * tcpq_dequeue_head(struct tcpq *q, long *size);
+void * tcpq_queue_peek(struct tcpq *q, long *size);
+void * tcpq_dqueue_flat(struct tcpq *q, long *size);
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size);
+
+#endif
diff --git a/code/jasmine/udp-common.c b/code/jasmine/udp-common.c
new file mode 100755
index 00000000..c7314226
--- /dev/null
+++ b/code/jasmine/udp-common.c
@@ -0,0 +1,97 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#include <fcntl.h>
+#include <net/if.h>
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+#include "buffer.h"
+#include "udp-common.h"
+#include "misc.h"
+
+struct mc_info mcinfo;
+
+int init_mcast_socket(struct in_addr *local_addr,
+ struct sockaddr_in *maddr)
+{
+ struct ip_mreqn mreqn; /* multicast request new */
+ int ms; /* multicast socket */
+ u_short msock_port;
+ struct sockaddr_in bind_addr;
+
+ int msock_reuse = MCAST_REUSE;
+ u_char msock_loop = MCAST_LOOP;
+ u_char msock_ttl = MCAST_TTL;
+
+ if ((ms = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
+ crit("socket() failed, error allocating multicast socket");
+ }
+
+ // In order to let client and server on same host to use same port
+ if (setsockopt(ms, SOL_SOCKET, SO_REUSEADDR,
+ &msock_reuse, sizeof(msock_reuse)) < 0) {
+ crit("setsockopt() failed, can't set reuse flag");
+ }
+
+ if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_TTL,
+ &msock_ttl,sizeof(msock_ttl)) < 0) {
+ crit("setsockopt() failed, can't set ttl value");
+ }
+
+ if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_LOOP,
+ &msock_loop, sizeof(msock_loop)) < 0) {
+ crit("setsockopt() failed, can't set multicast packet looping");
+ }
+
+ /* TODO: Do we neet non-block? */
+
+ log(4, "Using multicast address: %s:%d",
+ inet_ntoa(maddr->sin_addr), ntohs(maddr->sin_port));
+
+ mreqn.imr_multiaddr = maddr->sin_addr;
+ mreqn.imr_address = *local_addr;
+ mreqn.imr_ifindex = 0;
+
+ /* Interface and remote mcast address choosing */
+
+ /* This tell interface which has mreqn.imr_address to join
+ mreqn.imr_multiaddr group, it has nothing to do with socket, port, and
+ mreqn.imr_address. */
+ if (setsockopt(ms, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ &mreqn, sizeof(mreqn)) < 0) {
+ crit("setsockopt() failed, %s can't join multicast group",
+ inet_ntoa(*local_addr));
+ }
+
+ /* local address choosing */
+
+ /* This tell kernel to use interface which has mreqn.imr_address as device
+ and mreqn.imr_address as source addr when sending any multicast through
+ socket. */
+ if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_IF,
+ &mreqn, sizeof(mreqn)) < 0) {
+ crit("setsockopt() failed to IP_MULTICAST_IF.\n");
+ }
+
+ /* Local port choosing, remote port is choosed when calling sendto()! */
+
+ /* This let us uses specific udp port number as source port when sending
+ any multicast datagrams. So ip in maddr is useless here and should be
+ INADDR_ANY, otherwise, bind will return Invalid argument. */
+ msock_port = ntohs(maddr->sin_port);
+ bind_addr = make_addr(0, msock_port);
+ if (bind(ms, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) {
+ crit("bind() error, can't bind mcast port");
+ }
+
+ return ms;
+}
diff --git a/code/jasmine/udp-common.h b/code/jasmine/udp-common.h
new file mode 100755
index 00000000..861545b4
--- /dev/null
+++ b/code/jasmine/udp-common.h
@@ -0,0 +1,42 @@
+/*##############################################################################
+# Copyright (c) 2017 ZTE Coreporation and others.
+# hu.zhijiang@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################*/
+
+#ifndef _MCAST_UDP_COMMON_H
+#define _MCAST_UDP_COMMON_H
+
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/udp.h>
+
+#define MCAST_DPORT DEF_PORT
+
+#define MCAST_TTL 8
+/*
+ * Force reuse
+ */
+#define MCAST_REUSE 1
+
+/*
+ * Loop to let server host itself can receive data too.
+ */
+#define MCAST_LOOP 1
+#define MCAST_ADDR_BASE "224.238.0."
+#define MCAST_ADDR_SUFFIX "31"
+
+// Initial data
+struct mc_info {
+ struct sockaddr_in group;
+};
+
+extern struct mc_info mcinfo;
+
+// How to capture udp packets: tcpdump -i ethx udp -vv -n
+
+int init_mcast_socket(struct in_addr *local_addr, struct sockaddr_in *maddr);
+#endif
diff --git a/deploy/check_openstack_progress.sh b/deploy/check_openstack_progress.sh
index 3512d6bc..f0ab1e90 100755
--- a/deploy/check_openstack_progress.sh
+++ b/deploy/check_openstack_progress.sh
@@ -44,8 +44,9 @@ while true; do
fi
count=$[count + 1]
- openstack_install_active=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $12}' | grep -c "active" `
- openstack_install_failed=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $12}' | grep -c "install-failed" `
+ # get 'Role_status' column
+ openstack_install_active=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $13}' | grep -c "active" `
+ openstack_install_failed=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $13}' | grep -c "install-failed" `
if [ $openstack_install_active -eq $hosts_num ]; then
echo "openstack installing successful ..."
break
@@ -54,7 +55,8 @@ while true; do
tail -n 200 /var/log/daisy/kolla_$cluster_id*
exit 1
else
- progress=`daisy host-list --cluster-id $cluster_id |grep DISCOVERY_SUCCESSFUL |awk -F "|" '{print $11}'|sed s/[[:space:]]//g|sed ':a;N;$ s/\n/ /g;ba'`
+ # get 'Role_progress' column
+ progress=`daisy host-list --cluster-id $cluster_id |grep DISCOVERY_SUCCESSFUL |awk -F "|" '{print $12}'|sed s/[[:space:]]//g|sed ':a;N;$ s/\n/ /g;ba'`
echo " openstack in installing , progress of each node is $progress%"
sleep 30
fi
diff --git a/deploy/config/bm_environment/zte-baremetal1/deploy.yml b/deploy/config/bm_environment/zte-baremetal1/deploy.yml
index eafff5fd..58516e88 100644
--- a/deploy/config/bm_environment/zte-baremetal1/deploy.yml
+++ b/deploy/config/bm_environment/zte-baremetal1/deploy.yml
@@ -21,3 +21,4 @@ disks:
daisy_passwd: 'r00tme'
daisy_ip: '10.20.7.3'
daisy_gateway: '10.20.7.1'
+ceph_disk_name: '/dev/sdb'
diff --git a/deploy/config/vm_environment/zte-virtual1/deploy.yml b/deploy/config/vm_environment/zte-virtual1/deploy.yml
index 1bf254f5..12fa1690 100644
--- a/deploy/config/vm_environment/zte-virtual1/deploy.yml
+++ b/deploy/config/vm_environment/zte-virtual1/deploy.yml
@@ -10,4 +10,4 @@ disks:
daisy_passwd: 'r00tme'
daisy_ip: '10.20.11.2'
daisy_gateway: '10.20.11.1'
-deploy_env: 'virtual' \ No newline at end of file
+ceph_disk_name: '/dev/sdb'
diff --git a/deploy/get_conf.py b/deploy/get_conf.py
index a2d7bf6a..37da2beb 100755
--- a/deploy/get_conf.py
+++ b/deploy/get_conf.py
@@ -91,7 +91,8 @@ def dha_config_parse(s, dha_file):
def config(dha_file, network_file):
data = init(dha_file)
+ ceph_disk_name = data.get('ceph_disk_name')
hosts_name = dha_config_parse(data, dha_file)
data = init(network_file)
network_map, vip, interface_map = network_config_parse(data, network_file)
- return interface_map, hosts_name, network_map, vip
+ return interface_map, hosts_name, network_map, vip, ceph_disk_name
diff --git a/deploy/tempest.py b/deploy/tempest.py
index 263e62e0..bd1bc04f 100644
--- a/deploy/tempest.py
+++ b/deploy/tempest.py
@@ -66,7 +66,7 @@ def prepare_install():
print("get config...")
conf = cfg.ConfigOpts()
parse(conf, sys.argv[1:])
- host_interface_map, hosts_name, network_map, vip = \
+ host_interface_map, hosts_name, network_map, vip, ceph_disk_name = \
get_conf.config(conf['dha'], conf['network'])
if conf['cluster'] and conf['cluster'] == 'yes':
print("add cluster...")
@@ -88,6 +88,17 @@ def prepare_install():
cluster_id = cluster_info.id
add_hosts_interface(cluster_id, hosts_info, hosts_name,
host_interface_map, vip)
+ if len(hosts_name) == 1:
+ protocol_type = 'LVM'
+ service_name = 'cinder'
+ elif len(hosts_name) > 2:
+ protocol_type = 'RAW'
+ service_name = 'ceph'
+ else:
+ print('hosts_num is %s' % len(hosts_name))
+ protocol_type = None
+ enable_cinder_backend(cluster_id, service_name,
+ ceph_disk_name, protocol_type)
if 'isbare' in conf and conf['isbare'] == 0:
install_os_for_vm_step1(cluster_id)
else:
@@ -117,9 +128,7 @@ def install_os_for_vm_step1(cluster_id):
def install_os_for_bm_oneshot(cluster_id):
- cluster_meta = {'cluster_id': cluster_id,
- 'pxe_only': "false",
- 'skip_pxe_ipmi': "false"}
+ cluster_meta = {'cluster_id': cluster_id}
client.install.install(**cluster_meta)
@@ -217,5 +226,22 @@ def add_host_role(cluster_id, host_id, host_exp_name, host_real_name, vip):
client.roles.update(computer_role_id, **role_computer_update_meta)
+def enable_cinder_backend(cluster_id, service_name, disk_name, protocol_type):
+ role_meta = {'filters': {'cluster_id': cluster_id}}
+ role_list_generator = client.roles.list(**role_meta)
+ role_list = [role for role in role_list_generator]
+ lb_role_id = [role.id for role in role_list if
+ role.name == "CONTROLLER_LB"][0]
+ service_disk_meta = {'service': service_name,
+ 'disk_location': 'local',
+ 'partition': disk_name,
+ 'protocol_type': protocol_type,
+ 'role_id': lb_role_id}
+ try:
+ client.disk_array.service_disk_add(**service_disk_meta)
+ except Exception as e:
+ print e
+
+
if __name__ == "__main__":
prepare_install()
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 6026db15..06002bff 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -15,6 +15,10 @@ RUN yum -y install \
make \
rpm \
rpm-build \
+ gcc \
+ autoconf \
+ automake \
+ glibc-devel \
python-sphinx \
python-XStatic-Angular \
python-XStatic-Angular-Bootstrap \
diff --git a/docs/developer/design/multicast.rst b/docs/developer/design/multicast.rst
new file mode 100644
index 00000000..89422fe6
--- /dev/null
+++ b/docs/developer/design/multicast.rst
@@ -0,0 +1,278 @@
+Detailed Design
+===============
+
+Protocol Design
+---------------
+
+1. All Protocol headers are 1 byte long or align to 4 bytes.
+2. Packet size should not exceed above 1500(MTU) bytes including UDP/IP header and should
+be align to 4 bytes. In future, MTU can be modified larger than 1500(Jumbo Frame) through
+cmd line option to enlarge the data throughput.
+
+/* Packet header definition (align to 4 bytes) */
+struct packet_ctl {
+ uint32_t seq; // packet seq number start from 0, unique in server life cycle.
+ uint32_t crc; // checksum
+ uint32_t data_size; // payload length
+ uint8_t data[0];
+};
+
+/* Buffer info definition (align to 4 bytes) */
+struct buffer_ctl {
+ uint32_t buffer_id; // buffer seq number start from 0, unique in server life cycle.
+ uint32_t buffer_size; // payload total length of a buffer
+ uint32_t packet_id_base; // seq number of the first packet in this buffer.
+ uint32_t pkt_count; // number of packet in this buffer, 0 means EOF.
+};
+
+
+3. 1-byte-long header definition
+
+Signals such as the four below are 1 byte long, to simplify the receive process(since it
+cannot be spitted ).
+
+#define CLIENT_READY 0x1
+#define CLIENT_REQ 0x2
+#define CLIENT_DONE 0x4
+#define SERVER_SENT 0x8
+
+Note: Please see the collaboration diagram for their meanings.
+
+4. Retransmission Request Header
+
+/* Retransmition Request Header (align to 4 bytes) */
+struct request_ctl {
+ uint32_t req_count; // How many seqs below.
+ uint32_t seqs[0]; // packet seqs.
+};
+
+5. Buffer operations
+
+void buffer_init(); // Init the buffer_ctl structure and all(say 1024) packet_ctl
+structures. Allocate buffer memory.
+long buffer_fill(int fd); // fill a buffer from fd, such as stdin
+long buffer_flush(int fd); // flush a buffer to fd, say stdout
+struct packet_ctl *packet_put(struct packet_ctl *new_pkt);// put a packet to a buffer
+and return a free memory slot for the next packet.
+struct packet_ctl *packet_get(uint32_t seq);// get a packet data in buffer by
+indicating the packet seq.
+
+
+How to sync between server threads
+----------------------------------
+
+If children's aaa() operation need to wait the parents's init() to be done, then do it
+literally like this:
+
+ UDP Server
+ TCP Server1 = spawn( )----> TCP Server1
+ init()
+ TCP Server2 = spawn( )-----> TCP Server2
+ V(sem)----------------------> P(sem) // No child any more
+ V(sem)---------------------> P(sem)
+ aaa() // No need to V(sem), for no child
+ aaa()
+
+If parent's send() operation need to wait the children's ready() done, then do it
+literally too, but is a reverse way:
+
+ UDP Server TCP Server1 TCP Server2
+ // No child any more
+ ready() ready()
+ P(sem) <--------------------- V(sem)
+ P(sem) <------------------ V(sem)
+ send()
+
+Note that the aaa() and ready() operations above run in parallel. If this is not the
+case due to race condition, the sequence above can be modified into this below:
+
+ UDP Server TCP Server1 TCP Server2
+ // No child any more
+ ready()
+ P(sem) <--------------------- V(sem)
+ ready()
+ P(sem) <------------------- V(sem)
+ send()
+
+
+In order to implement such chained/zipper sync pattern, a pair of semaphores is
+needed between the parent and the child. One is used by child to wait parent , the
+other is used by parent to wait child. semaphore pair can be allocated by parent
+and pass the pointer to the child over spawn() operation such as pthread_create().
+
+/* semaphore pair definition */
+struct semaphores {
+ sem_t wait_parent;
+ sem_t wait_child;
+};
+
+Then the semaphore pair can be recorded by threads by using the semlink struct below:
+struct semlink {
+ struct semaphores *this; /* used by parent to point to the struct semaphores
+ which it created during spawn child. */
+ struct semaphores *parent; /* used by child to point to the struct
+ semaphores which it created by parent */
+};
+
+chained/zipper sync API:
+
+void sl_wait_child(struct semlink *sl);
+void sl_release_child(struct semlink *sl);
+void sl_wait_parent(struct semlink *sl);
+void sl_release_parent(struct semlink *sl);
+
+API usage is like this.
+
+Thread1(root parent) Thread2(child) Thread3(grandchild)
+sl_wait_parent(noop op)
+sl_release_child
+ +---------->sl_wait_parent
+ sl_release_child
+ +-----------> sl_wait_parent
+ sl_release_child(noop op)
+ ...
+ sl_wait_child(noop op)
+ + sl_release_parent
+ sl_wait_child <-------------
+ + sl_release_parent
+sl_wait_child <------------
+sl_release_parent(noop op)
+
+API implementation:
+
+void sl_wait_child(struct semlink *sl)
+{
+ if (sl->this) {
+ P(sl->this->wait_child);
+ }
+}
+
+void sl_release_child(struct semlink *sl)
+{
+ if (sl->this) {
+ V(sl->this->wait_parent);
+ }
+}
+
+void sl_wait_parent(struct semlink *sl)
+{
+ if (sl->parent) {
+ P(sl->parent->wait_parent);
+ }
+}
+
+void sl_release_parent(struct semlink *sl)
+{
+ if (sl->parent) {
+ V(sl->parent->wait_child);
+ }
+}
+
+Client flow chart
+-----------------
+See Collaboration Diagram
+
+UDP thread flow chart
+---------------------
+See Collaboration Diagram
+
+TCP thread flow chart
+---------------------
+
+
+S_INIT --- (UDP initialized) ---> S_ACCEPT --- (accept clients) --+
+ |
+ /----------------------------------------------------------------/
+ V
+S_PREP --- (UDP prepared abuffer)
+ ^ |
+ | \--> S_SYNC --- (clients ClIENT_READY)
+ | |
+ | \--> S_SEND --- (clients CLIENT_DONE)
+ | |
+ | V
+ \---------------(bufferctl.pkt_count != 0)-----------------------+
+ |
+ V
+ exit() <--- (bufferctl.pkt_count == 0)
+
+
+TCP using poll and message queue
+--------------------------------
+
+TCP uses poll() to sync with client's events as well as output event from itself, so
+that we can use non-block socket operations to reduce the latency. POLLIN means there
+are message from client and POLLOUT means we are ready to send message/retransmission
+packets to client.
+
+poll main loop pseudo code:
+void check_clients(struct server_status_data *sdata)
+{
+ poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout);
+
+ /* check all connected clients */
+ for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) {
+ ds = &(sdata->ds[sdata->cindex]);
+ if (!ds->revents) {
+ continue;
+ }
+
+ if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) {
+ handle_error_event(sdata);
+ } else if (ds->revents & (POLLIN|POLLPRI)) {
+ handle_pullin_event(sdata); // may set POLLOUT into ds->events
+ // to trigger handle_pullout_event().
+ } else if (ds->revents & POLLOUT) {
+ handle_pullout_event(sdata);
+ }
+ }
+}
+
+For TCP, since the message from client may not complete and send data may be also
+interrupted due to non-block fashion, there should be one send message queue and a
+receive message queue on the server side for each client (client do not use non-block
+operations).
+
+TCP message queue definition:
+
+struct tcpq {
+ struct qmsg *head, *tail;
+ long count; /* message count in a queue */
+ long size; /* Total data size of a queue */
+};
+
+TCP message queue item definition:
+
+struct qmsg {
+ struct qmsg *next;
+ void *data;
+ long size;
+};
+
+TCP message queue API:
+
+// Allocate and init a queue.
+struct tcpq * tcpq_queue_init(void);
+
+// Free a queue.
+void tcpq_queue_free(struct tcpq *q);
+
+// Return queue length.
+long tcpq_queue_dsize(struct tcpq *q);
+
+// queue new message to tail.
+void tcpq_queue_tail(struct tcpq *q, void *data, long size);
+
+// queue message that cannot be sent currently back to queue head.
+void tcpq_queue_head(struct tcpq *q, void *data, long size);
+
+// get one piece from queue head.
+void * tcpq_dequeue_head(struct tcpq *q, long *size);
+
+// Serialize all pieces of a queue, and move it out of queue, to ease the further
+//operation on it.
+void * tcpq_dqueue_flat(struct tcpq *q, long *size);
+
+// Serialize all pieces of a queue, do not move it out of queue, to ease the further
+//operation on it.
+void * tcpq_queue_flat_peek(struct tcpq *q, long *size);
diff --git a/docs/developer/spec/multicast.rst b/docs/developer/spec/multicast.rst
new file mode 100644
index 00000000..ba314d3a
--- /dev/null
+++ b/docs/developer/spec/multicast.rst
@@ -0,0 +1,190 @@
+Requirement
+===========
+1. When deploying a large OPNFV/OpenStack cluster, we would like to take the advantage of UDP
+multicast to prevent the network bottleneck when distributing Kolla container from one
+Installer Server to all target hosts by using unicast.
+
+2. When it comes to auto scaling (extension) of compute nodes, use unicast is acceptable, since
+the number of nodes in this condition is usually small.
+
+The basic step to introduce multicast to deployment is:
+a. Still setup the monopolistic docker registry server on Daisy server as a failsafe.
+b. Daisy server, as the multicast server, prepares the image file to be transmitted, and count
+how many target hosts(as the multicast clients)that should receive the image file
+simultaneously.
+c. Multicast clients tell the multicast server about ready to receive the image.
+d. Multicast server transmits image over UDP multicast channel.
+e. Multicast clients report success after received the whole image.
+f. Setup docker registry server on each target hosts based upon received docker image.
+g. Setup Kolla ansible to use 127.0.0.1 as the registry server IP so that the real docker
+container retrieving network activities only take place inside target hosts.
+
+
+Design
+======
+
+Methods to achieve
+------------------
+
+TIPC
+++++
+
+TIPC or its wrapper such as ZeroMQ is good at multicast, but it is not suitable as an
+installer:
+1. The default TIPC kernel module equipped by CentOS7(kernel verison 3.10) is NOT stable
+especially in L3 multicast(although we can use L2 multicast, but the network will be limited to
+L2). If errors happen, it is hard for us to recover a node from kernel panic.
+
+2. TIPC's design is based on a stable node cluster environment, esp in Lossless Ethernet. But
+the real environment is generally not in that case. When multicast is broken, Installer should
+switch to unicast, but TIPC currently do not have such capability.
+
+Top level design
+----------------
+1. There are two kinds of thread on the server side, one is UDP multicast thread the other is
+TCP sync/retransmit thread. There will be more than one TCP threads since one TCP thread can
+only serve a limited client (say 64~128) in order to limit the CPU load and unicast retransmit
+network usage.
+
+2. There is only one thread on client side.
+
+3. All the packets that a client lost during UDP multicast will be request by client to the TCP
+thread and resend by using TCP unicast, if unicast still cannot deliver the packets successfully,
+the client will failback to using the monopolistic docker registry server on Daisy server as a
+failsafe option.
+
+4. Each packet needs checksum.
+
+
+UDP Server Design (runs on Daisy Server)
+----------------------------------------
+
+1. Multicast group IP and Port should be configurable, as well as the interface that will be
+used as the egress of the multicast packets. The user will pass the interface's IP as the
+handle to find the egress.
+
+2. Image data to be sent is passed to server through stdin.
+
+3. Consider the size of image is large (xGB), the server cannot pre-allocate whole buffer to
+hold all image at once. Besides, since the data is from stdin and the actual length is
+unpredictable. So the server should split the data into small size buffers and send to the
+clients one by one. Furthermore, buffer shall be divided into packets which size is MTU
+including the UDP/IP header. Then the buffer size can be , for example 1024 * MTU including the
+UDP/IP header.
+
+4. After sending one buffer to client the server should stop and get feedback from client to
+see if all clients have got all packets in that buffer. If any clients lost any buffer, client
+should request the server to resend packets from a more stable way(TCP).
+
+5. when got the EOF from stdin, server should send a buffer which size is 0 as an EOF signal to
+the client to let it know about the end of sending.
+
+
+TCP Server Design (runs on Daisy Server)
+----------------------------------------
+
+1. All TCP server threads and the only one UDP thread share one process. The UDP thread is the
+parent thread, and the first TCP thread is the child, while the second TCP thread is the
+grandchild, and so on. Thus, for each TCP thread, there is only one parent and at most one
+child.
+
+2. TCP thread accepts the connect request from client. The number of client is predefined by
+server cmdline parameter. Each TCP thread connect with at most ,say 64 clients, if there are
+more clients to be connected to, then a child TCP thread is spawned by the parent.
+
+3. Before UDP thread sending any buffer to client, all TCP threads should send UDP multicast
+IP/Port information to their clients beforehand.
+
+4. During each buffer sending cycle, TCP threads send a special protocol message to tell
+clients about the size/id of the buffer and id of each packet in it. After getting
+acknowledgements from all clients, TCP threads then signal the UDP thread to start
+multicasting buffer over UDP. After multicasting finished, TCP threads notifies clients
+multicast is done, and wait acknowledgements from clients again. If clients requests
+retransmission, then it is the responsibility of TCP threads to resend packets over unicast.
+If no retransmission needed, then clients should signal TCP threads that they are ready for
+the next buffer to come.
+
+5. Repeat step 4 if buffer size is not 0 in the last round, otherwise, TCP server shutdown
+connection and exit.
+
+
+Server cmdline usage example
+----------------------------
+
+./server <local_ip> <number_of_clients> [port] < kolla_image.tgz
+
+<local_ip> is used here to specify the multicast egress interface. But which interface will be
+used by TCP is leaved to route table to decide.
+<number_of_clients> indicates the number of clients , thus the number of target hosts which
+need to receive the image.
+[port] is the port that will be used by both UDP and TCP. Default value can be used if user
+does not provide it.
+
+
+Client Design(Target Host side)
+--------------------------------
+
+1. Each target hosts has only one client process.
+
+2. Client connect to TCP server according to the cmdline parameters right after start up.
+
+3. After connecting to TCP server, client first read from TCP server the multicast group
+information which can be used to create the multicast receive socket then.
+
+4. During each buffer receiving cycle, the client first read from TCP server the buffer info,
+prepare the receive buffer, and acknowledge the TCP server that it is ready to receive. Then,
+client receive buffer from the multicast socket until TCP server notifying the end of
+multicast. By compare the buffer info and the received packets, the client knows whether to
+send the retransmission request or not and whether to wait retransmission packet or not.
+After all packets are received from UDP/TCP, the client eventually flush buffer to stdout
+and tells the TCP server about ready to receive the next buffer.
+
+5. Repeat step 4 if buffer size is not 0 in the last round, otherwise, client shutdowns
+connection and exit.
+
+Client cmdline usage example
+----------------------------
+
+./client <local_ip> <server_ip> [port] > kolla_image.tgz
+
+<local_ip> is used here to specify the multicast ingress interface. But which interface
+will be used by TCP is leaved to route table to decide.
+<server_ip> indicates the TCP server IP to be connected to.
+[port] is the port that will be used by both connect to TCP server and receive multicast
+data.
+
+
+Collaboration diagram among UDP Server, TCP Server(illustrate only one TCP thread)
+and Clients:
+
+
+UDP Server TCP Server Client
+ | | |
+init mcast group
+init mcast send socket
+ ---------------------------------->
+ accept clients
+ <------------------------connet------------------
+ --------------------send mcast group info------->
+ <----------------------------------
+ state = PREP
+do {
+read data from stdin
+prepare one buffer
+ ----------------------------------->
+ state = SYNC
+ -------------------send buffer info-------------->
+ <----------------------send ClIENT_READY-----------
+ <----------------------------------
+ state = SEND
+
+ ================================================send buffer over UDP multicast======>
+ ----------------------------------->
+ -----------------------send SERVER_SENT----------->
+ [<-------------------send CLIENT_REQUEST----------]
+ [--------------send buffer over TCP unicast------>]
+ flush buffer to stdout
+ <-------------------send CLIENT_DONE---------------
+ <----------------------------------
+ state = PREP
+while (buffer.len != 0)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/requirements.txt
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..d556de77
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+from setuptools import setup
+
+setup(
+ name="daisy",
+ version="master",
+ url="https://www.opnfv.org",
+)
diff --git a/test-requirements.txt b/test-requirements.txt
new file mode 100644
index 00000000..0483907e
--- /dev/null
+++ b/test-requirements.txt
@@ -0,0 +1,5 @@
+pytest
+pytest-cov
+pytest-faker
+pytest-mock
+
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unit/__init__.py
diff --git a/tests/unit/test_placeholder.py b/tests/unit/test_placeholder.py
new file mode 100644
index 00000000..457e464c
--- /dev/null
+++ b/tests/unit/test_placeholder.py
@@ -0,0 +1,2 @@
+def test_holder():
+ assert True
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 00000000..28fbf8f5
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,42 @@
+# Tox (http://tox.testrun.org/) is a tool for running tests
+# in multiple virtualenvs. This configuration file will run the
+# test suite on all supported python versions. To use it, "pip install tox"
+# and then run "tox" from this directory.
+
+[tox]
+envlist = py27,pep8
+skipsdist = True
+
+[testenv]
+usedevelop = True
+install_command = pip install -U {opts} {packages}
+deps =
+ -rrequirements.txt
+ -rtest-requirements.txt
+commands=
+ py.test \
+ --basetemp={envtmpdir} \
+ --cov \
+ --cov-report term-missing \
+ --cov-report xml \
+ {posargs}
+setenv=
+ HOME = {envtmpdir}
+ PYTHONPATH = {toxinidir}
+
+[testenv:pep8]
+deps = flake8
+commands = flake8 {toxinidir}
+
+[flake8]
+# H803 skipped on purpose per list discussion.
+# E123, E125 skipped as they are invalid PEP-8.
+
+show-source = True
+ignore = E123,E125,H803,E501
+builtins = _
+exclude = build,dist,doc,legacy,.eggs,.git,.tox,.venv
+
+[pytest]
+testpaths = tests
+python_functions = test_*