From d606b9cce4d010b145cd18b93b8b902a4db76343 Mon Sep 17 00:00:00 2001
From: Jan Huwald
Date: Mon, 28 Jan 2013 17:33:06 +0100
Subject: add hjoin

diff --git a/Makefile b/Makefile
index 98912d7..24b9395 100644
--- a/Makefile
+++ b/Makefile
@@ -3,6 +3,15 @@ R := ../..
 include $R/common.mk
 endif
 
+NTBIN := hjoin
+NTLIB := -lboost_iostreams
+
+# Build a tool from its single source file. The libraries follow the inputs
+# because the linker searches libraries in command-line order.
+$W/%: $W/%.cpp
+	@/bin/echo -e "GCC\t$@"
+	@$(CXX) -o $@ $^ $(NTLIB)
+
 rec_clean :: $W/clean
 $W/clean:
-	rm -f $(@D)/*~
+	cd $(@D) && rm -f *~ $(NTBIN)
diff --git a/hjoin.cpp b/hjoin.cpp
new file mode 100644
index 0000000..7e46819
--- /dev/null
+++ b/hjoin.cpp
@@ -0,0 +1,123 @@
+// hjoin -- join two line streams on their first field.
+//
+// Lines are read from file descriptors 3 and 4 and split at their first tab
+// or space into a key and a value.  For every distinct key one line goes to
+// stdout: the key, then for each input (in fd order) a tab followed by that
+// input's value, or by the default cell (argv[1], empty if not given) when
+// the input has no line for the key.  Equal keys are only merged when each
+// input is sorted ascending by key (std::string ordering); a key repeated on
+// consecutive lines of one input trips the assert in main().
+
+#include <array>
+#include <cassert>
+#include <cstdlib>
+#include <iostream>
+#include <map>
+#include <queue>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include <boost/iostreams/device/file_descriptor.hpp>
+#include <boost/iostreams/stream.hpp>
+
+using namespace std;
+using boost::iostreams::file_descriptor_source;
+using boost::iostreams::stream_buffer;
+using boost::iostreams::stream;
+using boost::iostreams::file_descriptor_flags;
+
+typedef stream<file_descriptor_source> Src;
+// (stream, source fd, key, value) of the line at the head of an input
+typedef tuple<Src*, int, string, string> PqP;
+
+// Orders the priority queue so that top() is the entry with the smallest key;
+// ties are broken in favour of the smaller fd.
+struct Cmp {
+  bool operator() (const PqP &x1, const PqP &x2) const {
+    if (get<2>(x1) > get<2>(x2)) return true;
+    if (get<2>(x1) < get<2>(x2)) return false;
+    return get<1>(x1) > get<1>(x2);
+  }
+};
+
+int main(int argc, char **argv) {
+  // the fds we use as inputs; externally supplied
+  auto fds = {3, 4};
+  // in-line delimiters; first one is used for output. NUL stays in the set so
+  // that a line is cut at an embedded NUL.
+  const array<char, 3> delim = {{'\t', ' ', '\0'}};
+  // default cell value if none is supplied on a line
+  string cellDef = "";
+  if (argc == 2)
+    cellDef = argv[1];
+
+  priority_queue<PqP, vector<PqP>, Cmp> queue;
+  // Read the next line of src and queue it as (key, value).  Once src yields
+  // no further line it is deleted; this lambda owns the streams it is handed.
+  auto tryRead = [&](Src *src, int fd) {
+    // std::getline grows the string as needed; a fixed-size buffer would set
+    // failbit on an overlong line and silently end that input early
+    string line;
+    if (std::getline(*src, line)) {
+      const string::size_type pos =
+        line.find_first_of(delim.data(), 0, delim.size());
+      if (pos == string::npos) {
+        // no delimiter: the whole line is the key, the value is empty
+        queue.emplace(src, fd, line, string());
+      }else{
+        // the value ends at the first NUL, if any; a NUL delimiter yields ""
+        queue.emplace(src, fd,
+                      line.substr(0, pos),
+                      line[pos] ? string(line.c_str() + pos + 1) : string());
+      }
+    }else{
+      delete src;
+    }
+  };
+
+  // init queue with new streams
+  for (auto fd : fds)
+    tryRead(new Src(file_descriptor_source(fd, boost::iostreams::close_handle)), fd);
+
+  // iterate over prio queue to print joined strings
+  string curKey("");
+  map<int, string> curVals;
+  auto maybeFlushLine = [&]() {
+    if (curVals.empty())
+      return;
+
+    cout << curKey;
+    for (auto fd : fds) {
+      cout << delim[0];
+      if (curVals.count(fd)) {
+        cout << curVals[fd];
+      }else{
+        cout << cellDef;
+      }
+    }
+    cout << endl;
+  };
+  while (!queue.empty()) {
+    PqP cur = queue.top();
+    queue.pop();
+
+    // start next line if we see a new key the first time
+    if (get<2>(cur) != curKey) {
+      maybeFlushLine();
+      curKey = get<2>(cur);
+      curVals.clear();
+    }
+
+    // add value to line buffer
+    auto fd = get<1>(cur);
+    assert(!curVals.count(fd));
+    curVals[fd] = get<3>(cur);
+
+    // read next element from the file we just processed a line from
+    tryRead(get<0>(cur), get<1>(cur));
+  }
+  maybeFlushLine();
+
+  return EXIT_SUCCESS;
+}
-- 
cgit v0.10.1