summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp')
-rw-r--r--VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp406
1 files changed, 406 insertions, 0 deletions
diff --git a/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp b/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp
new file mode 100644
index 00000000..e493ef3f
--- /dev/null
+++ b/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp
@@ -0,0 +1,406 @@
+/*
+// Copyright (c) 2010-2017 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#include <inttypes.h>
+#include <string>
+#include <cstdio>
+#include <iostream>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sstream>
+#include <set>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <cerrno>
+#include <cstdlib>
+#include <map>
+
+#include "path.hpp"
+#include "bundle.hpp"
+#include "stream.hpp"
+#include "stream2.hpp"
+#include "allocator.hpp"
+#include "timestamp.hpp"
+#include "streamextract.hpp"
+#include "pcapreader.hpp"
+#include "pcapwriter.hpp"
+#include "flowtable.hpp"
+#include "stream3.hpp"
+#include "netsocket.hpp"
+#include "pcappktref.hpp"
+#include "progress.hpp"
+#include "mappedfile.hpp"
+#include "streamsorter.hpp"
+
+using namespace std;
+
+static bool is_dir(const string& path_dir_out)
+{
+ struct stat s = { 0 };
+
+ if (stat(path_dir_out.c_str(), &s)) {
+ return false;
+ }
+
+ return s.st_mode & S_IFDIR;
+}
+
+StreamExtract::StreamExtract(const ProgramConfig &cfg)
+ : ft2(cfg.flowTableSize),
+ streamSorter(cfg.flowTableSize, cfg.path_dir_out, 1024UL*1024*1024*8),
+ cfg(cfg)
+{
+}
+
+vector<Bundle> StreamExtract::createBundles(const string& streamPath)
+{
+ map<uint32_t, Bundle>::iterator iterBundle;
+ map<uint32_t, Bundle> bundles;
+ set<uint32_t> servers;
+
+ Stream2 s;
+ ifstream binIn;
+
+ binIn.open(streamPath.c_str());
+ binIn.seekg(0, binIn.end);
+ Progress progress(binIn.tellg());
+ binIn.seekg(0, binIn.beg);
+
+ while (!s.fromFile(&binIn)) {
+ if (progress.couldRefresh()) {
+ progress.setProgress(binIn.tellg());
+ progress.refresh();
+ }
+ if (!s.streamHdr.completedTCP)
+ continue;
+ if (!s.streamHdr.serverHdrLen)
+ continue;
+ /* The current implementation does not support clients
+ that are also servers. */
+ servers.insert(s.streamHdr.serverIP);
+ if (servers.find(s.streamHdr.clientIP) != servers.end())
+ continue;
+
+ /* Since each application is represented as a path
+ graph (there is only one reply for a given request
+ and only one request after a given reply), each
+ application must run on a unique server. For this
+ reason, check if the socket on the server already
+ is occupied and if so, keep incrementing the socket
+ until the collision is resolved. */
+ iterBundle = bundles.find(s.streamHdr.clientIP);
+
+ if (iterBundle == bundles.end()) {
+ bundles.insert(make_pair(s.streamHdr.clientIP, Bundle()));
+ iterBundle = bundles.find(s.streamHdr.clientIP);
+ }
+
+ (*iterBundle).second.addStream(s.streamHdr.streamId, s.getServerNetSocket().port);
+ }
+
+ progress.setProgress();
+ progress.refresh(true);
+
+ binIn.close();
+
+ vector<Bundle> ret;
+
+ ret.reserve(bundles.size());
+
+ for (map<uint32_t, Bundle>::const_iterator i = bundles.begin(); i != bundles.end(); ++i)
+ ret.push_back(i->second);
+
+ return ret;
+}
+
+set<uint32_t> StreamExtract::getBundleStreamIDs(const vector<Bundle*>& bundleSamples)
+{
+ set<uint32_t> streamIDs;
+
+ for (size_t i = 0; i < bundleSamples.size(); ++i) {
+ const vector<uint32_t> &bundleStreamIDs = bundleSamples[i]->getStream();
+
+ for (vector<uint32_t>::const_iterator j = bundleStreamIDs.begin(); j != bundleStreamIDs.end(); ++j) {
+ streamIDs.insert(*j);
+ }
+ }
+
+ return streamIDs;
+}
+
+static size_t getRandom(size_t limit)
+{
+ size_t r = rand();
+ size_t rand_limit = (RAND_MAX/limit)*limit;
+
+ while (r > rand_limit)
+ r = rand();
+
+ return r % limit;
+}
+
+static void removeFill(vector<Bundle*> *from, size_t idx)
+{
+ Bundle *last = from->back();
+ from->pop_back();
+
+ if (idx != from->size())
+ (*from)[idx] = last;
+}
+
+static vector<Bundle*> takeSamples(vector<Bundle>& bundles, size_t sampleCount)
+{
+ vector<Bundle*> bundleSamples;
+
+ bundleSamples.reserve(bundles.size());
+
+ cout << "Sampling " << sampleCount << " bundles out of " << bundles.size() << endl;
+ for (size_t i = 0; i < bundles.size(); ++i)
+ bundleSamples.push_back(&bundles[i]);
+
+ srand(1000);
+ while (bundleSamples.size() > sampleCount) {
+ size_t r = getRandom(bundleSamples.size());
+ removeFill(&bundleSamples, r);
+ }
+ return bundleSamples;
+}
+
+static size_t replaceWithRunningTotals(vector<size_t> *streamLength)
+{
+ size_t runningTotal = 0;
+ for (size_t i = 0; i < streamLength->size(); ++i) {
+ size_t len = (*streamLength)[i] + sizeof(uint32_t);
+ (*streamLength)[i] = runningTotal;
+ runningTotal += len;
+ }
+ return runningTotal;
+}
+
+static void printPorts(const vector<Bundle> &bundles)
+{
+ set<uint32_t> streamIDs;
+
+ for (size_t i = 0; i < bundles.size(); ++i) {
+ const vector<uint32_t> &ports = bundles[i].getPorts();
+
+ for (size_t j = 0; j < ports.size(); ++j) {
+ if (j + 1 == ports.size())
+ cout << ports[j] << ",END" << endl;
+ else
+ cout << ports[j] << "," << ports[j +1] << endl;
+ }
+ }
+}
+
+string StreamExtract::createStreamPcapFileName(int id)
+{
+ stringstream ss;
+
+ ss << cfg.path_dir_out << "/s" << id << ".pcap";
+
+ return ss.str();
+}
+
+int StreamExtract::writeToPcaps(const string &sourceFilePath, const set<uint32_t> &streamIDs)
+{
+ set<uint32_t>::const_iterator i = streamIDs.begin();
+
+ MappedFile mappedFile;
+ if (mappedFile.open(sourceFilePath)) {
+ cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl;
+ return -1;
+ }
+
+ PcapPkt::allocator = NULL;
+
+ Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg());
+ cout << "Writing " << streamIDs.size() << " streams to pcaps" << endl;
+ uint8_t *data2 = mappedFile.getMapBeg();
+ while (data2 < mappedFile.getMapEnd()) {
+ uint32_t id = *reinterpret_cast<uint32_t *>(data2);
+
+ data2 += sizeof(id);
+ uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2);
+ data2 += sizeof(pktCount);
+ Stream s(id, pktCount);
+ while (pktCount--) {
+ PcapPkt p(data2);
+
+ data2 += p.memSize();
+ s.addPkt(p);
+ }
+
+ while (i != streamIDs.end() && (*i) < id)
+ i++;
+ if (i == streamIDs.end())
+ break;
+ if (*i > id)
+ continue;
+
+ const string pcapPath = createStreamPcapFileName(id);
+
+ s.toPcap(pcapPath);
+ if (progress.couldRefresh()) {
+ progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg());
+ progress.refresh();
+ mappedFile.sync();
+ }
+ }
+
+ progress.setProgress(data2 - mappedFile.getMapBeg());
+ progress.refresh(true);
+
+ mappedFile.close();
+ return 0;
+}
+
+int StreamExtract::writeToLua(const string& binFilePath, const Path &smallFinalBin, const string& luaFilePath, const string &orderedTemp)
+{
+ vector<Bundle> bundles = createBundles(binFilePath);
+ vector<Bundle*> bundleSamples = takeSamples(bundles, cfg.sampleCount);
+ set<uint32_t> streamIDs = getBundleStreamIDs(bundleSamples);
+
+ if (cfg.write_pcaps)
+ writeToPcaps(orderedTemp, streamIDs);
+
+ ofstream outLua;
+ ofstream outSmallBin;
+ outLua.open(luaFilePath.c_str());
+ outLua << "bf = \""<< smallFinalBin.getFileName() << "\"" << endl;
+ outLua << "s = {}\n";
+ set<uint32_t>::iterator i = streamIDs.begin();
+
+ set<NetSocket> serverSockets;
+ ifstream binIn;
+ Stream2 s;
+
+ outSmallBin.open(smallFinalBin.str().c_str());
+ binIn.open(binFilePath.c_str());
+ while (!s.fromFile(&binIn)) {
+ while (i != streamIDs.end() && (*i) < s.streamHdr.streamId)
+ i++;
+ if (i == streamIDs.end())
+ break;
+ if (*i > s.streamHdr.streamId)
+ continue;
+ s.calcOffsets(&outSmallBin);
+ s.toFile(&outSmallBin);
+ while (serverSockets.find(s.getServerNetSocket()) != serverSockets.end()) {
+ NetSocket ns = s.getServerNetSocket();
+
+ ns.port++;
+ s.setServerNetSocket(ns);
+ }
+ serverSockets.insert(s.getServerNetSocket());
+
+ s.toLua(&outLua, "bf", "s");
+ }
+ binIn.close();
+
+ uint32_t bundleCount = 0;
+
+ outLua << "bundles = {}" << endl;
+ for (size_t i = 0; i < bundleSamples.size(); ++i) {
+ bundleSamples[i]->toLua(&outLua, "s", ++bundleCount);
+ }
+ outLua << "return bundles" << endl;
+ outLua.close();
+ return 0;
+}
+
+int StreamExtract::writeFinalBin(const string& sourceFilePath, const string& destFilePath)
+{
+ MappedFile mappedFile;
+ if (mappedFile.open(sourceFilePath)) {
+ cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl;
+ return -1;
+ }
+ ofstream binOut;
+
+ binOut.open(destFilePath.c_str());
+ PcapPkt::allocator = NULL;
+
+ Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg());
+
+ int streamCount = 0;
+ uint8_t *data2 = mappedFile.getMapBeg();
+ while (data2 < mappedFile.getMapEnd()) {
+ uint32_t id = *reinterpret_cast<uint32_t *>(data2);
+
+ data2 += sizeof(id);
+ uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2);
+ data2 += sizeof(pktCount);
+ Stream s(id, pktCount);
+ while (pktCount--) {
+ PcapPkt p(data2);
+
+ data2 += p.memSize();
+ s.addPkt(p);
+ }
+ s.toFile(&binOut);
+ streamCount++;
+ if (progress.couldRefresh()) {
+ progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg());
+ progress.refresh();
+ mappedFile.sync();
+ }
+ }
+
+ progress.setProgress(data2 - mappedFile.getMapBeg());
+ progress.refresh(true);
+
+ binOut.close();
+ mappedFile.close();
+ return 0;
+}
+
+int StreamExtract::run()
+{
+ Path p(cfg.path_dir_out);
+ p.mkdir();
+
+ string orderedTemp = p.add("/a").str();
+
+ string finalBin = p.add("/b").str();
+ Path smallfinalBin = p.add("/data.bin").str();
+ string luaFile = p.add("/cfg.lua").str();
+
+ cout << "Writing to directory '" << p.str() << "'" << endl;
+ cout << "Ordered streams '" << orderedTemp << "'" << endl;
+ cout << "Final binary output '" << finalBin << "'" << endl;
+ cout << "lua file '" << luaFile << "' will contain " << cfg.sampleCount << " bundles" << endl;
+
+ if (cfg.run_first_step) {
+ cout << "starting sorting" << endl;
+ streamSorter.sort(cfg.path_file_in_pcap, orderedTemp);
+ cout << "writing final binary file (converting format)" << endl;
+ if (writeFinalBin(orderedTemp, finalBin))
+ return -1;
+ } else {
+ cout << "Skipping first step" << endl;
+ if (!Path(finalBin).isFile()) {
+ cerr << "File is missing:" << finalBin << endl;
+ return -1;
+ }
+ }
+ cout << "writing Lua '" << luaFile << "'" << endl;
+ if (writeToLua(finalBin, smallfinalBin, luaFile, orderedTemp))
+ return -1;
+ return 0;
+}