diff options
Diffstat (limited to 'VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp')
-rw-r--r-- | VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp | 406 |
1 files changed, 406 insertions, 0 deletions
diff --git a/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp b/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp new file mode 100644 index 00000000..e493ef3f --- /dev/null +++ b/VNFs/DPPD-PROX/tools/flow_extract/streamextract.cpp @@ -0,0 +1,406 @@ +/* +// Copyright (c) 2010-2017 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +*/ + +#include <inttypes.h> +#include <string> +#include <cstdio> +#include <iostream> +#include <sys/stat.h> +#include <sys/types.h> +#include <sstream> +#include <set> +#include <arpa/inet.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/mman.h> +#include <cerrno> +#include <cstdlib> +#include <map> + +#include "path.hpp" +#include "bundle.hpp" +#include "stream.hpp" +#include "stream2.hpp" +#include "allocator.hpp" +#include "timestamp.hpp" +#include "streamextract.hpp" +#include "pcapreader.hpp" +#include "pcapwriter.hpp" +#include "flowtable.hpp" +#include "stream3.hpp" +#include "netsocket.hpp" +#include "pcappktref.hpp" +#include "progress.hpp" +#include "mappedfile.hpp" +#include "streamsorter.hpp" + +using namespace std; + +static bool is_dir(const string& path_dir_out) +{ + struct stat s = { 0 }; + + if (stat(path_dir_out.c_str(), &s)) { + return false; + } + + return s.st_mode & S_IFDIR; +} + +StreamExtract::StreamExtract(const ProgramConfig &cfg) + : ft2(cfg.flowTableSize), + streamSorter(cfg.flowTableSize, cfg.path_dir_out, 1024UL*1024*1024*8), + cfg(cfg) +{ +} + +vector<Bundle> StreamExtract::createBundles(const string& streamPath) +{ + map<uint32_t, Bundle>::iterator iterBundle; + map<uint32_t, Bundle> bundles; + set<uint32_t> servers; + + Stream2 s; + ifstream binIn; + + binIn.open(streamPath.c_str()); + binIn.seekg(0, binIn.end); + Progress progress(binIn.tellg()); + binIn.seekg(0, binIn.beg); + + while (!s.fromFile(&binIn)) { + if (progress.couldRefresh()) { + progress.setProgress(binIn.tellg()); + progress.refresh(); + } + if (!s.streamHdr.completedTCP) + continue; + if (!s.streamHdr.serverHdrLen) + continue; + /* The current implementation does not support clients + that are also servers. */ + servers.insert(s.streamHdr.serverIP); + if (servers.find(s.streamHdr.clientIP) != servers.end()) + continue; + + /* Since each application is represented as a path + graph (there is only one reply for a given request + and only one request after a given reply), each + application must run on a unique server. For this + reason, check if the socket on the server already + is occupied and if so, keep incrementing the socket + until the collision is resolved. */ + iterBundle = bundles.find(s.streamHdr.clientIP); + + if (iterBundle == bundles.end()) { + bundles.insert(make_pair(s.streamHdr.clientIP, Bundle())); + iterBundle = bundles.find(s.streamHdr.clientIP); + } + + (*iterBundle).second.addStream(s.streamHdr.streamId, s.getServerNetSocket().port); + } + + progress.setProgress(); + progress.refresh(true); + + binIn.close(); + + vector<Bundle> ret; + + ret.reserve(bundles.size()); + + for (map<uint32_t, Bundle>::const_iterator i = bundles.begin(); i != bundles.end(); ++i) + ret.push_back(i->second); + + return ret; +} + +set<uint32_t> StreamExtract::getBundleStreamIDs(const vector<Bundle*>& bundleSamples) +{ + set<uint32_t> streamIDs; + + for (size_t i = 0; i < bundleSamples.size(); ++i) { + const vector<uint32_t> &bundleStreamIDs = bundleSamples[i]->getStream(); + + for (vector<uint32_t>::const_iterator j = bundleStreamIDs.begin(); j != bundleStreamIDs.end(); ++j) { + streamIDs.insert(*j); + } + } + + return streamIDs; +} + +static size_t getRandom(size_t limit) +{ + size_t r = rand(); + size_t rand_limit = (RAND_MAX/limit)*limit; + + while (r > rand_limit) + r = rand(); + + return r % limit; +} + +static void removeFill(vector<Bundle*> *from, size_t idx) +{ + Bundle *last = from->back(); + from->pop_back(); + + if (idx != from->size()) + (*from)[idx] = last; +} + +static vector<Bundle*> takeSamples(vector<Bundle>& bundles, size_t sampleCount) +{ + vector<Bundle*> bundleSamples; + + bundleSamples.reserve(bundles.size()); + + cout << "Sampling " << sampleCount << " bundles out of " << bundles.size() << endl; + for (size_t i = 0; i < bundles.size(); ++i) + bundleSamples.push_back(&bundles[i]); + + srand(1000); + while (bundleSamples.size() > sampleCount) { + size_t r = getRandom(bundleSamples.size()); + removeFill(&bundleSamples, r); + } + return bundleSamples; +} + +static size_t replaceWithRunningTotals(vector<size_t> *streamLength) +{ + size_t runningTotal = 0; + for (size_t i = 0; i < streamLength->size(); ++i) { + size_t len = (*streamLength)[i] + sizeof(uint32_t); + (*streamLength)[i] = runningTotal; + runningTotal += len; + } + return runningTotal; +} + +static void printPorts(const vector<Bundle> &bundles) +{ + set<uint32_t> streamIDs; + + for (size_t i = 0; i < bundles.size(); ++i) { + const vector<uint32_t> &ports = bundles[i].getPorts(); + + for (size_t j = 0; j < ports.size(); ++j) { + if (j + 1 == ports.size()) + cout << ports[j] << ",END" << endl; + else + cout << ports[j] << "," << ports[j +1] << endl; + } + } +} + +string StreamExtract::createStreamPcapFileName(int id) +{ + stringstream ss; + + ss << cfg.path_dir_out << "/s" << id << ".pcap"; + + return ss.str(); +} + +int StreamExtract::writeToPcaps(const string &sourceFilePath, const set<uint32_t> &streamIDs) +{ + set<uint32_t>::const_iterator i = streamIDs.begin(); + + MappedFile mappedFile; + if (mappedFile.open(sourceFilePath)) { + cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl; + return -1; + } + + PcapPkt::allocator = NULL; + + Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg()); + cout << "Writing " << streamIDs.size() << " streams to pcaps" << endl; + uint8_t *data2 = mappedFile.getMapBeg(); + while (data2 < mappedFile.getMapEnd()) { + uint32_t id = *reinterpret_cast<uint32_t *>(data2); + + data2 += sizeof(id); + uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2); + data2 += sizeof(pktCount); + Stream s(id, pktCount); + while (pktCount--) { + PcapPkt p(data2); + + data2 += p.memSize(); + s.addPkt(p); + } + + while (i != streamIDs.end() && (*i) < id) + i++; + if (i == streamIDs.end()) + break; + if (*i > id) + continue; + + const string pcapPath = createStreamPcapFileName(id); + + s.toPcap(pcapPath); + if (progress.couldRefresh()) { + progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg()); + progress.refresh(); + mappedFile.sync(); + } + } + + progress.setProgress(data2 - mappedFile.getMapBeg()); + progress.refresh(true); + + mappedFile.close(); + return 0; +} + +int StreamExtract::writeToLua(const string& binFilePath, const Path &smallFinalBin, const string& luaFilePath, const string &orderedTemp) +{ + vector<Bundle> bundles = createBundles(binFilePath); + vector<Bundle*> bundleSamples = takeSamples(bundles, cfg.sampleCount); + set<uint32_t> streamIDs = getBundleStreamIDs(bundleSamples); + + if (cfg.write_pcaps) + writeToPcaps(orderedTemp, streamIDs); + + ofstream outLua; + ofstream outSmallBin; + outLua.open(luaFilePath.c_str()); + outLua << "bf = \""<< smallFinalBin.getFileName() << "\"" << endl; + outLua << "s = {}\n"; + set<uint32_t>::iterator i = streamIDs.begin(); + + set<NetSocket> serverSockets; + ifstream binIn; + Stream2 s; + + outSmallBin.open(smallFinalBin.str().c_str()); + binIn.open(binFilePath.c_str()); + while (!s.fromFile(&binIn)) { + while (i != streamIDs.end() && (*i) < s.streamHdr.streamId) + i++; + if (i == streamIDs.end()) + break; + if (*i > s.streamHdr.streamId) + continue; + s.calcOffsets(&outSmallBin); + s.toFile(&outSmallBin); + while (serverSockets.find(s.getServerNetSocket()) != serverSockets.end()) { + NetSocket ns = s.getServerNetSocket(); + + ns.port++; + s.setServerNetSocket(ns); + } + serverSockets.insert(s.getServerNetSocket()); + + s.toLua(&outLua, "bf", "s"); + } + binIn.close(); + + uint32_t bundleCount = 0; + + outLua << "bundles = {}" << endl; + for (size_t i = 0; i < bundleSamples.size(); ++i) { + bundleSamples[i]->toLua(&outLua, "s", ++bundleCount); + } + outLua << "return bundles" << endl; + outLua.close(); + return 0; +} + +int StreamExtract::writeFinalBin(const string& sourceFilePath, const string& destFilePath) +{ + MappedFile mappedFile; + if (mappedFile.open(sourceFilePath)) { + cerr << "Failed to open file " << sourceFilePath << ":" << strerror(errno) << endl; + return -1; + } + ofstream binOut; + + binOut.open(destFilePath.c_str()); + PcapPkt::allocator = NULL; + + Progress progress((uint64_t)mappedFile.getMapEnd() - (uint64_t)mappedFile.getMapBeg()); + + int streamCount = 0; + uint8_t *data2 = mappedFile.getMapBeg(); + while (data2 < mappedFile.getMapEnd()) { + uint32_t id = *reinterpret_cast<uint32_t *>(data2); + + data2 += sizeof(id); + uint32_t pktCount = *reinterpret_cast<uint32_t *>(data2); + data2 += sizeof(pktCount); + Stream s(id, pktCount); + while (pktCount--) { + PcapPkt p(data2); + + data2 += p.memSize(); + s.addPkt(p); + } + s.toFile(&binOut); + streamCount++; + if (progress.couldRefresh()) { + progress.setProgress((uint64_t)data2 - (uint64_t)mappedFile.getMapBeg()); + progress.refresh(); + mappedFile.sync(); + } + } + + progress.setProgress(data2 - mappedFile.getMapBeg()); + progress.refresh(true); + + binOut.close(); + mappedFile.close(); + return 0; +} + +int StreamExtract::run() +{ + Path p(cfg.path_dir_out); + p.mkdir(); + + string orderedTemp = p.add("/a").str(); + + string finalBin = p.add("/b").str(); + Path smallfinalBin = p.add("/data.bin").str(); + string luaFile = p.add("/cfg.lua").str(); + + cout << "Writing to directory '" << p.str() << "'" << endl; + cout << "Ordered streams '" << orderedTemp << "'" << endl; + cout << "Final binary output '" << finalBin << "'" << endl; + cout << "lua file '" << luaFile << "' will contain " << cfg.sampleCount << " bundles" << endl; + + if (cfg.run_first_step) { + cout << "starting sorting" << endl; + streamSorter.sort(cfg.path_file_in_pcap, orderedTemp); + cout << "writing final binary file (converting format)" << endl; + if (writeFinalBin(orderedTemp, finalBin)) + return -1; + } else { + cout << "Skipping first step" << endl; + if (!Path(finalBin).isFile()) { + cerr << "File is missing:" << finalBin << endl; + return -1; + } + } + cout << "writing Lua '" << luaFile << "'" << endl; + if (writeToLua(finalBin, smallfinalBin, luaFile, orderedTemp)) + return -1; + return 0; +} |