diff options
author | Jan Huwald <jh@sotun.de> | 2013-01-28 16:33:06 (GMT) |
---|---|---|
committer | Jan Huwald <jh@sotun.de> | 2013-01-28 16:33:06 (GMT) |
commit | d606b9cce4d010b145cd18b93b8b902a4db76343 (patch) | |
tree | 3a33abdc871bbb2dcf09977e98617fb6748e4857 | |
parent | b02e9b19e76245052762b7e44d45d12bae83e76e (diff) |
add hjoin
-rw-r--r-- | Makefile | 9 | ||||
-rw-r--r-- | hjoin.cpp | 93 |
2 files changed, 101 insertions, 1 deletions
@@ -3,6 +3,13 @@ R := ../.. include $R/common.mk endif +NTBIN := hjoin +NTLIB := -lboost_iostreams + +$W/%: $W/%.cpp + @/bin/echo -e "GCC\t$@" + @$(CXX) $(NTLIB) -o $@ $^ + rec_clean :: $W/clean $W/clean: - rm -f $(@D)/*~ + cd $(@D) && rm -f *~ $(NTBIN) diff --git a/hjoin.cpp b/hjoin.cpp new file mode 100644 index 0000000..7e46819 --- /dev/null +++ b/hjoin.cpp @@ -0,0 +1,93 @@ +#include <map> +#include <array> +#include <iostream> +#include <boost/iostreams/device/file_descriptor.hpp> +#include <boost/iostreams/stream_buffer.hpp> +#include <boost/iostreams/stream.hpp> +#include <queue> + +using namespace std; +using boost::iostreams::file_descriptor_source; +using boost::iostreams::stream_buffer; +using boost::iostreams::stream; +using boost::iostreams::file_descriptor_flags; + +typedef stream<file_descriptor_source> Src; +typedef tuple<Src*, int, string, string> PqP; +struct Cmp { + bool operator() (PqP &x1, PqP &x2) { + if (get<2>(x1) > get<2>(x2)) return true; + if (get<2>(x1) < get<2>(x2)) return false; + return get<1>(x1) > get<1>(x2); + } +}; + +int main(int argc, char **argv) { + // the fds we use as inputs; externally supplied + auto fds = {3, 4}; + // in-line delimiters; first one is used for output + array<char, 3> delim = {'\t', ' ', 0}; + // default cell value if none is supplied on a line + string cellDef = ""; + if (argc == 2) + cellDef = argv[1]; + + priority_queue<PqP, vector<PqP>, Cmp> queue; + auto tryRead = [&](Src *src, int fd) { + array<char, 1000> str; + src->getline(str.data(), str.size()); + if (*src) { + char *delimPos = find_first_of(str.begin(), str.end(), delim.begin(), delim.end()); + queue.emplace(src, fd, + string(str.data(), delimPos - str.data()), + string(*delimPos ? (delimPos+1) : delimPos)); + }else{ + delete src; + } + }; + + // init queue with new streams + for (auto fd : fds) + tryRead(new Src(file_descriptor_source(fd, boost::iostreams::close_handle)), fd); + + // iterate over prio queue to print joined strings + string curKey(""); + map<int, string> curVals; + auto maybeFlushLine = [&]() { + if (curVals.empty()) + return; + + cout << curKey; + for (auto fd : fds) { + cout << delim[0]; + if (curVals.count(fd)) { + cout << curVals[fd]; + }else{ + cout << cellDef; + } + } + cout << endl; + }; + while (!queue.empty()) { + PqP cur = queue.top(); + queue.pop(); + + // start next line if we see a new key the first time + if (get<2>(cur) != curKey) { + maybeFlushLine(); + curKey = get<2>(cur); + curVals.clear(); + } + + // add value to line buffer + auto fd = get<1>(cur); + assert(!curVals.count(fd)); + curVals[fd] = get<3>(cur); + + // read next element from the file we just processed a line from + tryRead(get<0>(cur), get<1>(cur)); + } + maybeFlushLine(); + + return EXIT_SUCCESS; +} |