summaryrefslogtreecommitdiff
path: root/hjoin.cpp
blob: 7e46819119eb204856feec7604df5713bbb4a497 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <cassert>
#include <cstdlib>

#include <algorithm>
#include <array>
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream_buffer.hpp>
#include <boost/iostreams/stream.hpp>

using namespace std;
using boost::iostreams::file_descriptor_source;
using boost::iostreams::stream_buffer;
using boost::iostreams::stream;
using boost::iostreams::file_descriptor_flags;

// Input stream reading from an already-open file descriptor.
using Src = stream<file_descriptor_source>;
// Merge-queue entry, as filled in by main: (stream the line was read from,
// its fd, key, value).
using PqP = tuple<Src*, int, string, string>;
struct Cmp {
  bool operator() (PqP &x1, PqP &x2) {
    if (get<2>(x1) > get<2>(x2)) return true;
    if (get<2>(x1) < get<2>(x2)) return false;
    return get<1>(x1) > get<1>(x2);
  }  
};

int main(int argc, char **argv) {
  // the fds we use as inputs; externally supplied
  auto fds = {3, 4}; 
  // in-line delimiters; first one is used for output
  array<char, 3> delim = {'\t', ' ', 0};
  // default cell value if none is supplied on a line
  string cellDef = "";
  if (argc == 2)
    cellDef = argv[1];

  priority_queue<PqP, vector<PqP>, Cmp> queue;
  auto tryRead = [&](Src *src, int fd) {
    array<char, 1000> str;
    src->getline(str.data(), str.size());
    if (*src) {
      char *delimPos = find_first_of(str.begin(), str.end(), delim.begin(), delim.end());
      queue.emplace(src, fd,
		    string(str.data(), delimPos - str.data()),
	            string(*delimPos ? (delimPos+1) : delimPos));
    }else{
      delete src;
    }
  };

  // init queue with new streams
  for (auto fd : fds)
      tryRead(new Src(file_descriptor_source(fd, boost::iostreams::close_handle)), fd);

  // iterate over prio queue to print joined strings
  string curKey("");
  map<int, string> curVals;
  auto maybeFlushLine = [&]() {
    if (curVals.empty())
      return;

    cout << curKey;
    for (auto fd : fds) {
      cout << delim[0];
      if (curVals.count(fd)) {
	cout << curVals[fd];
      }else{
	cout << cellDef;
      }
    }
    cout << endl;
  };
  while (!queue.empty()) {
    PqP cur = queue.top();
    queue.pop();

    // start next line if we see a new key the first time
    if (get<2>(cur) != curKey) {
      maybeFlushLine();
      curKey = get<2>(cur);
      curVals.clear();
    }

    // add value to line buffer
    auto fd = get<1>(cur);
    assert(!curVals.count(fd));
    curVals[fd] = get<3>(cur);
    
    // read next element from the file we just processed a line from
    tryRead(get<0>(cur), get<1>(cur));
  }
  maybeFlushLine();

  return EXIT_SUCCESS;
}
contact: Jan Huwald // Impressum