diff options
40 files changed, 2902 insertions, 28 deletions
diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..d58e2766 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,28 @@ +# .coveragerc to control coverage.py + +[run] +branch = True +source = + deploy + tests + +[report] +# Regexes for lines to exclude from consideration +exclude_lines = + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain about missing debug-only code: + def __repr__ + if self\.debug + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + + # Don't complain if non-runnable code isn't run: + if 0: + if __name__ == .__main__.: + +ignore_errors = True + @@ -42,6 +42,7 @@ pip-delete-this-directory.txt .tox/ .coverage .cache +coverage.xml # Log files: *.log diff --git a/ci/deploy/deploy.sh b/ci/deploy/deploy.sh index b2802c5d..4ec2c2f7 100755 --- a/ci/deploy/deploy.sh +++ b/ci/deploy/deploy.sh @@ -20,7 +20,7 @@ ############################################################################ # BEGIN of usage description # -usage () +function usage { cat << EOF xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx @@ -144,18 +144,18 @@ WORKDIR=${WORKDIR:-/tmp/workdir} # set extra ssh paramters SSH_PARAS="-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no" -deploy_path=$WORKSPACE/deploy +DEPLOY_PATH=$WORKSPACE/deploy -create_qcow2_path=$WORKSPACE/tools +CREATE_QCOW2_PATH=$WORKSPACE/tools -daisy_server_net=$WORKSPACE/templates/virtual_environment/networks/daisy.xml -target_node_net=$WORKSPACE/templates/virtual_environment/networks/os-all_in_one.xml -vmdeploy_daisy_server_vm=$WORKSPACE/templates/virtual_environment/vms/daisy.xml -vmdeploy_target_node_vm=$WORKSPACE/templates/virtual_environment/vms/all_in_one.xml +VMDELOY_DAISY_SERVER_NET=$WORKSPACE/templates/virtual_environment/networks/daisy.xml +VMDEPLOY_TARGET_NODE_NET=$WORKSPACE/templates/virtual_environment/networks/os-all_in_one.xml +VMDEPLOY_DAISY_SERVER_VM=$WORKSPACE/templates/virtual_environment/vms/daisy.xml +VMDEPLOY_TARGET_NODE_VM=$WORKSPACE/templates/virtual_environment/vms/all_in_one.xml -bmdeploy_daisy_server_net=$WORKSPACE/templates/physical_environment/networks/daisy.xml -bmdeploy_daisy_server_vm=$WORKSPACE/templates/physical_environment/vms/daisy.xml +BMDEPLOY_DAISY_SERVER_NET=$WORKSPACE/templates/physical_environment/networks/daisy.xml +BMDEPLOY_DAISY_SERVER_VM=$WORKSPACE/templates/physical_environment/vms/daisy.xml PARAS_FROM_DEPLOY=`python $WORKSPACE/deploy/get_para_from_deploy.py --dha $DHA_CONF` TARGET_HOSTS_NUM=`echo $PARAS_FROM_DEPLOY | cut -d " " -f 1` @@ -173,10 +173,10 @@ if [ $DRY_RUN -eq 1 ]; then DAISY_IP: $DAISY_IP DAISY_PASSWD: $DAISY_PASSWD PARAS_IMAGE: $PARAS_IMAGE - daisy_server_net: $daisy_server_net - target_node_net: $target_node_net - vmdeploy_daisy_server_vm: $vmdeploy_daisy_server_vm - vmdeploy_target_node_vm: $vmdeploy_target_node_vm + VMDELOY_DAISY_SERVER_NET: $VMDELOY_DAISY_SERVER_NET + VMDEPLOY_TARGET_NODE_NET: $VMDEPLOY_TARGET_NODE_NET + VMDEPLOY_DAISY_SERVER_VM: $VMDEPLOY_DAISY_SERVER_VM + VMDEPLOY_TARGET_NODE_VM: $VMDEPLOY_TARGET_NODE_VM """ exit 1 fi @@ -197,6 +197,7 @@ function create_node local net_name=$2 local vms_template=$3 local vms_name=$4 + virsh net-define $net_template virsh net-autostart $net_name virsh net-start $net_name @@ -242,6 +243,7 @@ function clean_up { local vm_name=$1 local network_name=$2 + virsh destroy $vm_name virsh undefine $vm_name virsh net-destroy $network_name @@ -262,17 +264,17 @@ fi echo "=======create daisy node================" if [ $IS_BARE == 0 ];then - $create_qcow2_path/daisy-img-modify.sh -c $create_qcow2_path/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE - create_node $daisy_server_net daisy1 $vmdeploy_daisy_server_vm daisy + $CREATE_QCOW2_PATH/daisy-img-modify.sh -c $CREATE_QCOW2_PATH/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE + create_node $VMDELOY_DAISY_SERVER_NET daisy1 $VMDEPLOY_DAISY_SERVER_VM daisy else - $create_qcow2_path/daisy-img-modify.sh -c $create_qcow2_path/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE - virsh define $bmdeploy_daisy_server_vm + $CREATE_QCOW2_PATH/daisy-img-modify.sh -c $CREATE_QCOW2_PATH/centos-img-modify.sh -a $DAISY_IP $PARAS_IMAGE + virsh define $BMDEPLOY_DAISY_SERVER_VM virsh start daisy fi sleep 20 echo "====== install daisy===========" -$deploy_path/trustme.sh $DAISY_IP $DAISY_PASSWD +$DEPLOY_PATH/trustme.sh $DAISY_IP $DAISY_PASSWD ssh $SSH_PARAS $DAISY_IP "if [[ -f ${REMOTE_SPACE} || -d ${REMOTE_SPACE} ]]; then rm -fr ${REMOTE_SPACE}; fi" scp -r $WORKSPACE root@$DAISY_IP:${REMOTE_SPACE} ssh $SSH_PARAS $DAISY_IP "mkdir -p /home/daisy_install" @@ -304,7 +306,7 @@ ssh $SSH_PARAS $DAISY_IP "python ${REMOTE_SPACE}/deploy/tempest.py --dha $DHA -- echo "=====create and find node======" if [ $IS_BARE == 0 ];then qemu-img create -f qcow2 ${VM_STORAGE}/all_in_one.qcow2 200G - create_node $target_node_net daisy2 $vmdeploy_target_node_vm all_in_one + create_node $VMDEPLOY_TARGET_NODE_NET daisy2 $VMDEPLOY_TARGET_NODE_VM all_in_one sleep 20 else for i in $(seq 106 110); do diff --git a/code/jasmine/.tarball-version b/code/jasmine/.tarball-version new file mode 100644 index 00000000..21e8796a --- /dev/null +++ b/code/jasmine/.tarball-version @@ -0,0 +1 @@ +1.0.3 diff --git a/code/jasmine/Makefile.am b/code/jasmine/Makefile.am new file mode 100755 index 00000000..41acfd62 --- /dev/null +++ b/code/jasmine/Makefile.am @@ -0,0 +1,83 @@ +# Copyright (c) 2015 ZTE, Inc. + +SPEC = $(PACKAGE_NAME).spec + +TARFILE = $(PACKAGE_NAME)-$(VERSION).tar.gz + +EXTRA_DIST = autogen.sh $(SPEC).in \ + build-aux/git-version-gen \ + .version + +AUTOMAKE_OPTIONS = foreign + +ACLOCAL_AMFLAGS = -I m4 + +MAINTAINERCLEANFILES = Makefile.in aclocal.m4 configure depcomp \ + config.guess config.sub missing install-sh \ + autoheader automake autoconf \ + autoscan.log configure.scan ltmain.sh test-driver config.h.in + +noinst_HEADERS = buffer.h misc.h server.h tcp-common.h tcp-queue.h udp-common.h + +dist-clean-local: + rm -f autoconf automake autoheader + +clean-generic: + rm -rf $(SPEC) $(TARFILE) + +## make rpm/srpm section. + +$(SPEC): $(SPEC).in + rm -f $@-t $@ + ver="$(VERSION)" && \ + sed \ + -e "s#@version@#$$ver#g" \ + $< > $@-t; \ + chmod a-w $@-t + mv $@-t $@ + +$(TARFILE): + $(MAKE) dist + +RPMBUILDOPTS = --define "_sourcedir $(abs_builddir)" \ + --define "_specdir $(abs_builddir)" \ + --define "_builddir $(abs_builddir)" \ + --define "_srcrpmdir $(abs_builddir)" \ + --define "_rpmdir $(abs_builddir)" + +srpm: clean + $(MAKE) $(SPEC) $(TARFILE) + rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) --nodeps -bs $(SPEC) + +rpm: clean _version + $(MAKE) $(SPEC) $(TARFILE) + rpmbuild $(WITH_LIST) $(RPMBUILDOPTS) -ba $(SPEC) + +# release/versioning +BUILT_SOURCES = .version +.version: + echo $(VERSION) > $@-t && mv $@-t $@ + +dist-hook: + echo $(VERSION) > $(distdir)/.tarball-version + +.PHONY: _version + +_version: + cd $(srcdir) && rm -rf autom4te.cache .version && autoreconf -i + $(MAKE) $(AM_MAKEFLAGS) Makefile + +maintainer-clean-local: + rm -rf m4 + +### + +bin_PROGRAMS = jasmines jasminec + +DEFAULT_INCLUDES = -I. -I/usr/include + +jasmines_SOURCES = udp-common.c tcp-common.c buffer.c misc.c server.c server-udp.c server-tcp.c tcp-queue.c +jasminec_SOURCES = udp-common.c tcp-common.c buffer.c misc.c client.c + +jasmines_LDADD = -lpthread +jasminec_LDADD = -lpthread diff --git a/code/jasmine/README b/code/jasmine/README new file mode 100644 index 00000000..48f169ff --- /dev/null +++ b/code/jasmine/README @@ -0,0 +1,24 @@ +jasmine: Just A Small Multicast engINE + +Installation +------------ + +./autogen.sh +./configure +make && make install + +RPM Based Installation +---------------------- +./autogen.sh +./configure +make rpm +rpm -ivh ./x86_64/jasmine-1.0.3-1.rpm + +Usage +----- +jasmine server: +Usage: jasmines local_ip num_of_clients [port] + +jasmine client: +Usage: jasminec <local_ip> <server_ip> [port] + diff --git a/code/jasmine/autogen.sh b/code/jasmine/autogen.sh new file mode 100755 index 00000000..5bf25ecc --- /dev/null +++ b/code/jasmine/autogen.sh @@ -0,0 +1,5 @@ +#!/bin/sh +# Run this to generate all the initial makefiles, etc. +mkdir -p m4 +echo Building configuration system... +autoreconf -i diff --git a/code/jasmine/buffer.c b/code/jasmine/buffer.c new file mode 100755 index 00000000..0567026b --- /dev/null +++ b/code/jasmine/buffer.c @@ -0,0 +1,152 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <stdlib.h> +#include <string.h> + +#include "buffer.h" +#include "misc.h" + +struct buffer_ctl buffctl; +struct packet_ctl *packetctl[PACKETS_PER_BUFFER]; +struct packet_ctl empty; + +void buffer_init() +{ + int i; + + for (i = 0; i < PACKETS_PER_BUFFER; i++) { + packetctl[i] = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE); + memset(packetctl[i], 0, PACKET_SIZE); + packetctl[i]->data_size = 0; + packetctl[i]->seq = 0; + } + + buffctl.buffer_id = 0; + buffctl.packet_id_base = 0; + buffctl.pkt_count = 0; + empty.data_size = 0; +} + +void packetctl_precheck() +{ + int i; + + for (i = 0; i < PACKETS_PER_BUFFER; i++) { + if (packetctl[i]->data_size != 0) { + crit("Error precheck packet slot %d,%d", i, packetctl[i]->data_size); + } + } +} + +// Calculate packet checksum +uint32_t packet_csum(uint8_t *data, int len) +{ + uint32_t *item, sum = 0; + int i = 0; + + while (len % 4) { + data[len] = 0; len++; + } + + while (i < len) { + item = (uint32_t *)((char *)data + i); + sum += *item; + i += 4; + } + + return sum; +} + +// Fill buffers from file descriptor +long buffer_fill(int fd) +{ + int s = 0; + int r = PACKET_PAYLOAD_SIZE; + + buffctl.buffer_id++; + buffctl.packet_id_base += buffctl.pkt_count; + buffctl.buffer_size = 0; + while (r > 0 && s < PACKETS_PER_BUFFER) { + if ((r = read(fd, packetctl[s]->data, PACKET_PAYLOAD_SIZE)) < 0) { + crit("Error reading data from stdin"); + } + + // r == 0 means EOF + + if (r > 0) { + buffctl.buffer_size += r; + packetctl[s]->data_size = r; + packetctl[s]->seq = buffctl.packet_id_base + s; + packetctl[s]->crc = packet_csum(packetctl[s]->data, r); + s++; + } + } + + log(6, "input %d bytes of data in %d packets", buffctl.buffer_size, s); + buffctl.pkt_count = s; + return s; +} + +long buffer_flush(int fd) +{ + int s = 0; + while (buffctl.pkt_count--) { + write(fd, packetctl[s]->data, packetctl[s]->data_size); + buffctl.buffer_size -= packetctl[s]->data_size; + packetctl[s]->data_size = 0; + packetctl[s]->seq = 0; + packetctl[s]->crc = 0; + s++; + } + + return s; +} + +/* Caller should use new_pkt as packet buffer again if this returns NULL */ +struct packet_ctl *packet_put(struct packet_ctl *new_pkt) +{ + struct packet_ctl *freed_pkt; + uint32_t i; + + if (new_pkt->seq < buffctl.packet_id_base || + new_pkt->seq - buffctl.packet_id_base >= buffctl.pkt_count) { + return NULL; + } + + if (new_pkt->data_size == 0) { + return NULL; + } + + i = packet_csum(new_pkt->data, new_pkt->data_size); + if (new_pkt->crc != i) { + return NULL; + } + + i = new_pkt->seq - buffctl.packet_id_base; + freed_pkt = packetctl[i]; + packetctl[i] = new_pkt; + return freed_pkt; +} + +struct packet_ctl *packet_get(uint32_t seq) +{ + empty.seq = seq; + + if (seq < buffctl.packet_id_base || + seq - buffctl.packet_id_base >= buffctl.pkt_count) { + return ∅ + } + + if (packetctl[seq - buffctl.packet_id_base]->data_size == 0) { + return ∅ + } + + return packetctl[seq - buffctl.packet_id_base]; +} diff --git a/code/jasmine/buffer.h b/code/jasmine/buffer.h new file mode 100755 index 00000000..e899fe62 --- /dev/null +++ b/code/jasmine/buffer.h @@ -0,0 +1,64 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_BUFFER_H +#define _MCAST_BUFFER_H + +#include <unistd.h> +#include <netinet/ip.h> +#include <netinet/udp.h> + +#define MTU 1500 + +#define PACKET_SIZE ((MTU) - sizeof(struct iphdr) - sizeof(struct udphdr)) + +/* Exclude padding */ +#define PACKET_PAYLOAD_SIZE (((PACKET_SIZE) - sizeof(struct packet_ctl)) & ~0x3) + +#define PACKETS_PER_BUFFER 1024 + +#define DEF_PORT 18383 /* for both UDP and TCP */ + +/* Buffer header (align to 4 Byte) */ +struct buffer_ctl { + uint32_t buffer_id; + uint32_t buffer_size; + uint32_t packet_id_base; + uint32_t pkt_count; +}; + +/* Packet header (align to 4 Byte) */ +struct packet_ctl { + uint32_t seq; + uint32_t crc; + uint32_t data_size; + uint8_t data[0]; +}; + +#define CLIENT_READY 0x1 +#define CLIENT_REQ 0x2 +#define CLIENT_DONE 0x4 +#define SERVER_SENT 0x8 + +/* Retransmition Request Header (align to 4 Byte) */ +struct request_ctl { + uint32_t req_count; /* Requested packet slot count */ +}; + +extern struct buffer_ctl buffctl; +extern struct packet_ctl *packetctl[PACKETS_PER_BUFFER]; + +void buffer_init(); +long buffer_fill(int fd); +long buffer_flush(int fd); +struct packet_ctl *packet_put(struct packet_ctl *new_pkt); +struct packet_ctl *packet_get(uint32_t seq); +void packetctl_precheck(); + +#endif diff --git a/code/jasmine/build-aux/git-version-gen b/code/jasmine/build-aux/git-version-gen new file mode 100755 index 00000000..c3d53f36 --- /dev/null +++ b/code/jasmine/build-aux/git-version-gen @@ -0,0 +1,170 @@ +#!/bin/sh +# Print a version string. +scriptversion=2010-10-13.20; # UTC + +# Copyright (C) 2007-2010 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# This script is derived from GIT-VERSION-GEN from GIT: http://git.or.cz/. +# It may be run two ways: +# - from a git repository in which the "git describe" command below +# produces useful output (thus requiring at least one signed tag) +# - from a non-git-repo directory containing a .tarball-version file, which +# presumes this script is invoked like "./git-version-gen .tarball-version". + +# In order to use intra-version strings in your project, you will need two +# separate generated version string files: +# +# .tarball-version - present only in a distribution tarball, and not in +# a checked-out repository. Created with contents that were learned at +# the last time autoconf was run, and used by git-version-gen. Must not +# be present in either $(srcdir) or $(builddir) for git-version-gen to +# give accurate answers during normal development with a checked out tree, +# but must be present in a tarball when there is no version control system. +# Therefore, it cannot be used in any dependencies. GNUmakefile has +# hooks to force a reconfigure at distribution time to get the value +# correct, without penalizing normal development with extra reconfigures. +# +# .version - present in a checked-out repository and in a distribution +# tarball. Usable in dependencies, particularly for files that don't +# want to depend on config.h but do want to track version changes. +# Delete this file prior to any autoconf run where you want to rebuild +# files to pick up a version string change; and leave it stale to +# minimize rebuild time after unrelated changes to configure sources. +# +# It is probably wise to add these two files to .gitignore, so that you +# don't accidentally commit either generated file. +# +# Use the following line in your configure.ac, so that $(VERSION) will +# automatically be up-to-date each time configure is run (and note that +# since configure.ac no longer includes a version string, Makefile rules +# should not depend on configure.ac for version updates). +# +# AC_INIT([GNU project], +# m4_esyscmd([build-aux/git-version-gen .tarball-version]), +# [bug-project@example]) +# +# Then use the following lines in your Makefile.am, so that .version +# will be present for dependencies, and so that .tarball-version will +# exist in distribution tarballs. +# +# BUILT_SOURCES = $(top_srcdir)/.version +# $(top_srcdir)/.version: +# echo $(VERSION) > $@-t && mv $@-t $@ +# dist-hook: +# echo $(VERSION) > $(distdir)/.tarball-version + +case $# in + 1|2) ;; + *) echo 1>&2 "Usage: $0 \$srcdir/.tarball-version" \ + '[TAG-NORMALIZATION-SED-SCRIPT]' + exit 1;; +esac + +tarball_version_file=$1 +tag_sed_script="${2:-s/x/x/}" +nl=' +' + +# Avoid meddling by environment variable of the same name. +v= + +svn log --non-interactive -q --limit 1 > /dev/null 2>&1 +svnwork=$? + +# First see if there is a tarball-only version file. +# then try "git describe", then default. +if test -f $tarball_version_file +then + v=`cat $tarball_version_file` || exit 1 + case $v in + *$nl*) v= ;; # reject multi-line output + [0-9]*) ;; + *) v= ;; + esac + test -z "$v" \ + && echo "$0: WARNING: $tarball_version_file seems to be damaged" 1>&2 +fi + +if test -n "$v" +then + : # use $v +# Otherwise, if there is at least one git commit involving the working +# directory, and "git describe" output looks sensible, use that to +# derive a version string. +elif test "`git log -1 --pretty=format:x . 2>&1`" = x \ + && v=`git describe --abbrev=4 --match='v*' HEAD 2>/dev/null \ + || git describe --abbrev=4 HEAD 2>/dev/null` \ + && v=`printf '%s\n' "$v" | sed "$tag_sed_script"` \ + && case $v in + v[0-9]*) ;; + *) (exit 1) ;; + esac +then + # Is this a new git that lists number of commits since the last + # tag or the previous older version that did not? + # Newer: v6.10-77-g0f8faeb + # Older: v6.10-g0f8faeb + case $v in + *-*-*) : git describe is okay three part flavor ;; + *-*) + : git describe is older two part flavor + # Recreate the number of commits and rewrite such that the + # result is the same as if we were using the newer version + # of git describe. + vtag=`echo "$v" | sed 's/-.*//'` + numcommits=`git rev-list "$vtag"..HEAD | wc -l` + v=`echo "$v" | sed "s/\(.*\)-\(.*\)/\1-$numcommits-\2/"`; + ;; + esac + + # Change the first '-' to a '.', so version-comparing tools work properly. + # Remove the "g" in git describe's output string, to save a byte. + v=`echo "$v" | sed 's/-/./;s/\(.*\)-g/\1-/'`; +elif [ "$svnwork" -eq "0" ] ; +then + vtag="1.0.0" # we do not have git tag in svn, so here just hard coded it. + v=`svn log -q --limit 1 | grep "|" | awk '{print $1}'` + v=`echo "$v" |sed 's/^r//'` + v="$vtag.$v" +else + v=UNKNOWN +fi + +v=`echo "$v" |sed 's/^v//'` + +# Don't declare a version "dirty" merely because a time stamp has changed. +git update-index --refresh > /dev/null 2>&1 + +dirty=`sh -c 'git diff-index --name-only HEAD' 2>/dev/null` || dirty= +case "$dirty" in + '') ;; + *) # Append the suffix only if there isn't one already. + case $v in + *-dirty) ;; + *) v="$v-dirty" ;; + esac ;; +esac + +# Omit the trailing newline, so that m4_esyscmd can use the result directly. +echo "$v" | tr -d "$nl" + +# Local variables: +# eval: (add-hook 'write-file-hooks 'time-stamp) +# time-stamp-start: "scriptversion=" +# time-stamp-format: "%:y-%02m-%02d.%02H" +# time-stamp-time-zone: "UTC" +# time-stamp-end: "; # UTC" +# End: diff --git a/code/jasmine/client.c b/code/jasmine/client.c new file mode 100755 index 00000000..42d996af --- /dev/null +++ b/code/jasmine/client.c @@ -0,0 +1,313 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> + +#include "buffer.h" +#include "udp-common.h" +#include "tcp-common.h" +#include "misc.h" + +/* Global statistics */ +long tot_dups = 0; +long tot_reps = 0; +long tot_pkts = 0; +long long tot_size = 0; + +void recv_mcinfo(int tcp_socket) +{ + int res; + + res = read_all(tcp_socket, &mcinfo, sizeof(struct mc_info)); + if (res != sizeof(struct mc_info)) { + crit("Error while reading initial data"); + } +} + +void recv_buffctl(int tcp_socket) +{ + int res; + + res = read_all(tcp_socket, &buffctl, sizeof(struct buffer_ctl)); + if (res != sizeof(struct buffer_ctl)) { + crit("Error reading buffer control"); + } +} + +void send_client_ready(int tcp_socket) +{ + uint8_t cmd; + + /* Tell TCP server we are ready to receive */ + cmd = CLIENT_READY; + write(tcp_socket, &cmd, 1); +} + +/* Read out the SERVER_SENT signal */ +void recv_server_sent(int tcp_socket) +{ + uint8_t cmd; + int res; + + res = read_all(tcp_socket, &cmd, 1); + if (res != 1) { + //if (read(tcp_socket, &cmd, 1) < 1) { + crit("Error reading SERVER_SENT"); + } + + if (cmd != SERVER_SENT) { + crit("Error reading SERVER_SENT %d", cmd); + } +} + +void send_client_request(int tcp_socket, struct request_ctl *req) +{ + uint8_t cmd; + + /* Tell TCP server we are ready to receive */ + cmd = CLIENT_REQ; + write(tcp_socket, &cmd, 1); + write(tcp_socket, req, + sizeof(struct request_ctl) + (req->req_count) * sizeof(uint32_t)); +} + +void recv_server_ack(int tcp_socket, struct packet_ctl *pkt) +{ + int res; + + res = read_all(tcp_socket, pkt, sizeof(struct packet_ctl)); + if (res != sizeof(struct packet_ctl)) { + crit("Error on tcp socket"); + } + + res = read_all(tcp_socket, + ((char *)pkt) + sizeof(struct packet_ctl), + pkt->data_size); + if (res != pkt->data_size) { + crit("Error on tcp socket received %d of %d", + res, pkt->data_size); + } +} + +void tcp_retransmition(int tcp_socket, + struct packet_ctl **curr_pkt, + struct packet_ctl **freed_pkt) +{ + struct request_ctl *reqctl; + uint32_t *reqbody; + uint8_t rqbuf[sizeof(struct request_ctl) + PACKETS_PER_BUFFER * sizeof(uint32_t)]; + uint32_t l; + + reqctl = (struct request_ctl *)rqbuf; + reqbody = (uint32_t *)(rqbuf + sizeof(struct request_ctl)); + + reqctl->req_count = 0; + for (l = 0; l < buffctl.pkt_count; l++) { + if (!packetctl[l]->data_size) { + log(6, "Requesting packet %u", l + buffctl.packet_id_base); + reqbody[reqctl->req_count] = l + buffctl.packet_id_base; + reqctl->req_count++; + } + } + + if (reqctl->req_count > 0) { + send_client_request(tcp_socket, reqctl); + + /* read retransmitted blocks via TCP */ + for (l = 0; l < reqctl->req_count; l++) { + if (*freed_pkt) { + *curr_pkt = *freed_pkt; + } + + recv_server_ack(tcp_socket, *curr_pkt); + + *freed_pkt = packet_put(*curr_pkt); + if (!(*freed_pkt)) { + crit("Malformed packet on tcp socket"); + } + if ((*freed_pkt)->data_size != 0) { + crit("Malformed free packet slot or TCP data"); + } + + log(6, "Received retran packet %u", (*curr_pkt)->seq); + } + + tot_reps += reqctl->req_count; + } +} + +/* Returns how many good packets received from UDP */ +int recv_mcast(int tcp_socket, int udp_socket, + struct packet_ctl **curr_pkt, + struct packet_ctl **freed_pkt) +{ + int rcv_pkt_count; + int got_sent; + int maxfd; + fd_set rfds; + struct timeval tv; + int res; + + maxfd = tcp_socket; + if (maxfd < udp_socket) { + maxfd = udp_socket; + } + maxfd++; + FD_ZERO(&rfds); + + rcv_pkt_count = 0; + got_sent = 0; + + if (buffctl.pkt_count != 0) { + do { + FD_SET(tcp_socket, &rfds); + FD_SET(udp_socket, &rfds); + tv.tv_sec = 5; + tv.tv_usec = 0; + + res = select(maxfd, &rfds, 0, 0, &tv); + if (res < 0) { + crit("select error"); + } + + if (res == 0) { + crit("select timed out"); + } + + /* Read multicast packet */ + if (FD_ISSET(udp_socket, &rfds)) { + log(7, "Reading multicast packet"); + if (*freed_pkt) { + *curr_pkt = *freed_pkt; + } + + res = recv(udp_socket, *curr_pkt, PACKET_SIZE, 0); + if (res <= 0) { + crit("error on multicast socket"); + } + + if (res < sizeof(struct packet_ctl) ) { + log(7, "Truncated packet received (%d bytes)", res); + } else if (res != (*curr_pkt)->data_size + sizeof(struct packet_ctl)) { + log(7, + "Truncated packet received (%d of %ld bytes)", + res, (*curr_pkt)->data_size + sizeof(struct packet_ctl)); + } else { + log(9, "Normal packet seq:%d", (*curr_pkt)->seq); + (*freed_pkt) = packet_put((*curr_pkt)); + if (!(*freed_pkt)) { + log(5, "Malformed packet"); + } else { + if ((*freed_pkt)->data_size == 0) { + rcv_pkt_count++; + } else { + log(6, "Duplicated packet"); + tot_dups++; + } + } + } + } else if (FD_ISSET(tcp_socket, &rfds)) { + /* Check TCP, only if there was no more data from UDP */ + log(6, "No more data path1"); + recv_server_sent(tcp_socket); + got_sent = 1; + break; + } + } while (rcv_pkt_count < buffctl.pkt_count); + } + + if (got_sent == 0) { + log(6, "No more data path2"); + recv_server_sent(tcp_socket); + } + + return rcv_pkt_count; +} + +void send_client_done(int tcp_socket) +{ + uint8_t cmd; + + /* Tell TCP server we are ready to receive */ + cmd = CLIENT_DONE; + write(tcp_socket, &cmd, 1); +} + +int main(int argc, char *argv[]) +{ + int ms, ts; + struct packet_ctl *alloc_pkt, *curr_pkt, *freed_pkt; + int udp_rcv_count; + u_short port = DEF_PORT; + struct in_addr local_addr; + + if (argc < 3) { + printf("Usage: %s <local_ip> <server_ip> [port]\n", argv[0]); + return 0; + } + + if (!inet_aton(argv[1], &local_addr)) { + crit("can not resolve address: %s", argv[1]); + } + + if (argc > 3) { + port = atoi(argv[3]); + if (!port) { + port = DEF_PORT; + } + } + + buffer_init(); + /* Init first time packet slot */ + alloc_pkt = (struct packet_ctl *)wrapper_malloc(PACKET_SIZE); + memset(alloc_pkt, 0, PACKET_SIZE); + freed_pkt = curr_pkt = alloc_pkt; + + ts = init_tcp_client_socket(argv[2], port); + recv_mcinfo(ts); + ms = init_mcast_socket(&local_addr, &mcinfo.group); + /* Do we need set_nonblock(ms)??? ; */ + + /* Will do dummy run even if buffctl.pkt_count is zero at the first round */ + do { /* one buffer a round */ + packetctl_precheck(); + recv_buffctl(ts); + send_client_ready(ts); + + udp_rcv_count = recv_mcast(ts, ms, &curr_pkt,&freed_pkt); + if (udp_rcv_count == buffctl.pkt_count) { + log(6, "All packets of current buffer received from UDP"); + } else { + tcp_retransmition(ts, &curr_pkt, &freed_pkt); + } + + tot_pkts += buffctl.pkt_count; + tot_size += buffctl.buffer_size; + log(1, "\rBuffer received %lld Bytes, %ld Packets(%ld Repeats %ld Dups)", + tot_size, tot_pkts, tot_reps, tot_dups); + + if (buffctl.pkt_count) { + buffer_flush(STDOUT_FILENO); + } + + send_client_done(ts); + } while (buffctl.pkt_count != 0); + + shutdown(ts, 2); + close(ts); + close(ms); + log(1, "All buffers receive done.\n"); + return 0; +} diff --git a/code/jasmine/configure.ac b/code/jasmine/configure.ac new file mode 100755 index 00000000..77870c95 --- /dev/null +++ b/code/jasmine/configure.ac @@ -0,0 +1,65 @@ +# -*- Autoconf -*- +# Process this file with autoconf to produce a configure script. + +# bootstrap / init +AC_PREREQ([2.61]) + +AC_INIT([jasmine], + m4_esyscmd([build-aux/git-version-gen .tarball-version]), + [hu.zhijiang@zte.com.cn]) + +AC_USE_SYSTEM_EXTENSIONS + +AM_INIT_AUTOMAKE([-Wno-portability]) + +AC_CONFIG_HEADER([config.h]) + +AC_CONFIG_MACRO_DIR([m4]) + +AC_SUBST(WITH_LIST, [""]) + +dnl Fix default variables - "prefix" variable if not specified +if test "$prefix" = "NONE"; then + prefix="/usr" + + dnl Fix "localstatedir" variable if not specified + if test "$localstatedir" = "\${prefix}/var"; then + localstatedir="/var" + fi + dnl Fix "sysconfdir" variable if not specified + if test "$sysconfdir" = "\${prefix}/etc"; then + sysconfdir="/etc" + fi + dnl Fix "libdir" variable if not specified + if test "$libdir" = "\${exec_prefix}/lib"; then + if test -e /usr/lib64; then + libdir="/usr/lib64" + else + libdir="/usr/lib" + fi + fi +fi + +# Checks for programs. +AC_PATH_PROG([BASHPATH], [bash]) + +AC_CONFIG_FILES([Makefile]) + +PACKAGE_FEATURES="" + +ENV_CFLAGS="$CFLAGS" +OPT_CFLAGS="" +GDB_FLAGS="" +EXTRA_WARNINGS="-Wall" +CFLAGS="$ENV_CFLAGS $OPT_CFLAGS $GDB_FLAGS $EXTRA_WARNINGS" + +# substitute what we need: +AC_SUBST([BASHPATH]) + +AC_OUTPUT + +AC_MSG_RESULT([ Version = ${PACKAGE_VERSION}]) +AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}]) +AC_MSG_RESULT([ Final CPPFLAGS = ${CPPFLAGS}]) +AC_MSG_RESULT([ Final LDFLAGS = ${LDFLAGS}]) +AC_MSG_RESULT([ Features = ${PACKAGE_FEATURES}]) diff --git a/code/jasmine/jasmine.spec.in b/code/jasmine/jasmine.spec.in new file mode 100755 index 00000000..73172f31 --- /dev/null +++ b/code/jasmine/jasmine.spec.in @@ -0,0 +1,33 @@ +Summary: Just A Small Multicast engINE +Name: jasmine +Version: @version@ +Release: 1%{?dist} +Vendor: ZTE +License: Apache-2.0 +Group: System Environment/Base +URL: https://wiki.opnfv.org/display/DAIS +Source: %{name}-%{version}%{?gittarver}.tar.gz + +BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX) + +%description +jasmine is used to distribute files over UDP multicast for installers + +%prep +%setup -q -n %{name}-%{version}%{?gittarver} + +%build +if [ ! -f configure ]; then + ./autogen.sh +fi + +./configure + +%install +rm -rf %{buildroot} +make install DESTDIR=%{buildroot} + +%files +%{_bindir}/jasmines +%{_bindir}/jasminec + diff --git a/code/jasmine/misc.c b/code/jasmine/misc.c new file mode 100755 index 00000000..eab367e1 --- /dev/null +++ b/code/jasmine/misc.c @@ -0,0 +1,73 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <stdlib.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <sys/types.h> +#include <fcntl.h> + +#include "misc.h" + +struct sockaddr_in make_addr(char *addr, unsigned short port) +{ + struct sockaddr_in sin; + + sin.sin_family = AF_INET; + if (!addr) { + sin.sin_addr.s_addr = htonl(INADDR_ANY); + } else { + if (!inet_aton(addr, &sin.sin_addr)) + crit("cant resolve address: %s", addr); + } + sin.sin_port = htons(port); + return sin; +} + +/* Better than using a macro */ +void set_nonblock(int s) +{ + if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) { + crit("set_nonblock failed, can't set O_NONBLOCK"); + } +} + +void * wrapper_malloc(size_t size) +{ + void * res; + + if (size == 0) { + crit("wrapper_malloc: malloc 0 size not allowed"); + } + + res = malloc(size); + if (res == NULL) { + crit("wrapper_malloc: malloc failed"); + } + + return res; +} + +size_t read_all(int fd, void *buf, size_t count) +{ + size_t t = 0; + size_t r = 0; + + while (count > 0) { + r = read(fd, buf + t, count); + if (r <= 0) { + return t ? t : r; + } + + t += r; + count -= r; + } + + return t; +} diff --git a/code/jasmine/misc.h b/code/jasmine/misc.h new file mode 100755 index 00000000..e60d0b63 --- /dev/null +++ b/code/jasmine/misc.h @@ -0,0 +1,39 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_MISC_H +#define _MCAST_MISC_H + +#include <stdlib.h> +#include <stdio.h> +#include <error.h> +#include <errno.h> +#include <unistd.h> +#include <sys/syscall.h> +#define gettid() syscall(__NR_gettid) + +#define level 6 + +#define crit(x, args...) do { \ + fprintf(stderr, "\nERROR: "); fprintf(stderr, x, ##args); \ + error(-1,errno, "\nERROR: [%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \ +} while (0); + +#define log(l, f, args...) if (l < level) { \ + fprintf(stderr, "\n[%s:%d:%ld] ", __FILE__, __LINE__, gettid()); \ + fprintf(stderr, f, ##args); \ + fprintf(stderr, "\n"); \ +} + +struct sockaddr_in make_addr(char *addr, unsigned short port); +void * wrapper_malloc(size_t size); +void set_nonblock(int s); +size_t read_all(int fd, void *buf, size_t count); + +#endif diff --git a/code/jasmine/server-tcp.c b/code/jasmine/server-tcp.c new file mode 100755 index 00000000..c48de77d --- /dev/null +++ b/code/jasmine/server-tcp.c @@ -0,0 +1,550 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <sys/poll.h> +#include <string.h> +#include <pthread.h> + +#include "buffer.h" +#include "tcp-common.h" +#include "udp-common.h" +#include "tcp-queue.h" +#include "misc.h" +#include "server.h" + +#define MAX_CLIENTS 128 /* Clients per TCP server */ +#define TCP_BUFF_SIZE 65536 + +struct cdata { + struct tcpq *tx; + struct tcpq *rx; + int await; /* non-zero if we are in the middle of receiving clients requests */ + struct sockaddr_in peer; +}; + +struct server_status_data { + int state; /* state machine */ + int sync_count; + int ccount; + struct semlink sl; + pthread_t pchild; + + struct pollfd ds[MAX_CLIENTS + 1]; + struct cdata cd[MAX_CLIENTS + 1]; + int cindex; + int last_buff_pkt_count; +}; + +void init_sdata(struct server_status_data *sdata, void *thread_args) +{ + int i; + + sdata->sl.parent = (struct semaphores *)thread_args; + sdata->sl.this = NULL; + + sdata->state = S_PREP; + sdata->last_buff_pkt_count = 1; + sdata->sync_count = 0; + + for (i = 0; i < MAX_CLIENTS + 1; i++) { + sdata->ds[i].fd = -1; + sdata->ds[i].events = POLLIN; + sdata->cd[i].tx = tcpq_queue_init(); + sdata->cd[i].rx = tcpq_queue_init(); + sdata->cd[i].await = 0; + } +} + +/* disconnect one client and compress table */ +void kill_client(struct server_status_data *sdata) +{ + struct pollfd *ds = sdata->ds; + struct cdata *cd = sdata->cd; + + close((ds + sdata->cindex)->fd); + tcpq_queue_free((cd + sdata->cindex)->rx); + tcpq_queue_free((cd + sdata->cindex)->tx); + + /* Move last slot to this free slot and release the last slot */ + *(ds + sdata->cindex) = *(ds + sdata->ccount - 1); + *(cd + sdata->cindex) = *(cd + sdata->ccount - 1); + sdata->cindex--; + sdata->ccount--; + client_count--; +} + +void send_buffctl_to_all_clients(struct server_status_data *sdata) +{ + int i; + long send_sz; + void *cpy; + + /* copy header to all queues */ + send_sz = sizeof(struct buffer_ctl); + for (i = 1; i < sdata->ccount; i++) { + cpy = wrapper_malloc(send_sz); + memcpy(cpy, &buffctl, send_sz); + tcpq_queue_tail(sdata->cd[i].tx, cpy, send_sz); + sdata->ds[i].events |= POLLOUT; + } +} + +void send_sent_to_all_clients(struct server_status_data *sdata) +{ + int i; + void *cpy; + uint8_t byte = SERVER_SENT; + + /* Push data to clients */ + for (i = 1; i < sdata->ccount; i++) { + cpy = wrapper_malloc(1); + memcpy(cpy, &byte, 1); + tcpq_queue_tail(sdata->cd[i].tx, cpy, 1); + sdata->ds[i].events |= POLLOUT; + } +} +void accept_clients(struct server_status_data *sdata) +{ + int res; + int new_fd; + socklen_t socklen; + int poll_events; + int timeout = -1; + struct pollfd *ds = sdata->ds; + struct cdata *cd = sdata->cd; + + ds->events = POLLIN; + ds->revents = 0; + sdata->ccount = 1; + + while (sdata->ccount <= MAX_CLIENTS && client_count < wait_count) { + poll_events = poll(ds, 1, timeout); + if (poll_events < 0) { + crit("poll() failed"); + } else if (poll_events == 0) { + log(2, "poll() returned with no results!"); + continue; + } + + log(9, "poll: %d events", poll_events); + + if (ds->revents) { + + /* new connections come in */ + if (ds->revents & (POLLIN|POLLPRI)) { + + do { + socklen = sizeof((cd + sdata->ccount)->peer); +retry_accept: + new_fd = accept(ds->fd, + (struct sockaddr *)&((cd + sdata->ccount)->peer), + &socklen); + if (new_fd == -1 && errno == EINTR) { + goto retry_accept; + } + + if (new_fd == -1) { + log(2, "accept() returned with error %d", errno); + break; + } + + /* Send group info, before set non block */ + res = write(new_fd, &mcinfo, sizeof(struct mc_info)); + if (res < 0) { + log(3, "Error opening connection: %s", + inet_ntoa((cd + sdata->ccount)->peer.sin_addr)); + close(new_fd); + continue; + } + + /* Can set to non block becasue we will poll it in future. */ + res = fcntl(new_fd, F_SETFL, O_NONBLOCK); + if (res == -1) { + log(2, "fcntl() returned with error %d", errno); + close(new_fd); + continue; + } + + (ds + sdata->ccount)->fd = new_fd; + + log(5, "New connection %d: %s", + sdata->ccount, inet_ntoa((cd + sdata->ccount)->peer.sin_addr)); + sdata->ccount++; + client_count++; + } while (sdata->ccount <= MAX_CLIENTS); + + } else { + log(2, "Error on tcp socket"); + } + + poll_events--; + ds->revents = 0; + } + } + + ds->events = 0; +} + +void accept_clients_may_spawn(struct server_status_data *sdata) +{ + /* Wait the parent to let us accpet clients */ + sl_wait_parent(&(sdata->sl)); + + sdata->ds[0].fd = sfd; + accept_clients(sdata); + if (client_count < wait_count) { + sdata->sl.this = spawn_thread(&sdata->pchild, tcp_server_main, 1); + + /* Let child TCP servers accept connections */ + sl_release_child(&(sdata->sl)); + /* Wait until all childern TCP servers got all their clients */ + sl_wait_child(&(sdata->sl)); + } + + /* Tell parent all client are accepted/connected */ + sl_release_parent(&(sdata->sl)); +} + +void keep_on_receiving_client_request(struct server_status_data *sdata) +{ + char buf[TCP_BUFF_SIZE]; + void *cpy; + struct request_ctl *req; + uint32_t *rqb; + struct packet_ctl *ans; + long rq_index; + long total_sz; + + long read_out; + struct pollfd *ds; + struct cdata *cd; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + read_out = read(ds->fd, buf, cd->await); + if (read_out <= 0) { + log(5, "Client disconnected (r): %s", + inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; + } + + log(7, "New data (%ld + %ld) on conn %d: %s", + read_out, tcpq_queue_dsize(cd->rx), sdata->cindex, + inet_ntoa(cd->peer.sin_addr)); + cpy = wrapper_malloc(read_out); + memcpy(cpy, buf, read_out); + tcpq_queue_tail(cd->rx, cpy, read_out); + cd->await -= read_out; + + /* Full header received */ + if (tcpq_queue_dsize(cd->rx) == sizeof(struct request_ctl)) { + cpy = tcpq_queue_flat_peek(cd->rx, &read_out); + req = (struct request_ctl *)cpy; + cd->await = req->req_count * sizeof(uint32_t); + log(6, "Client request for %u packets on %d: %s", + req->req_count, sdata->cindex, inet_ntoa(cd->peer.sin_addr)); + /* req->rqc may be zero? So do not return from here */ + } + + /* Whole request(struct request_ctl + all rq blocks) received */ + if (cd->await == 0) { + cpy = tcpq_dqueue_flat(cd->rx, &read_out); + req = (struct request_ctl *)cpy; + rqb = (uint32_t *) (cpy + sizeof(struct request_ctl)); + for (rq_index = 0; rq_index < req->req_count; rq_index++) { + ans = packet_get(*(rqb + rq_index)); + total_sz = ans->data_size + sizeof(struct packet_ctl); + log(6, "Send packet %u (%u bytes) on %d", + rqb[rq_index], ans->data_size, sdata->cindex); + cpy = wrapper_malloc(total_sz); + memcpy(cpy, ans, total_sz); + tcpq_queue_tail(cd->tx, cpy, total_sz); + } + + if (rq_index > 0) { + /* Data need to be sent out */ + ds->events |= POLLOUT; + } + } +} + +void handle_error_event(struct server_status_data *sdata) +{ + struct cdata *cd; + + cd = &(sdata->cd[sdata->cindex]); + log(5, "Closing connection %d: %s", + sdata->cindex, inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); +} + +void handle_client_ready(struct server_status_data *sdata) +{ + if (sdata->state == S_SYNC) { + sdata->sync_count++; + + log(7, "Client SYNC %d of %d", + sdata->sync_count, sdata->ccount - 1); + + if (sdata->sync_count == sdata->ccount - 1) { + /* All client SYNC done */ + sdata->state = S_SEND; + + /* Wait child ready */ + sl_wait_child(&(sdata->sl)); + /* Tell parent I am ready, release parent */ + sl_release_parent(&(sdata->sl)); + + /* Wait parent to send mcast */ + sl_wait_parent(&(sdata->sl)); + /* tell child mcast done, release child */ + sl_release_child(&(sdata->sl)); + + send_sent_to_all_clients(sdata); + } + } +} + +void handle_client_done(struct server_status_data *sdata) +{ + if (sdata->state == S_SEND) { + log(7, "Client START %d of %d", + sdata->sync_count, sdata->ccount - 1); + + sdata->sync_count--; + if (sdata->sync_count == 0) { + /* Got all client START */ + sdata->state = S_PREP; + log(7, "All received, now tell UDP Server prepare next buffer"); + /* Remeber old buffctl.pkt_count before it change */ + sdata->last_buff_pkt_count = buffctl.pkt_count; + + /* Wait child clients done */ + sl_wait_child(&(sdata->sl)); + /* Tell parent all our clients are done, release parent */ + sl_release_parent(&(sdata->sl)); + } + } +} + +void handle_client_events(struct server_status_data *sdata) +{ + struct pollfd *ds; + struct cdata *cd; + int read_out; + uint8_t msgtype; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + /* read message type */ + read_out = read(ds->fd, &msgtype, 1); + if (read_out <= 0) { + log(5, "Client disconnected due to read error %d, ret:%d (r): %s", + errno, read_out, inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; /* continue to check other clients */ + } + + switch (msgtype) { + case CLIENT_READY: + handle_client_ready(sdata); + break; + case CLIENT_DONE: + handle_client_done(sdata); + break; + case CLIENT_REQ: + /* wait a whole struct request_ctl */ + cd->await = sizeof(struct request_ctl); + break; + default: + log(4, "Wrong message type from %s", + inet_ntoa(cd->peer.sin_addr)); + break; + } +} + +void handle_pullin_event(struct server_status_data *sdata) +{ + struct cdata *cd; + + cd = &(sdata->cd[sdata->cindex]); + + /* Await is set only for repeat request */ + if (cd->await) { + keep_on_receiving_client_request(sdata); + } else { + handle_client_events(sdata); + } +} + +void handle_pullout_event(struct server_status_data *sdata) +{ + char buf[TCP_BUFF_SIZE]; + struct pollfd *ds; + struct cdata *cd; + long transmit_sz; + void *cpy; + long written_in; + + ds = &(sdata->ds[sdata->cindex]); + cd = &(sdata->cd[sdata->cindex]); + + if (cd->tx->count == 0) { + ds->events = POLLIN; /* Just make sure */ + return; + } + + log(7, "handle_pullout_event servs No.%d conn", sdata->cindex); + + /* If there is some data to be sent, try to do it now */ + cpy = tcpq_dequeue_head(cd->tx, &transmit_sz); + memcpy(buf, cpy, transmit_sz); + free(cpy); + + written_in = write(ds->fd, buf, transmit_sz); + if (written_in <= 0) { + log(5, "Client disconnected (w): %s", + inet_ntoa(cd->peer.sin_addr)); + kill_client(sdata); + return; + } + + if (written_in != transmit_sz) { + log(6, "Partial wrote: %ld out of %ld bytes sent", + written_in, transmit_sz); + cpy = wrapper_malloc(transmit_sz - written_in); + memcpy(cpy, buf + written_in, transmit_sz - written_in); + tcpq_queue_head(cd->tx, cpy, transmit_sz - written_in); + } else { + log(7, "Sent %ld bytes to %s", + written_in, inet_ntoa(cd->peer.sin_addr)); + if (cd->tx->count == 0) { + /* Do not listen pollout event anymore */ + ds->events = POLLIN; + } + } +} + +void check_clients(struct server_status_data *sdata) +{ + int poll_events; + struct pollfd *ds; + int timeout = -1; + + if (sdata->ccount == 1) { + log(5, "No more clients, start exiting. s,c,p:%d,%d,%d", + sdata->state, sdata->ccount, buffctl.pkt_count); + /* No client existed */ + switch(sdata->state) { + case S_SYNC: + /* Wait all children ready */ + sl_wait_child(&(sdata->sl)); + /* Tell parent I am ready, release parent */ + sl_release_parent(&(sdata->sl)); + + /* Wait parent to send mcast */ + sl_wait_parent(&(sdata->sl)); + /* Tell child mcast done, release child */ + sl_release_child(&(sdata->sl)); + case S_SEND: + sdata->state = S_PREP; + /* Remeber old buffctl.pkt_count before it change */ + sdata->last_buff_pkt_count = buffctl.pkt_count; + + /* Wait child clients done */ + sl_wait_child(&(sdata->sl)); + /* Tell parent all our clients are done */ + sl_release_parent(&(sdata->sl)); + } + /* return to main loop to finish the dummy run */ + return; + } + + /* poll clients only */ + poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout); + if (poll_events < 0) { + crit("poll() failed"); + } else if (poll_events == 0) { + log(2, "poll() returned with no results!"); + return; + } + + log(9, "poll clients: %d events", poll_events); + + /* check all connected clients */ + for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) { + ds = &(sdata->ds[sdata->cindex]); + if (!ds->revents) { + continue; + } + + if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) { + handle_error_event(sdata); + } else if (ds->revents & (POLLIN|POLLPRI)) { + handle_pullin_event(sdata); + } else if (ds->revents & POLLOUT) { + handle_pullout_event(sdata); + } + } +} + +void * tcp_server_main(void *args) +{ + int i; + struct server_status_data ssdata; + struct server_status_data *sdata; + + sdata = &ssdata; + init_sdata(sdata, args); + + accept_clients_may_spawn(sdata); + + while (1) { + if (sdata->state == S_PREP) { + /* Wait UDP server to prepare the buffctl of this round to send */ + sl_wait_parent(&(sdata->sl)); + log(6, "After waiting UDP server preparation. s,c,p:%d,%d,%d", + sdata->state, sdata->ccount - 1, buffctl.pkt_count); + /* Tell children buffer pareparation done, release child */ + sl_release_child(&(sdata->sl)); + send_buffctl_to_all_clients(sdata); + sdata->state = S_SYNC; + } + + check_clients(sdata); + + if (sdata->state == S_PREP && sdata->last_buff_pkt_count == 0) { + log(5, "No more output, TCP server finished"); + if (sdata->sl.this && pthread_join(sdata->pchild, 0) < 0) { + crit("pthread_join()"); + } + + /* shutdown conn to clients */ + for (i = sdata->ccount - 1; i > 0; i--) { + shutdown((sdata->ds[i]).fd, 2); + } + + if (!sdata->sl.this) { + /* leaf server */ + close(sfd); + sfd = -1; + } + return 0; + } + } +} diff --git a/code/jasmine/server-udp.c b/code/jasmine/server-udp.c new file mode 100755 index 00000000..c641289b --- /dev/null +++ b/code/jasmine/server-udp.c @@ -0,0 +1,95 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <pthread.h> +#include <signal.h> + +#include "buffer.h" +#include "udp-common.h" +#include "misc.h" +#include "server.h" + +void * udp_server_main(void *args) +{ + pthread_t pchild; + struct sockaddr_in maddr; + int ms; + int i; + struct semlink sl; + char adr[128]; + long long total = 0; + + sprintf(adr, "%s%s", MCAST_ADDR_BASE, MCAST_ADDR_SUFFIX); + maddr = make_addr(adr, server_port); + mcinfo.group = maddr; + ms = init_mcast_socket(&local_addr, &maddr); + + sl.this = spawn_thread(&pchild, tcp_server_main, 1); + sl.parent = NULL; + + buffer_init(); + + /* Let TCP servers accept connections */ + sl_release_child(&sl); + /* Wait until all TCP servers got all their clients */ + sl_wait_child(&sl); + + log(7, "UDP Server: All clients were accepted"); + + do { + /* one buffer round */ + + i = buffer_fill(STDIN_FILENO); + + if (!client_count) { + /* No need to send, make it the last run of main loop, also let + TCP Server to do dummy run right away(not waiting until all + data sent). */ + i = 0; + buffctl.pkt_count = 0; + } + + log(7, "Signal TCP Server to send buffctl"); + + /* Tell tcp to Send headers, release child */ + sl_release_child(&sl); + /* Wait all clients ready to start receiving */ + sl_wait_child(&sl); + + /* multicast data */ + while (i--) { + sendto(ms, packetctl[i], + packetctl[i]->data_size + sizeof(struct packet_ctl), 0, + (struct sockaddr *)&maddr, sizeof(maddr)); + } + + log(7, "send finish"); + /* Tell tcp to Send SERVER_SENT, release child */ + sl_release_child(&sl); + /* wait tcp to send SERVER_SENT */ + sl_wait_child(&sl); + + total = total + buffctl.buffer_size; + log(1, "Buffer Done: sent %lld bytes", total); + } while (buffctl.pkt_count); + + + if (pthread_join(pchild, 0) < 0) { + crit("pthread_join() TCP Server"); + } + + log(1, "All Done"); + close(ms); + return 0; +} diff --git a/code/jasmine/server.c b/code/jasmine/server.c new file mode 100755 index 00000000..4a9a3279 --- /dev/null +++ b/code/jasmine/server.c @@ -0,0 +1,123 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <pthread.h> +#include <string.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "buffer.h" +#include "tcp-common.h" +#include "misc.h" +#include "server.h" + +/* Global variables shared among server*.c */ +int wait_count = 0; +int client_count = 0; +int server_port = DEF_PORT; /* Currently for both UDP and TCP */ +struct in_addr local_addr; +/* listen socket - global for all TCP threads */ +int sfd = -1; + +void init_semaphores(struct semaphores *sp) +{ + if (sem_init(&sp->wait_parent, 0, 0) < 0) { + crit("sem_init() wait_parent"); + } + + if (sem_init(&sp->wait_child, 0, 0) < 0) { + crit("sem_init() wait_child"); + } +} + +void sl_wait_child(struct semlink *sl) +{ + if (sl->this) { + P(sl->this->wait_child); + } +} + +void sl_release_child(struct semlink *sl) +{ + if (sl->this) { + V(sl->this->wait_parent); + } +} + +void sl_wait_parent(struct semlink *sl) +{ + P(sl->parent->wait_parent); +} + +void sl_release_parent(struct semlink *sl) +{ + V(sl->parent->wait_child); +} + +/* Wrapper for pthread_create, additionaly may initialize/pass thru + a semaphores parameter */ +struct semaphores * spawn_thread(pthread_t *pchild, + void *(*entry)(void *), + int create_sems) +{ + struct semaphores *sp = NULL; + + if (create_sems != 0) { + sp = (struct semaphores *) wrapper_malloc(sizeof(struct semaphores)); + init_semaphores(sp); + } + + if (pthread_create(pchild, 0, entry, (void *)sp) != 0) { + crit("pthread_create"); + } + return sp; +} + +int main(int argc, char *argv[]) +{ + pthread_t pchild; + + log(9, "buffer size:%ld, buffer head size:%ld, data size:%ld", + PACKET_SIZE, sizeof(struct packet_ctl), PACKET_PAYLOAD_SIZE); + + if (argc < 3) { + printf("Usage: %s local_ip num_of_clients [port]\n", argv[0]); + return -1; + } + + if (!inet_aton(argv[1], &local_addr)) { + crit("can not resolve address: %s", argv[1]); + } + + wait_count = atoi(argv[2]); + if (!wait_count) { + crit("can not serv to 0 client\n"); + } + + if (argc > 3) { + server_port = atoi(argv[3]); + if (!server_port) { + server_port = DEF_PORT; + } + } + + /* sfd is shared by all TCP threads as sdata->ds[0].fd */ + sfd = init_tcp_server_socket(0, server_port); + set_nonblock(sfd); + + /* start UDP servers thread */ + (void)spawn_thread(&pchild, udp_server_main, 0); + if (pthread_join(pchild, 0) < 0) { + crit("pthread_join() UDP Server"); + } + + return 0; +} diff --git a/code/jasmine/server.h b/code/jasmine/server.h new file mode 100755 index 00000000..23dbed54 --- /dev/null +++ b/code/jasmine/server.h @@ -0,0 +1,63 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_SERVER_H +#define _MCAST_SERVER_H + +#include <pthread.h> +#include <semaphore.h> + +struct semaphores { + sem_t wait_parent; + sem_t wait_child; +}; + +struct semlink { + struct semaphores *this; /* used by parent to point to the struct semaphores + which it created during spawn child. */ + struct semaphores *parent; /* used by child to point to the struct + semaphores which it created by parent */ +}; + +void sl_wait_child(struct semlink *sl); +void sl_release_child(struct semlink *sl); +void sl_wait_parent(struct semlink *sl); +void sl_release_parent(struct semlink *sl); + +/* Server state machine */ +#define S_PREP 0 +#define S_SYNC 1 +#define S_SEND 2 + +extern int wait_count; +extern int client_count; +extern int server_port; +extern struct in_addr local_addr; +extern int sfd; + +void *udp_server_main(void *args); +void *tcp_server_main(void *args); +void init_semaphores(struct semaphores *sp); +struct semaphores * spawn_thread(pthread_t *pchild, + void *(*entry)(void *), + int create_sems); + +#define V(x) do { \ + if (sem_post(&x) < 0) { \ + crit("sem_post()"); \ + } \ +} while(0) + +#define P(x) do { \ + if (sem_wait(&x) != 0) { \ + crit("sem_wait()"); \ + } \ +} while(0) + +#endif diff --git a/code/jasmine/tcp-common.c b/code/jasmine/tcp-common.c new file mode 100755 index 00000000..69a57ba0 --- /dev/null +++ b/code/jasmine/tcp-common.c @@ -0,0 +1,60 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> + +#include "buffer.h" +#include "tcp-common.h" +#include "misc.h" + +int init_tcp_server_socket(char *addr, unsigned short port) +{ + int s; + struct sockaddr_in sin; + + port = port ? port : TCP_DPORT; + sin = make_addr(addr, port); + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + crit("socket() failed"); + } + + if (bind(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) { + crit("bind() failed, error binding tcp socket"); + } + + if (listen(s, 5) < 0) { + crit("listen() failed, error listening on socket"); + } + + log(4, "init_tcp_server_socket %s:%d", inet_ntoa(sin.sin_addr), port); + return s; +} + +int init_tcp_client_socket(char *addr, unsigned short port) +{ + int s; + struct sockaddr_in sin; + + port = port ? port : TCP_DPORT; + sin = make_addr(addr, port); + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + crit("socket() failed"); + } + + if (connect(s, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) { + crit("connect() failed"); + } + + log(4, "init_tcp_client_socket %s:%d", inet_ntoa(sin.sin_addr), port); + return s; +} diff --git a/code/jasmine/tcp-common.h b/code/jasmine/tcp-common.h new file mode 100755 index 00000000..6ac48e1c --- /dev/null +++ b/code/jasmine/tcp-common.h @@ -0,0 +1,18 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_TCP_COMMON_H +#define _MCAST_TCP_COMMON_H + +#define TCP_DPORT DEF_PORT + +int init_tcp_server_socket(char *addr, unsigned short port); +int init_tcp_client_socket(char *addr, unsigned short port); + +#endif diff --git a/code/jasmine/tcp-queue.c b/code/jasmine/tcp-queue.c new file mode 100755 index 00000000..63a57f2e --- /dev/null +++ b/code/jasmine/tcp-queue.c @@ -0,0 +1,178 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +/* TCP poll message single linked queue */ +#include <stdlib.h> +#include <string.h> + +#include "misc.h" +#include "tcp-queue.h" + +void tcpq_queue_tail(struct tcpq *q, void *data, long size) +{ + struct qmsg *tmp; + + if (!q) { + return; + } + + tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg)); + tmp->next = NULL; + + if (!q->head) { + q->head = q->tail = tmp; + } else { + q->tail->next = tmp; + q->tail = tmp; + } + + q->tail->data = data; + q->tail->size = size; + q->size += size; + q->count++; +} + +void tcpq_queue_head(struct tcpq *q, void *data, long size) +{ + struct qmsg *tmp; + + if (!q) { + return; + } + + tmp = (struct qmsg *)wrapper_malloc(sizeof(struct qmsg)); + tmp->next = NULL; + + if (!q->head) { + q->head = q->tail = tmp; + } else { + tmp->next = q->head; + q->head = tmp; + } + + q->head->data = data; + q->head->size = size; + q->size += size; + q->count++; +} + +void * tcpq_dequeue_head(struct tcpq *q, long *size) +{ + void *res = NULL; + struct qmsg *tmp; + + if (q && q->head) { + res = q->head->data; + *size = q->head->size; + + tmp = q->head; + q->head = q->head->next; + if (!q->head) { + q->tail = NULL; + } + + free(tmp); + q->count--; + q->size -= *size; + } + return res; +} + +void * tcpq_queue_peek(struct tcpq *q, long *size) +{ + if (q && q->head) { + *size = q->head->size; + return q->head->data; + } + + return NULL; +} + +long tcpq_queue_dsize(struct tcpq *q) +{ + if (q) { + return q->size; + } + + return 0; +} + +void tcpq_queue_free(struct tcpq *q) +{ + struct qmsg *tmp; + + if (!q) { + return; + } + + while (q->head) { + tmp = q->head; + q->head = tmp->next; + free(tmp->data); + free(tmp); + } + + q->count = 0; + q->size = 0; + q->head = q->tail = NULL; +} + +struct tcpq * tcpq_queue_init(void) +{ + struct tcpq *q = wrapper_malloc(sizeof(struct tcpq)); + + q->count = 0; + q->size = 0; + q->head = q->tail = NULL; + return q; +} + +void * tcpq_dqueue_flat(struct tcpq *q, long *size) +{ + void *res; + struct qmsg *tmp; + long offs = 0; + + if (!q || q->count == 0) { + return NULL; + } + + res = wrapper_malloc(q->size); + *size = q->size; + + while (q->head) { + memcpy(res + offs, q->head->data, q->head->size); + offs += q->head->size; + + tmp = q->head; + q->head = tmp->next; + free(tmp->data); + free(tmp); + } + tcpq_queue_free(q); + return res; +} + +void * tcpq_queue_flat_peek(struct tcpq *q, long *size) +{ + void *cpy; + + if (!q) { + return NULL; + } + + if (q->count > 1) { + cpy = tcpq_dqueue_flat(q, size); + tcpq_queue_tail(q, cpy, *size); /* use tcpq_queue_head is also OK */ + } else { + cpy = tcpq_queue_peek(q, size); + } + + return cpy; +} diff --git a/code/jasmine/tcp-queue.h b/code/jasmine/tcp-queue.h new file mode 100755 index 00000000..4f096f85 --- /dev/null +++ b/code/jasmine/tcp-queue.h @@ -0,0 +1,35 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_TCP_QUEUE_H +#define _MCAST_TCP_QUEUE_H + +struct tcpq { + struct qmsg *head, *tail; + long count; /* message count in a queue */ + long size; /* Total data size of a queue */ +}; + +struct qmsg { + struct qmsg *next; + void *data; + long size; +}; + +struct tcpq * tcpq_queue_init(void); +void tcpq_queue_free(struct tcpq *q); +long tcpq_queue_dsize(struct tcpq *q); +void tcpq_queue_tail(struct tcpq *q, void *data, long size); +void tcpq_queue_head(struct tcpq *q, void *data, long size); +void * tcpq_dequeue_head(struct tcpq *q, long *size); +void * tcpq_queue_peek(struct tcpq *q, long *size); +void * tcpq_dqueue_flat(struct tcpq *q, long *size); +void * tcpq_queue_flat_peek(struct tcpq *q, long *size); + +#endif diff --git a/code/jasmine/udp-common.c b/code/jasmine/udp-common.c new file mode 100755 index 00000000..c7314226 --- /dev/null +++ b/code/jasmine/udp-common.c @@ -0,0 +1,97 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#include <fcntl.h> +#include <net/if.h> +#include <sys/ioctl.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> + +#include "buffer.h" +#include "udp-common.h" +#include "misc.h" + +struct mc_info mcinfo; + +int init_mcast_socket(struct in_addr *local_addr, + struct sockaddr_in *maddr) +{ + struct ip_mreqn mreqn; /* multicast request new */ + int ms; /* multicast socket */ + u_short msock_port; + struct sockaddr_in bind_addr; + + int msock_reuse = MCAST_REUSE; + u_char msock_loop = MCAST_LOOP; + u_char msock_ttl = MCAST_TTL; + + if ((ms = socket(PF_INET, SOCK_DGRAM, 0)) < 0) { + crit("socket() failed, error allocating multicast socket"); + } + + // In order to let client and server on same host to use same port + if (setsockopt(ms, SOL_SOCKET, SO_REUSEADDR, + &msock_reuse, sizeof(msock_reuse)) < 0) { + crit("setsockopt() failed, can't set reuse flag"); + } + + if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_TTL, + &msock_ttl,sizeof(msock_ttl)) < 0) { + crit("setsockopt() failed, can't set ttl value"); + } + + if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_LOOP, + &msock_loop, sizeof(msock_loop)) < 0) { + crit("setsockopt() failed, can't set multicast packet looping"); + } + + /* TODO: Do we neet non-block? */ + + log(4, "Using multicast address: %s:%d", + inet_ntoa(maddr->sin_addr), ntohs(maddr->sin_port)); + + mreqn.imr_multiaddr = maddr->sin_addr; + mreqn.imr_address = *local_addr; + mreqn.imr_ifindex = 0; + + /* Interface and remote mcast address choosing */ + + /* This tell interface which has mreqn.imr_address to join + mreqn.imr_multiaddr group, it has nothing to do with socket, port, and + mreqn.imr_address. */ + if (setsockopt(ms, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreqn, sizeof(mreqn)) < 0) { + crit("setsockopt() failed, %s can't join multicast group", + inet_ntoa(*local_addr)); + } + + /* local address choosing */ + + /* This tell kernel to use interface which has mreqn.imr_address as device + and mreqn.imr_address as source addr when sending any multicast through + socket. */ + if (setsockopt(ms, IPPROTO_IP, IP_MULTICAST_IF, + &mreqn, sizeof(mreqn)) < 0) { + crit("setsockopt() failed to IP_MULTICAST_IF.\n"); + } + + /* Local port choosing, remote port is choosed when calling sendto()! */ + + /* This let us uses specific udp port number as source port when sending + any multicast datagrams. So ip in maddr is useless here and should be + INADDR_ANY, otherwise, bind will return Invalid argument. */ + msock_port = ntohs(maddr->sin_port); + bind_addr = make_addr(0, msock_port); + if (bind(ms, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) { + crit("bind() error, can't bind mcast port"); + } + + return ms; +} diff --git a/code/jasmine/udp-common.h b/code/jasmine/udp-common.h new file mode 100755 index 00000000..861545b4 --- /dev/null +++ b/code/jasmine/udp-common.h @@ -0,0 +1,42 @@ +/*############################################################################## +# Copyright (c) 2017 ZTE Coreporation and others. +# hu.zhijiang@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +##############################################################################*/ + +#ifndef _MCAST_UDP_COMMON_H +#define _MCAST_UDP_COMMON_H + +#include <unistd.h> +#include <netinet/ip.h> +#include <netinet/udp.h> + +#define MCAST_DPORT DEF_PORT + +#define MCAST_TTL 8 +/* + * Force reuse + */ +#define MCAST_REUSE 1 + +/* + * Loop to let server host itself can receive data too. + */ +#define MCAST_LOOP 1 +#define MCAST_ADDR_BASE "224.238.0." +#define MCAST_ADDR_SUFFIX "31" + +// Initial data +struct mc_info { + struct sockaddr_in group; +}; + +extern struct mc_info mcinfo; + +// How to capture udp packets: tcpdump -i ethx udp -vv -n + +int init_mcast_socket(struct in_addr *local_addr, struct sockaddr_in *maddr); +#endif diff --git a/deploy/check_openstack_progress.sh b/deploy/check_openstack_progress.sh index 3512d6bc..f0ab1e90 100755 --- a/deploy/check_openstack_progress.sh +++ b/deploy/check_openstack_progress.sh @@ -44,8 +44,9 @@ while true; do fi count=$[count + 1] - openstack_install_active=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $12}' | grep -c "active" ` - openstack_install_failed=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $12}' | grep -c "install-failed" ` + # get 'Role_status' column + openstack_install_active=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $13}' | grep -c "active" ` + openstack_install_failed=`daisy host-list --cluster-id $cluster_id | awk -F "|" '{print $13}' | grep -c "install-failed" ` if [ $openstack_install_active -eq $hosts_num ]; then echo "openstack installing successful ..." break @@ -54,7 +55,8 @@ while true; do tail -n 200 /var/log/daisy/kolla_$cluster_id* exit 1 else - progress=`daisy host-list --cluster-id $cluster_id |grep DISCOVERY_SUCCESSFUL |awk -F "|" '{print $11}'|sed s/[[:space:]]//g|sed ':a;N;$ s/\n/ /g;ba'` + # get 'Role_progress' column + progress=`daisy host-list --cluster-id $cluster_id |grep DISCOVERY_SUCCESSFUL |awk -F "|" '{print $12}'|sed s/[[:space:]]//g|sed ':a;N;$ s/\n/ /g;ba'` echo " openstack in installing , progress of each node is $progress%" sleep 30 fi diff --git a/deploy/config/bm_environment/zte-baremetal1/deploy.yml b/deploy/config/bm_environment/zte-baremetal1/deploy.yml index eafff5fd..58516e88 100644 --- a/deploy/config/bm_environment/zte-baremetal1/deploy.yml +++ b/deploy/config/bm_environment/zte-baremetal1/deploy.yml @@ -21,3 +21,4 @@ disks: daisy_passwd: 'r00tme'
daisy_ip: '10.20.7.3'
daisy_gateway: '10.20.7.1'
+ceph_disk_name: '/dev/sdb'
diff --git a/deploy/config/vm_environment/zte-virtual1/deploy.yml b/deploy/config/vm_environment/zte-virtual1/deploy.yml index 1bf254f5..12fa1690 100644 --- a/deploy/config/vm_environment/zte-virtual1/deploy.yml +++ b/deploy/config/vm_environment/zte-virtual1/deploy.yml @@ -10,4 +10,4 @@ disks: daisy_passwd: 'r00tme'
daisy_ip: '10.20.11.2'
daisy_gateway: '10.20.11.1'
-deploy_env: 'virtual'
\ No newline at end of file +ceph_disk_name: '/dev/sdb'
diff --git a/deploy/get_conf.py b/deploy/get_conf.py index a2d7bf6a..37da2beb 100755 --- a/deploy/get_conf.py +++ b/deploy/get_conf.py @@ -91,7 +91,8 @@ def dha_config_parse(s, dha_file): def config(dha_file, network_file): data = init(dha_file) + ceph_disk_name = data.get('ceph_disk_name') hosts_name = dha_config_parse(data, dha_file) data = init(network_file) network_map, vip, interface_map = network_config_parse(data, network_file) - return interface_map, hosts_name, network_map, vip + return interface_map, hosts_name, network_map, vip, ceph_disk_name diff --git a/deploy/tempest.py b/deploy/tempest.py index 263e62e0..bd1bc04f 100644 --- a/deploy/tempest.py +++ b/deploy/tempest.py @@ -66,7 +66,7 @@ def prepare_install(): print("get config...") conf = cfg.ConfigOpts() parse(conf, sys.argv[1:]) - host_interface_map, hosts_name, network_map, vip = \ + host_interface_map, hosts_name, network_map, vip, ceph_disk_name = \ get_conf.config(conf['dha'], conf['network']) if conf['cluster'] and conf['cluster'] == 'yes': print("add cluster...") @@ -88,6 +88,17 @@ def prepare_install(): cluster_id = cluster_info.id add_hosts_interface(cluster_id, hosts_info, hosts_name, host_interface_map, vip) + if len(hosts_name) == 1: + protocol_type = 'LVM' + service_name = 'cinder' + elif len(hosts_name) > 2: + protocol_type = 'RAW' + service_name = 'ceph' + else: + print('hosts_num is %s' % len(hosts_name)) + protocol_type = None + enable_cinder_backend(cluster_id, service_name, + ceph_disk_name, protocol_type) if 'isbare' in conf and conf['isbare'] == 0: install_os_for_vm_step1(cluster_id) else: @@ -117,9 +128,7 @@ def install_os_for_vm_step1(cluster_id): def install_os_for_bm_oneshot(cluster_id): - cluster_meta = {'cluster_id': cluster_id, - 'pxe_only': "false", - 'skip_pxe_ipmi': "false"} + cluster_meta = {'cluster_id': cluster_id} client.install.install(**cluster_meta) @@ -217,5 +226,22 @@ def add_host_role(cluster_id, host_id, host_exp_name, host_real_name, vip): client.roles.update(computer_role_id, **role_computer_update_meta) +def enable_cinder_backend(cluster_id, service_name, disk_name, protocol_type): + role_meta = {'filters': {'cluster_id': cluster_id}} + role_list_generator = client.roles.list(**role_meta) + role_list = [role for role in role_list_generator] + lb_role_id = [role.id for role in role_list if + role.name == "CONTROLLER_LB"][0] + service_disk_meta = {'service': service_name, + 'disk_location': 'local', + 'partition': disk_name, + 'protocol_type': protocol_type, + 'role_id': lb_role_id} + try: + client.disk_array.service_disk_add(**service_disk_meta) + except Exception as e: + print e + + if __name__ == "__main__": prepare_install() diff --git a/docker/Dockerfile b/docker/Dockerfile index 6026db15..06002bff 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -15,6 +15,10 @@ RUN yum -y install \ make \ rpm \ rpm-build \ + gcc \ + autoconf \ + automake \ + glibc-devel \ python-sphinx \ python-XStatic-Angular \ python-XStatic-Angular-Bootstrap \ diff --git a/docs/developer/design/multicast.rst b/docs/developer/design/multicast.rst new file mode 100644 index 00000000..89422fe6 --- /dev/null +++ b/docs/developer/design/multicast.rst @@ -0,0 +1,278 @@ +Detailed Design +=============== + +Protocol Design +--------------- + +1. All Protocol headers are 1 byte long or align to 4 bytes. +2. Packet size should not exceed above 1500(MTU) bytes including UDP/IP header and should +be align to 4 bytes. In future, MTU can be modified larger than 1500(Jumbo Frame) through +cmd line option to enlarge the data throughput. + +/* Packet header definition (align to 4 bytes) */ +struct packet_ctl { + uint32_t seq; // packet seq number start from 0, unique in server life cycle. + uint32_t crc; // checksum + uint32_t data_size; // payload length + uint8_t data[0]; +}; + +/* Buffer info definition (align to 4 bytes) */ +struct buffer_ctl { + uint32_t buffer_id; // buffer seq number start from 0, unique in server life cycle. + uint32_t buffer_size; // payload total length of a buffer + uint32_t packet_id_base; // seq number of the first packet in this buffer. + uint32_t pkt_count; // number of packet in this buffer, 0 means EOF. +}; + + +3. 1-byte-long header definition + +Signals such as the four below are 1 byte long, to simplify the receive process(since it +cannot be spitted ). + +#define CLIENT_READY 0x1 +#define CLIENT_REQ 0x2 +#define CLIENT_DONE 0x4 +#define SERVER_SENT 0x8 + +Note: Please see the collaboration diagram for their meanings. + +4. Retransmission Request Header + +/* Retransmition Request Header (align to 4 bytes) */ +struct request_ctl { + uint32_t req_count; // How many seqs below. + uint32_t seqs[0]; // packet seqs. +}; + +5. Buffer operations + +void buffer_init(); // Init the buffer_ctl structure and all(say 1024) packet_ctl +structures. Allocate buffer memory. +long buffer_fill(int fd); // fill a buffer from fd, such as stdin +long buffer_flush(int fd); // flush a buffer to fd, say stdout +struct packet_ctl *packet_put(struct packet_ctl *new_pkt);// put a packet to a buffer +and return a free memory slot for the next packet. +struct packet_ctl *packet_get(uint32_t seq);// get a packet data in buffer by +indicating the packet seq. + + +How to sync between server threads +---------------------------------- + +If children's aaa() operation need to wait the parents's init() to be done, then do it +literally like this: + + UDP Server + TCP Server1 = spawn( )----> TCP Server1 + init() + TCP Server2 = spawn( )-----> TCP Server2 + V(sem)----------------------> P(sem) // No child any more + V(sem)---------------------> P(sem) + aaa() // No need to V(sem), for no child + aaa() + +If parent's send() operation need to wait the children's ready() done, then do it +literally too, but is a reverse way: + + UDP Server TCP Server1 TCP Server2 + // No child any more + ready() ready() + P(sem) <--------------------- V(sem) + P(sem) <------------------ V(sem) + send() + +Note that the aaa() and ready() operations above run in parallel. If this is not the +case due to race condition, the sequence above can be modified into this below: + + UDP Server TCP Server1 TCP Server2 + // No child any more + ready() + P(sem) <--------------------- V(sem) + ready() + P(sem) <------------------- V(sem) + send() + + +In order to implement such chained/zipper sync pattern, a pair of semaphores is +needed between the parent and the child. One is used by child to wait parent , the +other is used by parent to wait child. semaphore pair can be allocated by parent +and pass the pointer to the child over spawn() operation such as pthread_create(). + +/* semaphore pair definition */ +struct semaphores { + sem_t wait_parent; + sem_t wait_child; +}; + +Then the semaphore pair can be recorded by threads by using the semlink struct below: +struct semlink { + struct semaphores *this; /* used by parent to point to the struct semaphores + which it created during spawn child. */ + struct semaphores *parent; /* used by child to point to the struct + semaphores which it created by parent */ +}; + +chained/zipper sync API: + +void sl_wait_child(struct semlink *sl); +void sl_release_child(struct semlink *sl); +void sl_wait_parent(struct semlink *sl); +void sl_release_parent(struct semlink *sl); + +API usage is like this. + +Thread1(root parent) Thread2(child) Thread3(grandchild) +sl_wait_parent(noop op) +sl_release_child + +---------->sl_wait_parent + sl_release_child + +-----------> sl_wait_parent + sl_release_child(noop op) + ... + sl_wait_child(noop op) + + sl_release_parent + sl_wait_child <------------- + + sl_release_parent +sl_wait_child <------------ +sl_release_parent(noop op) + +API implementation: + +void sl_wait_child(struct semlink *sl) +{ + if (sl->this) { + P(sl->this->wait_child); + } +} + +void sl_release_child(struct semlink *sl) +{ + if (sl->this) { + V(sl->this->wait_parent); + } +} + +void sl_wait_parent(struct semlink *sl) +{ + if (sl->parent) { + P(sl->parent->wait_parent); + } +} + +void sl_release_parent(struct semlink *sl) +{ + if (sl->parent) { + V(sl->parent->wait_child); + } +} + +Client flow chart +----------------- +See Collaboration Diagram + +UDP thread flow chart +--------------------- +See Collaboration Diagram + +TCP thread flow chart +--------------------- + + +S_INIT --- (UDP initialized) ---> S_ACCEPT --- (accept clients) --+ + | + /----------------------------------------------------------------/ + V +S_PREP --- (UDP prepared abuffer) + ^ | + | \--> S_SYNC --- (clients ClIENT_READY) + | | + | \--> S_SEND --- (clients CLIENT_DONE) + | | + | V + \---------------(bufferctl.pkt_count != 0)-----------------------+ + | + V + exit() <--- (bufferctl.pkt_count == 0) + + +TCP using poll and message queue +-------------------------------- + +TCP uses poll() to sync with client's events as well as output event from itself, so +that we can use non-block socket operations to reduce the latency. POLLIN means there +are message from client and POLLOUT means we are ready to send message/retransmission +packets to client. + +poll main loop pseudo code: +void check_clients(struct server_status_data *sdata) +{ + poll_events = poll(&(sdata->ds[1]), sdata->ccount - 1, timeout); + + /* check all connected clients */ + for (sdata->cindex = 1; sdata->cindex < sdata->ccount; sdata->cindex++) { + ds = &(sdata->ds[sdata->cindex]); + if (!ds->revents) { + continue; + } + + if (ds->revents & (POLLERR|POLLHUP|POLLNVAL)) { + handle_error_event(sdata); + } else if (ds->revents & (POLLIN|POLLPRI)) { + handle_pullin_event(sdata); // may set POLLOUT into ds->events + // to trigger handle_pullout_event(). + } else if (ds->revents & POLLOUT) { + handle_pullout_event(sdata); + } + } +} + +For TCP, since the message from client may not complete and send data may be also +interrupted due to non-block fashion, there should be one send message queue and a +receive message queue on the server side for each client (client do not use non-block +operations). + +TCP message queue definition: + +struct tcpq { + struct qmsg *head, *tail; + long count; /* message count in a queue */ + long size; /* Total data size of a queue */ +}; + +TCP message queue item definition: + +struct qmsg { + struct qmsg *next; + void *data; + long size; +}; + +TCP message queue API: + +// Allocate and init a queue. +struct tcpq * tcpq_queue_init(void); + +// Free a queue. +void tcpq_queue_free(struct tcpq *q); + +// Return queue length. +long tcpq_queue_dsize(struct tcpq *q); + +// queue new message to tail. +void tcpq_queue_tail(struct tcpq *q, void *data, long size); + +// queue message that cannot be sent currently back to queue head. +void tcpq_queue_head(struct tcpq *q, void *data, long size); + +// get one piece from queue head. +void * tcpq_dequeue_head(struct tcpq *q, long *size); + +// Serialize all pieces of a queue, and move it out of queue, to ease the further +//operation on it. +void * tcpq_dqueue_flat(struct tcpq *q, long *size); + +// Serialize all pieces of a queue, do not move it out of queue, to ease the further +//operation on it. +void * tcpq_queue_flat_peek(struct tcpq *q, long *size); diff --git a/docs/developer/spec/multicast.rst b/docs/developer/spec/multicast.rst new file mode 100644 index 00000000..ba314d3a --- /dev/null +++ b/docs/developer/spec/multicast.rst @@ -0,0 +1,190 @@ +Requirement +=========== +1. When deploying a large OPNFV/OpenStack cluster, we would like to take the advantage of UDP +multicast to prevent the network bottleneck when distributing Kolla container from one +Installer Server to all target hosts by using unicast. + +2. When it comes to auto scaling (extension) of compute nodes, use unicast is acceptable, since +the number of nodes in this condition is usually small. + +The basic step to introduce multicast to deployment is: +a. Still setup the monopolistic docker registry server on Daisy server as a failsafe. +b. Daisy server, as the multicast server, prepares the image file to be transmitted, and count +how many target hosts(as the multicast clients)that should receive the image file +simultaneously. +c. Multicast clients tell the multicast server about ready to receive the image. +d. Multicast server transmits image over UDP multicast channel. +e. Multicast clients report success after received the whole image. +f. Setup docker registry server on each target hosts based upon received docker image. +g. Setup Kolla ansible to use 127.0.0.1 as the registry server IP so that the real docker +container retrieving network activities only take place inside target hosts. + + +Design +====== + +Methods to achieve +------------------ + +TIPC +++++ + +TIPC or its wrapper such as ZeroMQ is good at multicast, but it is not suitable as an +installer: +1. The default TIPC kernel module equipped by CentOS7(kernel verison 3.10) is NOT stable +especially in L3 multicast(although we can use L2 multicast, but the network will be limited to +L2). If errors happen, it is hard for us to recover a node from kernel panic. + +2. TIPC's design is based on a stable node cluster environment, esp in Lossless Ethernet. But +the real environment is generally not in that case. When multicast is broken, Installer should +switch to unicast, but TIPC currently do not have such capability. + +Top level design +---------------- +1. There are two kinds of thread on the server side, one is UDP multicast thread the other is +TCP sync/retransmit thread. There will be more than one TCP threads since one TCP thread can +only serve a limited client (say 64~128) in order to limit the CPU load and unicast retransmit +network usage. + +2. There is only one thread on client side. + +3. All the packets that a client lost during UDP multicast will be request by client to the TCP +thread and resend by using TCP unicast, if unicast still cannot deliver the packets successfully, +the client will failback to using the monopolistic docker registry server on Daisy server as a +failsafe option. + +4. Each packet needs checksum. + + +UDP Server Design (runs on Daisy Server) +---------------------------------------- + +1. Multicast group IP and Port should be configurable, as well as the interface that will be +used as the egress of the multicast packets. The user will pass the interface's IP as the +handle to find the egress. + +2. Image data to be sent is passed to server through stdin. + +3. Consider the size of image is large (xGB), the server cannot pre-allocate whole buffer to +hold all image at once. Besides, since the data is from stdin and the actual length is +unpredictable. So the server should split the data into small size buffers and send to the +clients one by one. Furthermore, buffer shall be divided into packets which size is MTU +including the UDP/IP header. Then the buffer size can be , for example 1024 * MTU including the +UDP/IP header. + +4. After sending one buffer to client the server should stop and get feedback from client to +see if all clients have got all packets in that buffer. If any clients lost any buffer, client +should request the server to resend packets from a more stable way(TCP). + +5. when got the EOF from stdin, server should send a buffer which size is 0 as an EOF signal to +the client to let it know about the end of sending. + + +TCP Server Design (runs on Daisy Server) +---------------------------------------- + +1. All TCP server threads and the only one UDP thread share one process. The UDP thread is the +parent thread, and the first TCP thread is the child, while the second TCP thread is the +grandchild, and so on. Thus, for each TCP thread, there is only one parent and at most one +child. + +2. TCP thread accepts the connect request from client. The number of client is predefined by +server cmdline parameter. Each TCP thread connect with at most ,say 64 clients, if there are +more clients to be connected to, then a child TCP thread is spawned by the parent. + +3. Before UDP thread sending any buffer to client, all TCP threads should send UDP multicast +IP/Port information to their clients beforehand. + +4. During each buffer sending cycle, TCP threads send a special protocol message to tell +clients about the size/id of the buffer and id of each packet in it. After getting +acknowledgements from all clients, TCP threads then signal the UDP thread to start +multicasting buffer over UDP. After multicasting finished, TCP threads notifies clients +multicast is done, and wait acknowledgements from clients again. If clients requests +retransmission, then it is the responsibility of TCP threads to resend packets over unicast. +If no retransmission needed, then clients should signal TCP threads that they are ready for +the next buffer to come. + +5. Repeat step 4 if buffer size is not 0 in the last round, otherwise, TCP server shutdown +connection and exit. + + +Server cmdline usage example +---------------------------- + +./server <local_ip> <number_of_clients> [port] < kolla_image.tgz + +<local_ip> is used here to specify the multicast egress interface. But which interface will be +used by TCP is leaved to route table to decide. +<number_of_clients> indicates the number of clients , thus the number of target hosts which +need to receive the image. +[port] is the port that will be used by both UDP and TCP. Default value can be used if user +does not provide it. + + +Client Design(Target Host side) +-------------------------------- + +1. Each target hosts has only one client process. + +2. Client connect to TCP server according to the cmdline parameters right after start up. + +3. After connecting to TCP server, client first read from TCP server the multicast group +information which can be used to create the multicast receive socket then. + +4. During each buffer receiving cycle, the client first read from TCP server the buffer info, +prepare the receive buffer, and acknowledge the TCP server that it is ready to receive. Then, +client receive buffer from the multicast socket until TCP server notifying the end of +multicast. By compare the buffer info and the received packets, the client knows whether to +send the retransmission request or not and whether to wait retransmission packet or not. +After all packets are received from UDP/TCP, the client eventually flush buffer to stdout +and tells the TCP server about ready to receive the next buffer. + +5. Repeat step 4 if buffer size is not 0 in the last round, otherwise, client shutdowns +connection and exit. + +Client cmdline usage example +---------------------------- + +./client <local_ip> <server_ip> [port] > kolla_image.tgz + +<local_ip> is used here to specify the multicast ingress interface. But which interface +will be used by TCP is leaved to route table to decide. +<server_ip> indicates the TCP server IP to be connected to. +[port] is the port that will be used by both connect to TCP server and receive multicast +data. + + +Collaboration diagram among UDP Server, TCP Server(illustrate only one TCP thread) +and Clients: + + +UDP Server TCP Server Client + | | | +init mcast group +init mcast send socket + ----------------------------------> + accept clients + <------------------------connet------------------ + --------------------send mcast group info-------> + <---------------------------------- + state = PREP +do { +read data from stdin +prepare one buffer + -----------------------------------> + state = SYNC + -------------------send buffer info--------------> + <----------------------send ClIENT_READY----------- + <---------------------------------- + state = SEND + + ================================================send buffer over UDP multicast======> + -----------------------------------> + -----------------------send SERVER_SENT-----------> + [<-------------------send CLIENT_REQUEST----------] + [--------------send buffer over TCP unicast------>] + flush buffer to stdout + <-------------------send CLIENT_DONE--------------- + <---------------------------------- + state = PREP +while (buffer.len != 0) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/requirements.txt diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..d556de77 --- /dev/null +++ b/setup.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +from setuptools import setup + +setup( + name="daisy", + version="master", + url="https://www.opnfv.org", +) diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 00000000..0483907e --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,5 @@ +pytest +pytest-cov +pytest-faker +pytest-mock + diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/__init__.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unit/__init__.py diff --git a/tests/unit/test_placeholder.py b/tests/unit/test_placeholder.py new file mode 100644 index 00000000..457e464c --- /dev/null +++ b/tests/unit/test_placeholder.py @@ -0,0 +1,2 @@ +def test_holder(): + assert True diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..28fbf8f5 --- /dev/null +++ b/tox.ini @@ -0,0 +1,42 @@ +# Tox (http://tox.testrun.org/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. + +[tox] +envlist = py27,pep8 +skipsdist = True + +[testenv] +usedevelop = True +install_command = pip install -U {opts} {packages} +deps = + -rrequirements.txt + -rtest-requirements.txt +commands= + py.test \ + --basetemp={envtmpdir} \ + --cov \ + --cov-report term-missing \ + --cov-report xml \ + {posargs} +setenv= + HOME = {envtmpdir} + PYTHONPATH = {toxinidir} + +[testenv:pep8] +deps = flake8 +commands = flake8 {toxinidir} + +[flake8] +# H803 skipped on purpose per list discussion. +# E123, E125 skipped as they are invalid PEP-8. + +show-source = True +ignore = E123,E125,H803,E501 +builtins = _ +exclude = build,dist,doc,legacy,.eggs,.git,.tox,.venv + +[pytest] +testpaths = tests +python_functions = test_* |