summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Huwald <jh@sotun.de>2013-01-28 16:33:06 (GMT)
committerJan Huwald <jh@sotun.de>2013-01-28 16:33:06 (GMT)
commitd606b9cce4d010b145cd18b93b8b902a4db76343 (patch)
tree3a33abdc871bbb2dcf09977e98617fb6748e4857
parentb02e9b19e76245052762b7e44d45d12bae83e76e (diff)
add hjoin
-rw-r--r--Makefile9
-rw-r--r--hjoin.cpp93
2 files changed, 101 insertions, 1 deletions
diff --git a/Makefile b/Makefile
index 98912d7..24b9395 100644
--- a/Makefile
+++ b/Makefile
@@ -3,6 +3,13 @@ R := ../..
include $R/common.mk
endif
+NTBIN := hjoin
+NTLIB := -lboost_iostreams
+
+$W/%: $W/%.cpp
+ @/bin/echo -e "GCC\t$@"
+ @$(CXX) $(NTLIB) -o $@ $^
+
rec_clean :: $W/clean
$W/clean:
- rm -f $(@D)/*~
+ cd $(@D) && rm -f *~ $(NTBIN)
diff --git a/hjoin.cpp b/hjoin.cpp
new file mode 100644
index 0000000..7e46819
--- /dev/null
+++ b/hjoin.cpp
@@ -0,0 +1,93 @@
+#include <map>
+#include <array>
+#include <iostream>
+#include <boost/iostreams/device/file_descriptor.hpp>
+#include <boost/iostreams/stream_buffer.hpp>
+#include <boost/iostreams/stream.hpp>
+#include <queue>
+
+using namespace std;
+using boost::iostreams::file_descriptor_source;
+using boost::iostreams::stream_buffer;
+using boost::iostreams::stream;
+using boost::iostreams::file_descriptor_flags;
+
+typedef stream<file_descriptor_source> Src;
+typedef tuple<Src*, int, string, string> PqP;
+struct Cmp {
+ bool operator() (PqP &x1, PqP &x2) {
+ if (get<2>(x1) > get<2>(x2)) return true;
+ if (get<2>(x1) < get<2>(x2)) return false;
+ return get<1>(x1) > get<1>(x2);
+ }
+};
+
+int main(int argc, char **argv) {
+ // the fds we use as inputs; externally supplied
+ auto fds = {3, 4};
+ // in-line delimiters; first one is used for output
+ array<char, 3> delim = {'\t', ' ', 0};
+ // default cell value if none is supplied on a line
+ string cellDef = "";
+ if (argc == 2)
+ cellDef = argv[1];
+
+ priority_queue<PqP, vector<PqP>, Cmp> queue;
+ auto tryRead = [&](Src *src, int fd) {
+ array<char, 1000> str;
+ src->getline(str.data(), str.size());
+ if (*src) {
+ char *delimPos = find_first_of(str.begin(), str.end(), delim.begin(), delim.end());
+ queue.emplace(src, fd,
+ string(str.data(), delimPos - str.data()),
+ string(*delimPos ? (delimPos+1) : delimPos));
+ }else{
+ delete src;
+ }
+ };
+
+ // init queue with new streams
+ for (auto fd : fds)
+ tryRead(new Src(file_descriptor_source(fd, boost::iostreams::close_handle)), fd);
+
+ // iterate over prio queue to print joined strings
+ string curKey("");
+ map<int, string> curVals;
+ auto maybeFlushLine = [&]() {
+ if (curVals.empty())
+ return;
+
+ cout << curKey;
+ for (auto fd : fds) {
+ cout << delim[0];
+ if (curVals.count(fd)) {
+ cout << curVals[fd];
+ }else{
+ cout << cellDef;
+ }
+ }
+ cout << endl;
+ };
+ while (!queue.empty()) {
+ PqP cur = queue.top();
+ queue.pop();
+
+ // start next line if we see a new key the first time
+ if (get<2>(cur) != curKey) {
+ maybeFlushLine();
+ curKey = get<2>(cur);
+ curVals.clear();
+ }
+
+ // add value to line buffer
+ auto fd = get<1>(cur);
+ assert(!curVals.count(fd));
+ curVals[fd] = get<3>(cur);
+
+ // read next element from the file we just processed a line from
+ tryRead(get<0>(cur), get<1>(cur));
+ }
+ maybeFlushLine();
+
+ return EXIT_SUCCESS;
+}
contact: Jan Huwald // Impressum